diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2025-04-23 15:15:52 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-23 15:15:52 +0300 |
commit | db3e06b00adc26b1b93cb15910e646585c301bf8 (patch) | |
tree | 7a66e70c54485adcc69852c5665b13cc841fbd7c | |
parent | d573ec3cac244a604046c82e7119278d32251b29 (diff) | |
download | ydb-db3e06b00adc26b1b93cb15910e646585c301bf8.tar.gz |
YQ-4188 Fix StartingMessageTs for federated topic (#17600)
3 files changed, 134 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index ca03490ac75..42655a5ca8e 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -574,6 +574,7 @@ void TDqPqRdReadActor::InitChild() { NextOffsetFromRD[partitionKey.PartitionId] = offset; } } + StartingMessageTimestamp = Parent->StartingMessageTimestamp; SRC_LOG_I("Send TEvCoordinatorChangesSubscribe to local RD (" << LocalRowDispatcherActorId << ")"); Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp index 9143be3c062..bc1c607ab77 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp @@ -120,7 +120,7 @@ void TDqPqReadActorBase::LoadState(const TSourceState& state) { ingressBytes += stateProto.GetIngressBytes(); } TStringStream str; - str << "SessionId: " << GetSessionId() << " Restoring offset: "; + str << "SessionId: " << GetSessionId() << " StartingMessageTs " << minStartingMessageTs << " Restoring offset: "; for (const auto& [key, value] : PartitionToOffset) { str << "{" << key << "," << value << "},"; } diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index de23144adf2..775600c98da 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -1001,3 +1001,135 @@ class TestPqRowDispatcher(TestYdsBase): filtered_bytes = stat['Graph=0']['IngressFilteredBytes']['sum'] filtered_rows = stat['Graph=0']['IngressFilteredRows']['sum'] assert filtered_bytes > 1 and filtered_rows > 0 + + @yq_v1 + @pytest.mark.skip(reason="Is not implemented") + def test_group_by_hop_restart_query(self, kikimr, client): + client.create_yds_connection( + YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True + ) + self.init_topics("test_group_by_hop_restart") + + sql1 = Rf''' + $data = SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH (format=json_each_row, SCHEMA (time String NOT NULL, project String NOT NULL)); + $hop_data = SELECT + CAST(HOP_END() as String) as time, + COUNT(*) as count + FROM $data + GROUP BY + HOP(CAST(time as TimeStamp), "PT10S", "PT10S", "PT0S"); + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $hop_data;''' + + query_id = start_yds_query(kikimr, client, sql1) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + + data = [ + '{"time": "2025-04-23T09:00:00.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:01.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:02.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:03.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:04.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:15.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:16.000000Z", "project": "project1"}', + ] + self.write_stream(data) + expected = ['{"count":5,"time":"2025-04-23T09:00:10Z"}'] + assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + + kikimr.compute_plane.wait_completed_checkpoints( + query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 + ) + stop_yds_query(client, query_id) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) + + data = [ + '{"time": "2025-04-23T09:00:17.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:18.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:21.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:25.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:31.000000Z", "project": "project1"}', + ] + self.write_stream(data) + + client.modify_query( + query_id, + "continue", + sql1, + type=fq.QueryContent.QueryType.STREAMING, + state_load_mode=fq.StateLoadMode.FROM_LAST_CHECKPOINT, + streaming_disposition=StreamingDisposition.from_last_checkpoint(), + ) + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + + expected = [ + '{"count":4,"time":"2025-04-23T09:00:20Z"}', + '{"count":2,"time":"2025-04-23T09:00:30Z"}'] + assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + + stop_yds_query(client, query_id) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) + + @yq_v1 + def test_group_by_hop_restart_node(self, kikimr, client): + client.create_yds_connection( + YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True + ) + self.init_topics("test_group_by_hop_restart") + + sql1 = Rf''' + $data = SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH (format=json_each_row, SCHEMA (time String NOT NULL, project String NOT NULL)); + $hop_data = SELECT + project, + CAST(HOP_END() as String) as time, + COUNT(*) as count + FROM $data + GROUP BY + HOP(CAST(time as TimeStamp), "PT10S", "PT10S", "PT0S"), project; + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $hop_data;''' + + query_id = start_yds_query(kikimr, client, sql1) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + + data = [ + '{"time": "2025-04-23T09:00:00.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:04.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:05.000000Z", "project": "project2"}', + '{"time": "2025-04-23T09:00:15.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:16.000000Z", "project": "project2"}', + ] + self.write_stream(data) + expected = [ + '{"count":2,"project":"project1","time":"2025-04-23T09:00:10Z"}', + '{"count":1,"project":"project2","time":"2025-04-23T09:00:10Z"}'] + assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + + kikimr.compute_plane.wait_completed_checkpoints( + query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 + ) + + node_index = 1 + logging.debug("Restart compute node {}".format(node_index)) + kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop() + + data = [ + '{"time": "2025-04-23T09:00:17.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:18.000000Z", "project": "project2"}', + '{"time": "2025-04-23T09:00:21.000000Z", "project": "project1"}', + '{"time": "2025-04-23T09:00:25.000000Z", "project": "project2"}' + ] + self.write_stream(data) + + kikimr.compute_plane.kikimr_cluster.nodes[node_index].start() + kikimr.compute_plane.wait_bootstrap(node_index) + + expected = [ + '{"count":2,"project":"project1","time":"2025-04-23T09:00:20Z"}', + '{"count":2,"project":"project2","time":"2025-04-23T09:00:20Z"}'] + assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + + stop_yds_query(client, query_id) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) |