aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2024-01-17 12:48:36 +0300
committerGitHub <noreply@github.com>2024-01-17 12:48:36 +0300
commit4c962e715d375794519095718b4806ce7c20dcca (patch)
treedfef342db4782444bd58acab54af13cef83c5dd7
parent749c6cdb686028c4fac36872c97512e8938b5bc2 (diff)
downloadydb-4c962e715d375794519095718b4806ce7c20dcca.tar.gz
Revert "Disable coordinator if no ingress tasks (#472)" (#1083)
This reverts commit 7c308ffca4e2e378bf364c843707bbd9f434b587.
-rw-r--r--ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp6
-rw-r--r--ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp31
2 files changed, 10 insertions, 27 deletions
diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
index 6832f05f2dc..5186054fdbb 100644
--- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
+++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
@@ -103,12 +103,6 @@ void TCheckpointCoordinator::Handle(NYql::NDqs::TEvReadyState::TPtr& ev) {
AllActorsSet.insert(actorId);
}
- CC_LOG_D("ActorsToTrigger count: " << ActorsToTrigger.size() << ", ActorsToNotify count: " << ActorsToNotify.size() << ", ActorsToWaitFor count: " << ActorsToWaitFor.size());
-
- if (ActorsToTrigger.empty()) {
- CC_LOG_D("No ingress tasks, coordinator was disabled");
- return;
- }
PendingInit = std::make_unique<TPendingInitCoordinator>(AllActors.size());
CC_LOG_D("Send TEvRegisterCoordinatorRequest");
diff --git a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
index 6cd2a11461b..5ca54341612 100644
--- a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
+++ b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
@@ -19,7 +19,7 @@ enum ETestGraphFlags : ui64 {
SourceWithChannelInOneTask = 2,
};
-NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType) {
+NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) {
NYql::NDqProto::TReadyState result;
@@ -29,7 +29,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType
ingressOutput->AddChannels();
if (flags & ETestGraphFlags::InputWithSource) {
auto* source = ingress->AddInputs()->MutableSource();
- source->SetType(sourceType);
+ source->SetType("PqSource");
}
auto* map = result.AddTask();
@@ -40,7 +40,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType
mapOutput->AddChannels();
if (flags & ETestGraphFlags::SourceWithChannelInOneTask) {
auto* source = map->AddInputs()->MutableSource();
- source->SetType(sourceType);
+ source->SetType("PqSource");
}
auto* egress = result.AddTask();
@@ -70,9 +70,9 @@ struct TTestBootstrap : public TTestActorRuntime {
::NMonitoring::TDynamicCounterPtr Counters = new ::NMonitoring::TDynamicCounters();
- explicit TTestBootstrap(ui64 graphFlags, const TString& sourceType)
+ explicit TTestBootstrap(ui64 graphFlags = 0)
: TTestActorRuntime(true)
- , GraphState(BuildTestGraph(graphFlags, sourceType))
+ , GraphState(BuildTestGraph(graphFlags))
, CoordinatorId("my-graph-id", 42)
, CheckpointId(CoordinatorId.Generation, 1)
{
@@ -281,8 +281,8 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
class CheckpointsTestHelper : public TTestBootstrap
{
public:
- CheckpointsTestHelper(ui64 graphFlags, const TString& sourceType)
- : TTestBootstrap(graphFlags, sourceType) {
+ CheckpointsTestHelper(ui64 graphFlags)
+ : TTestBootstrap(graphFlags) {
}
void InjectCheckpoint() {
@@ -372,33 +372,22 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
};
Y_UNIT_TEST(ShouldTriggerCheckpointWithSource) {
- CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "PqSource");
+ CheckpointsTestHelper test(ETestGraphFlags::InputWithSource);
test.InjectCheckpoint();
test.AllSavedAndCommited();
}
Y_UNIT_TEST(ShouldTriggerCheckpointWithSourcesAndWithChannel) {
- CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask, "PqSource");
+ CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask);
test.InjectCheckpoint();
test.AllSavedAndCommited();
}
Y_UNIT_TEST(ShouldAbortPreviousCheckpointsIfNodeStateCantBeSaved) {
- CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "PqSource");
+ CheckpointsTestHelper test(ETestGraphFlags::InputWithSource);
test.InjectCheckpoint();
test.SaveFailed();
}
-
- Y_UNIT_TEST(ShouldDoNothingIfNoIngressTasks) {
- CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "S3Source");
- bool empty = false;
- try {
- test.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>(test.StorageProxy, TDuration::Seconds(10));
- } catch (TEmptyEventQueueException&) {
- empty = true;
- }
- UNIT_ASSERT(empty);
- }
}
} // namespace NFq