about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Dmitry Kardymon <kardymon-d@ydb.tech> 2025-04-23 15:15:52 +0300
committer: GitHub <noreply@github.com> 2025-04-23 15:15:52 +0300
commit: db3e06b00adc26b1b93cb15910e646585c301bf8 (patch)
tree: 7a66e70c54485adcc69852c5665b13cc841fbd7c
parent: d573ec3cac244a604046c82e7119278d32251b29 (diff)
download: ydb-db3e06b00adc26b1b93cb15910e646585c301bf8.tar.gz
YQ-4188 Fix StartingMessageTs for federated topic (#17600)
-rw-r--r-- ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp 1
-rw-r--r-- ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp 2
-rw-r--r-- ydb/tests/fq/yds/test_row_dispatcher.py 132
3 files changed, 134 insertions, 1 deletion
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
index ca03490ac75..42655a5ca8e 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
@@ -574,6 +574,7 @@ void TDqPqRdReadActor::InitChild() {
NextOffsetFromRD[partitionKey.PartitionId] = offset;
}
}
+ StartingMessageTimestamp = Parent->StartingMessageTimestamp;
SRC_LOG_I("Send TEvCoordinatorChangesSubscribe to local RD (" << LocalRowDispatcherActorId << ")");
Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState());
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp
index 9143be3c062..bc1c607ab77 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp
@@ -120,7 +120,7 @@ void TDqPqReadActorBase::LoadState(const TSourceState& state) {
ingressBytes += stateProto.GetIngressBytes();
}
TStringStream str;
- str << "SessionId: " << GetSessionId() << " Restoring offset: ";
+ str << "SessionId: " << GetSessionId() << " StartingMessageTs " << minStartingMessageTs << " Restoring offset: ";
for (const auto& [key, value] : PartitionToOffset) {
str << "{" << key << "," << value << "},";
}
diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py
index de23144adf2..775600c98da 100644
--- a/ydb/tests/fq/yds/test_row_dispatcher.py
+++ b/ydb/tests/fq/yds/test_row_dispatcher.py
@@ -1001,3 +1001,135 @@ class TestPqRowDispatcher(TestYdsBase):
filtered_bytes = stat['Graph=0']['IngressFilteredBytes']['sum']
filtered_rows = stat['Graph=0']['IngressFilteredRows']['sum']
assert filtered_bytes > 1 and filtered_rows > 0
+
+ @yq_v1
+ @pytest.mark.skip(reason="Is not implemented")
+ def test_group_by_hop_restart_query(self, kikimr, client):
+ client.create_yds_connection(
+ YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
+ )
+ self.init_topics("test_group_by_hop_restart")
+
+ sql1 = Rf'''
+ $data = SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}`
+ WITH (format=json_each_row, SCHEMA (time String NOT NULL, project String NOT NULL));
+ $hop_data = SELECT
+ CAST(HOP_END() as String) as time,
+ COUNT(*) as count
+ FROM $data
+ GROUP BY
+ HOP(CAST(time as TimeStamp), "PT10S", "PT10S", "PT0S");
+ INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
+ SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $hop_data;'''
+
+ query_id = start_yds_query(kikimr, client, sql1)
+ wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
+
+ data = [
+ '{"time": "2025-04-23T09:00:00.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:01.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:02.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:03.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:04.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:15.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:16.000000Z", "project": "project1"}',
+ ]
+ self.write_stream(data)
+ expected = ['{"count":5,"time":"2025-04-23T09:00:10Z"}']
+ assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+
+ kikimr.compute_plane.wait_completed_checkpoints(
+ query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
+ )
+ stop_yds_query(client, query_id)
+ wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
+
+ data = [
+ '{"time": "2025-04-23T09:00:17.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:18.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:21.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:25.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:31.000000Z", "project": "project1"}',
+ ]
+ self.write_stream(data)
+
+ client.modify_query(
+ query_id,
+ "continue",
+ sql1,
+ type=fq.QueryContent.QueryType.STREAMING,
+ state_load_mode=fq.StateLoadMode.FROM_LAST_CHECKPOINT,
+ streaming_disposition=StreamingDisposition.from_last_checkpoint(),
+ )
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+
+ expected = [
+ '{"count":4,"time":"2025-04-23T09:00:20Z"}',
+ '{"count":2,"time":"2025-04-23T09:00:30Z"}']
+ assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+
+ stop_yds_query(client, query_id)
+ wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
+
+ @yq_v1
+ def test_group_by_hop_restart_node(self, kikimr, client):
+ client.create_yds_connection(
+ YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
+ )
+ self.init_topics("test_group_by_hop_restart")
+
+ sql1 = Rf'''
+ $data = SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}`
+ WITH (format=json_each_row, SCHEMA (time String NOT NULL, project String NOT NULL));
+ $hop_data = SELECT
+ project,
+ CAST(HOP_END() as String) as time,
+ COUNT(*) as count
+ FROM $data
+ GROUP BY
+ HOP(CAST(time as TimeStamp), "PT10S", "PT10S", "PT0S"), project;
+ INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
+ SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $hop_data;'''
+
+ query_id = start_yds_query(kikimr, client, sql1)
+ wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
+
+ data = [
+ '{"time": "2025-04-23T09:00:00.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:04.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:05.000000Z", "project": "project2"}',
+ '{"time": "2025-04-23T09:00:15.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:16.000000Z", "project": "project2"}',
+ ]
+ self.write_stream(data)
+ expected = [
+ '{"count":2,"project":"project1","time":"2025-04-23T09:00:10Z"}',
+ '{"count":1,"project":"project2","time":"2025-04-23T09:00:10Z"}']
+ assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+
+ kikimr.compute_plane.wait_completed_checkpoints(
+ query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
+ )
+
+ node_index = 1
+ logging.debug("Restart compute node {}".format(node_index))
+ kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
+
+ data = [
+ '{"time": "2025-04-23T09:00:17.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:18.000000Z", "project": "project2"}',
+ '{"time": "2025-04-23T09:00:21.000000Z", "project": "project1"}',
+ '{"time": "2025-04-23T09:00:25.000000Z", "project": "project2"}'
+ ]
+ self.write_stream(data)
+
+ kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
+ kikimr.compute_plane.wait_bootstrap(node_index)
+
+ expected = [
+ '{"count":2,"project":"project1","time":"2025-04-23T09:00:20Z"}',
+ '{"count":2,"project":"project2","time":"2025-04-23T09:00:20Z"}']
+ assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+
+ stop_yds_query(client, query_id)
+ wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)