summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <[email protected]>2025-07-10 15:59:46 +0300
committerGitHub <[email protected]>2025-07-10 15:59:46 +0300
commitf43a538dcbe3e8364ff021134c0f1177814aa57f (patch)
treefa6a77bd6e432832f42b572709604c1fa67c728b
parent9f6d559183eb9fb02d78c58ea6bb61179c9a025e (diff)
YQ-2643 Disable coordinator if no ingress tasks (#1113)oidc-1.2.1
-rw-r--r--ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp14
-rw-r--r--ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h1
-rw-r--r--ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp40
3 files changed, 47 insertions, 8 deletions
diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
index d7ff17db741..f080732147a 100644
--- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
+++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
@@ -80,6 +80,14 @@ void TCheckpointCoordinator::Handle(NFq::TEvCheckpointCoordinator::TEvReadyState
AllActorsSet.insert(actorId);
}
+ CC_LOG_D("AllActors count: " << AllActors.size() << ", ActorsToTrigger count: " << ActorsToTrigger.size() << ", ActorsToNotify count: " << ActorsToNotify.size() << ", ActorsToWaitFor count: " << ActorsToWaitFor.size());
+
+ if (ActorsToTrigger.empty()) {
+ CC_LOG_D("No ingress tasks, coordinator was disabled");
+ StartAllTasks();
+ return;
+ }
+
PendingInit = std::make_unique<TPendingInitCoordinator>(AllActors.size());
CC_LOG_D("Send TEvRegisterCoordinatorRequest");
@@ -408,8 +416,12 @@ void TCheckpointCoordinator::InjectCheckpoint(const TCheckpointId& checkpointId,
transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvInjectCheckpoint(checkpointId.SeqNo, checkpointId.CoordinatorGeneration, type));
}
+ StartAllTasks();
+}
+
+void TCheckpointCoordinator::StartAllTasks() {
if (!GraphIsRunning) {
- CC_LOG_I("[" << checkpointId << "] Send TEvRun to all actors");
+ CC_LOG_I("Send TEvRun to all actors");
for (const auto& [actor, transport] : AllActors) {
transport->EventsQueue.Send(new NYql::NDq::TEvDqCompute::TEvRun());
}
diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h
index 263903467b2..2c48f08e1a5 100644
--- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h
+++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h
@@ -91,6 +91,7 @@ private:
void PassAway() override;
void RestoreFromOwnCheckpoint(const TCheckpointMetadata& checkpoint);
void TryToRestoreOffsetsFromForeignCheckpoint(const TCheckpointMetadata& checkpoint);
+ void StartAllTasks();
void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message, const NYql::TIssues& subIssues);
void OnInternalError(const TString& message, const NYql::TIssues& subIssues = {});
diff --git a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
index 8872e7d2080..fd2df55e6de 100644
--- a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
+++ b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/testlib/basics/helpers.h>
#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/testing/unittest/gtest.h>
#include <ydb/library/actors/core/executor_pool_basic.h>
#include <ydb/library/actors/core/scheduler_basic.h>
@@ -23,7 +24,7 @@ enum ETestGraphFlags : ui64 {
SourceWithChannelInOneTask = 2,
};
-NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) {
+NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType) {
NYql::NDqProto::TReadyState result;
@@ -33,7 +34,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) {
ingressOutput->AddChannels();
if (flags & ETestGraphFlags::InputWithSource) {
auto* source = ingress->AddInputs()->MutableSource();
- source->SetType("PqSource");
+ source->SetType(sourceType);
}
auto* map = result.AddTask();
@@ -44,7 +45,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) {
mapOutput->AddChannels();
if (flags & ETestGraphFlags::SourceWithChannelInOneTask) {
auto* source = map->AddInputs()->MutableSource();
- source->SetType("PqSource");
+ source->SetType(sourceType);
}
auto* egress = result.AddTask();
@@ -78,9 +79,9 @@ struct TTestBootstrap : public TTestActorRuntime {
::NMonitoring::TDynamicCounterPtr Counters = new ::NMonitoring::TDynamicCounters();
- explicit TTestBootstrap(ui64 graphFlags = 0, ui64 snaphotRotationPeriod = 0)
+ explicit TTestBootstrap(ui64 graphFlags, ui64 snaphotRotationPeriod, const TString& sourceType)
: TTestActorRuntime(true)
- , GraphState(BuildTestGraph(graphFlags))
+ , GraphState(BuildTestGraph(graphFlags, sourceType))
, CoordinatorId("my-graph-id", 42)
, CheckpointId1(CoordinatorId.Generation, 1)
, CheckpointId2(CoordinatorId.Generation, 2)
@@ -219,6 +220,12 @@ struct TTestBootstrap : public TTestActorRuntime {
return google::protobuf::util::MessageDifferencer::Equals(lhs.Record, rhs.Record);
}
+ bool IsEqual(
+ const NYql::NDq::TEvDqCompute::TEvRun& lhs,
+ const NYql::NDq::TEvDqCompute::TEvRun& rhs) {
+ return google::protobuf::util::MessageDifferencer::Equals(lhs.Record, rhs.Record);
+ }
+
template <typename TEvent>
void ExpectEvent(NActors::TActorId actorId, const TEvent& expectedEventValue) {
auto eventHolder = GrabEdgeEvent<TEvent>(actorId, TDuration::Seconds(10));
@@ -330,8 +337,8 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
class CheckpointsTestHelper : public TTestBootstrap
{
public:
- CheckpointsTestHelper(ui64 graphFlags, ui64 snaphotRotationPeriod = 0)
- : TTestBootstrap(graphFlags, snaphotRotationPeriod) {
+ CheckpointsTestHelper(ui64 graphFlags, ui64 snaphotRotationPeriod = 0, const TString& sourceType = "PqSource")
+ : TTestBootstrap(graphFlags, snaphotRotationPeriod, sourceType) {
}
void RegisterCoordinator() {
@@ -445,12 +452,19 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
void ScheduleCheckpointing() {
MockScheduleCheckpointing();
}
+
+ void ExpectRun() {
+ ExpectEvent(IngressActor, NYql::NDq::TEvDqCompute::TEvRun());
+ ExpectEvent(MapActor, NYql::NDq::TEvDqCompute::TEvRun());
+ ExpectEvent(EgressActor, NYql::NDq::TEvDqCompute::TEvRun());
+ }
};
Y_UNIT_TEST(ShouldTriggerCheckpointWithSource) {
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0);
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
+ test.ExpectRun();
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();
}
@@ -459,6 +473,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask, 0);
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
+ test.ExpectRun();
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();
}
@@ -467,6 +482,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0);
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
+ test.ExpectRun();
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();
@@ -479,6 +495,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 2);
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
+ test.ExpectRun();
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();
@@ -499,11 +516,20 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0);
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
+ test.ExpectRun();
test.SaveFailed(test.CheckpointId1);
test.ScheduleCheckpointing();
test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_SNAPSHOT);
}
+
+ Y_UNIT_TEST(ShouldDoNothingIfNoIngressTasks) {
+ CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0, "S3Source");
+ test.ExpectRun();
+ ASSERT_THROW(
+ test.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>(test.StorageProxy, TDuration::Seconds(10)),
+ NActors::TEmptyEventQueueException);
+ }
}
} // namespace NFq