diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2024-01-17 12:48:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-17 12:48:36 +0300 |
commit | 4c962e715d375794519095718b4806ce7c20dcca (patch) | |
tree | dfef342db4782444bd58acab54af13cef83c5dd7 | |
parent | 749c6cdb686028c4fac36872c97512e8938b5bc2 (diff) | |
download | ydb-4c962e715d375794519095718b4806ce7c20dcca.tar.gz |
Revert "Disable coordinator if no ingress tasks (#472)" (#1083)
This reverts commit 7c308ffca4e2e378bf364c843707bbd9f434b587.
-rw-r--r-- | ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp | 6 | ||||
-rw-r--r-- | ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp | 31 |
2 files changed, 10 insertions, 27 deletions
diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp index 6832f05f2dc..5186054fdbb 100644 --- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp @@ -103,12 +103,6 @@ void TCheckpointCoordinator::Handle(NYql::NDqs::TEvReadyState::TPtr& ev) { AllActorsSet.insert(actorId); } - CC_LOG_D("ActorsToTrigger count: " << ActorsToTrigger.size() << ", ActorsToNotify count: " << ActorsToNotify.size() << ", ActorsToWaitFor count: " << ActorsToWaitFor.size()); - - if (ActorsToTrigger.empty()) { - CC_LOG_D("No ingress tasks, coordinator was disabled"); - return; - } PendingInit = std::make_unique<TPendingInitCoordinator>(AllActors.size()); CC_LOG_D("Send TEvRegisterCoordinatorRequest"); diff --git a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp index 6cd2a11461b..5ca54341612 100644 --- a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp +++ b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp @@ -19,7 +19,7 @@ enum ETestGraphFlags : ui64 { SourceWithChannelInOneTask = 2, }; -NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType) { +NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) { NYql::NDqProto::TReadyState result; @@ -29,7 +29,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType ingressOutput->AddChannels(); if (flags & ETestGraphFlags::InputWithSource) { auto* source = ingress->AddInputs()->MutableSource(); - source->SetType(sourceType); + source->SetType("PqSource"); } auto* map = result.AddTask(); @@ -40,7 +40,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType mapOutput->AddChannels(); if (flags & ETestGraphFlags::SourceWithChannelInOneTask) { auto* source = map->AddInputs()->MutableSource(); - source->SetType(sourceType); + source->SetType("PqSource"); } auto* egress = result.AddTask(); @@ -70,9 +70,9 @@ struct TTestBootstrap : public TTestActorRuntime { ::NMonitoring::TDynamicCounterPtr Counters = new ::NMonitoring::TDynamicCounters(); - explicit TTestBootstrap(ui64 graphFlags, const TString& sourceType) + explicit TTestBootstrap(ui64 graphFlags = 0) : TTestActorRuntime(true) - , GraphState(BuildTestGraph(graphFlags, sourceType)) + , GraphState(BuildTestGraph(graphFlags)) , CoordinatorId("my-graph-id", 42) , CheckpointId(CoordinatorId.Generation, 1) { @@ -281,8 +281,8 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { class CheckpointsTestHelper : public TTestBootstrap { public: - CheckpointsTestHelper(ui64 graphFlags, const TString& sourceType) - : TTestBootstrap(graphFlags, sourceType) { + CheckpointsTestHelper(ui64 graphFlags) + : TTestBootstrap(graphFlags) { } void InjectCheckpoint() { @@ -372,33 +372,22 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { }; Y_UNIT_TEST(ShouldTriggerCheckpointWithSource) { - CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "PqSource"); + CheckpointsTestHelper test(ETestGraphFlags::InputWithSource); test.InjectCheckpoint(); test.AllSavedAndCommited(); } Y_UNIT_TEST(ShouldTriggerCheckpointWithSourcesAndWithChannel) { - CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask, "PqSource"); + CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask); test.InjectCheckpoint(); test.AllSavedAndCommited(); } Y_UNIT_TEST(ShouldAbortPreviousCheckpointsIfNodeStateCantBeSaved) { - CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "PqSource"); + CheckpointsTestHelper test(ETestGraphFlags::InputWithSource); test.InjectCheckpoint(); test.SaveFailed(); } - - Y_UNIT_TEST(ShouldDoNothingIfNoIngressTasks) { - CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "S3Source"); - bool empty = false; - try { - test.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>(test.StorageProxy, TDuration::Seconds(10)); - } catch (TEmptyEventQueueException&) { - empty = true; - } - UNIT_ASSERT(empty); - } } } // namespace NFq |