diff options
author | hor911 <hor911@ydb.tech> | 2023-11-13 10:42:42 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-11-13 11:29:33 +0300 |
commit | 1e8ae22002dcb5248f29a4dca925e9708af9bc31 (patch) | |
tree | b0dde5b6701be458c198d5a16d3fea6b837fdce6 | |
parent | 005735009aadf431092af0ecbb430dafee3f58ea (diff) | |
download | ydb-1e8ae22002dcb5248f29a4dca925e9708af9bc31.tar.gz |
Move more tests to OSS
42 files changed, 3244 insertions, 0 deletions
diff --git a/ydb/tests/fq/mem_alloc/test_alloc_default.py b/ydb/tests/fq/mem_alloc/test_alloc_default.py new file mode 100644 index 0000000000..4580110a42 --- /dev/null +++ b/ydb/tests/fq/mem_alloc/test_alloc_default.py @@ -0,0 +1,373 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os + +import pytest +import six +import time + +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +import ydb.tests.library.common.yatest_common as yatest_common +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig + +import ydb.public.api.protos.draft.fq_pb2 as fq + +K = 1024 +M = 1024*1024 +G = 1024*1024*1024 +DEFAULT_LIMIT = 8*G +DEFAULT_DELTA = 30*M + + +@pytest.fixture +def kikimr(request): + (initial, total, step, hard_limit) = request.param + kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True) + kikimr = StreamingOverKikimr(kikimr_conf) + kikimr.mkql_initial_memory_limit = initial + kikimr.mkql_total_memory_limit = total + kikimr.mkql_alloc_size = step + kikimr.mkql_task_hard_memory_limit = hard_limit + kikimr.compute_plane.fq_config['resource_manager']['mkql_initial_memory_limit'] = kikimr.mkql_initial_memory_limit + kikimr.compute_plane.fq_config['resource_manager']['mkql_total_memory_limit'] = kikimr.mkql_total_memory_limit + kikimr.compute_plane.fq_config['resource_manager']['mkql_alloc_size'] = kikimr.mkql_alloc_size + kikimr.compute_plane.fq_config['resource_manager']['mkql_task_hard_memory_limit'] = kikimr.mkql_task_hard_memory_limit + kikimr.control_plane.fq_config['quotas_manager']['quotas'] = [] + kikimr.control_plane.fq_config['quotas_manager']['quotas'].append({"subject_type": "cloud", + "subject_id": "my_cloud", + "limit": [{"name": "yq.streamingQuery.count", "limit": 100}]}) + kikimr.start_mvp_mock_server() + kikimr.start() + yield kikimr + 
kikimr.stop_mvp_mock_server() + kikimr.stop() + + +def wait_until(predicate, wait_time=yatest_common.plain_or_under_sanitizer(10, 50), wait_step=yatest_common.plain_or_under_sanitizer(0.5, 2)): + deadline = time.time() + wait_time + while time.time() < deadline: + if predicate(): + return True + time.sleep(wait_step) + else: + return False + + +def feq(a, b): + if abs(a) <= 1*G: + return a == b + else: + return abs((a - b) / a) < 0.0000001 + + +class TestAlloc(TestYdsBase): + @pytest.mark.parametrize("kikimr", [(None, None, None, None)], indirect=["kikimr"]) + def test_default_limits(self, kikimr): + + kikimr.control_plane.wait_bootstrap(1) + assert kikimr.control_plane.get_mkql_limit(1) == 0, "Incorrect Limit" + assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + self.init_topics("select_default_limits", create_output=False) + + sql = R''' + SELECT * FROM myyds.`{input_topic}` + '''\ + .format( + input_topic=self.input_topic, + ) + + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.control_plane.wait_zero_checkpoint(query_id) + + task_count = kikimr.control_plane.get_task_count(1, query_id) + assert feq(kikimr.control_plane.get_mkql_allocated(1), task_count * DEFAULT_LIMIT), "Incorrect Alloc" + + limit = sum(v for k, v in six.iteritems(kikimr.control_plane.get_sensors(1, "dq_tasks").find_sensors({"operation": query_id, "subsystem": "mkql", "sensor": "MemoryLimit"}, "id"))) + usage = sum(v for k, v in six.iteritems(kikimr.control_plane.get_sensors(1, "dq_tasks").find_sensors({"operation": query_id, "subsystem": "mkql", "sensor": "MemoryUsage"}, "id"))) + assert limit is not None + assert usage is not None + # assert limit == task_count * 
DEFAULT_LIMIT, "Incorrect Alloc" + + client.abort_query(query_id) + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) + assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + @pytest.mark.parametrize("kikimr", [(1 * M, 8 * G, None, None)], indirect=["kikimr"]) + def test_default_delta(self, kikimr): + + kikimr.control_plane.wait_bootstrap(1) + assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit" + assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + self.init_topics("select_default_delta", create_output=False) + + sql = R''' + SELECT COUNT(*) + FROM myyds.`{input_topic}` + GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data + LIMIT 1 + '''\ + .format( + input_topic=self.input_topic, + ) + + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.control_plane.wait_zero_checkpoint(query_id) + + task_count = kikimr.control_plane.get_task_count(1, query_id) + initially_allocated = kikimr.control_plane.get_mkql_allocated(1) + assert initially_allocated == task_count * kikimr.mkql_initial_memory_limit, "Incorrect Alloc" + + for i in range(10000): + self.write_stream([format(i, "0x")]) + last_allocated = kikimr.control_plane.get_mkql_allocated(1) + if last_allocated != initially_allocated: + assert last_allocated == initially_allocated + DEFAULT_DELTA + break + else: + assert False, "Limit was not increased" + + client.abort_query(query_id) + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) + assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + @pytest.mark.parametrize("kikimr", [(1 * M, 24 * M, 1 * M, None)], 
indirect=["kikimr"]) + def test_node_limit(self, kikimr): + + kikimr.control_plane.wait_bootstrap(1) + assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit" + assert kikimr.control_plane.get_mkql_allocated() == 0, "Incorrect Alloc" + + self.init_topics("select_node_limit", create_output=False) + + sql = R''' + SELECT COUNT(*) + FROM myyds.`{input_topic}` + GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data + LIMIT 1 + '''\ + .format( + input_topic=self.input_topic, + ) + + client = FederatedQueryClient("my_folder@my_cloud", streaming_over_kikimr=kikimr) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + + queries = [] + task_count = 0 + memory_per_graph = None + + while True: + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + assert wait_until((lambda : kikimr.control_plane.get_task_count(1, query_id) > 0)), "TaskController not started" + task_count += kikimr.control_plane.get_task_count(1, query_id) + allocated = task_count * kikimr.mkql_initial_memory_limit + assert wait_until((lambda : kikimr.control_plane.get_mkql_allocated(1) == allocated)), "Task memory was not allocated" + queries.append(query_id) + if memory_per_graph is None: + memory_per_graph = allocated + if kikimr.mkql_total_memory_limit < allocated + memory_per_graph: + break + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + + def not_enough_memory(): + issues = client.describe_query(query_id).result.query.transient_issue + if len(issues) == 0: + return False + # it is possible to get several similar issues + for issue in issues: + if issue.message == "Not enough free memory in the cluster" and issue.issue_code == 6001: + return True + assert False, 
"Incorrect issues " + str(issues) + + assert wait_until(not_enough_memory), "Allocation was not failed" + assert kikimr.control_plane.get_mkql_allocated(1) == task_count * kikimr.mkql_initial_memory_limit, "Incorrect allocation size" + client.abort_query(query_id) + # query is respawned every 30s, so wait with increased timeout + # we may be lucky to stop the query, or it is stopped automatically due to high failure rate + client.wait_query(query_id, 60, [fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ABORTED_BY_SYSTEM]) + + task_count_0 = kikimr.control_plane.get_task_count(1, queries[0]) + assert task_count_0 > 0, "Strange query stat " + queries[0] + client.abort_query(queries[0]) + client.wait_query_status(queries[0], fq.QueryMeta.ABORTED_BY_USER) + + assert wait_until((lambda : kikimr.control_plane.get_mkql_allocated(1) == (task_count - task_count_0) * kikimr.mkql_initial_memory_limit)), "Task memory was not freed" + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + + assert wait_until((lambda : kikimr.control_plane.get_task_count(1, query_id) > 0)), "TaskController not started " + query_id + task_count += kikimr.control_plane.get_task_count(1, query_id) - task_count_0 + assert wait_until((lambda : kikimr.control_plane.get_mkql_allocated(1) == task_count * kikimr.mkql_initial_memory_limit)), "Task memory was not allocated" + queries[0] = query_id + + for q in queries: + client.abort_query(q) + client.wait_query_status(q, fq.QueryMeta.ABORTED_BY_USER) + + assert wait_until((lambda : kikimr.control_plane.get_mkql_allocated() == 0)), "Incorrect final free" + + @pytest.mark.parametrize("kikimr", [(1 * G, 8 * G, None, None)], indirect=["kikimr"]) + def test_alloc_and_free(self, kikimr): + + kikimr.control_plane.wait_bootstrap(1) + assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit" + assert 
kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + self.init_topics("select_alloc_and_free", create_output=False) + + sql = R''' + SELECT COUNT(*) + FROM myyds.`{input_topic}` + GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data + LIMIT 1 + '''\ + .format( + input_topic=self.input_topic, + ) + + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + + task_count = kikimr.control_plane.get_task_count(1, query_id) + assert kikimr.control_plane.get_mkql_allocated(1) == task_count * kikimr.mkql_initial_memory_limit, "Incorrect Alloc" + + client.abort_query(query_id) + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) + assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + @pytest.mark.parametrize("kikimr", [(1 * M, 1 * G, 16 * K, None)], indirect=["kikimr"]) + def test_up_down(self, kikimr): + + kikimr.control_plane.wait_bootstrap(1) + assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit" + assert kikimr.control_plane.get_mkql_allocated() == 0, "Incorrect Alloc" + + self.init_topics("select_up_down") + + sql = R''' + INSERT INTO myyds.`{output_topic}` + SELECT Data + FROM myyds.`{input_topic}` + GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data + '''\ + .format( + input_topic=self.input_topic, + output_topic=self.output_topic, + ) + + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + 
client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.control_plane.wait_zero_checkpoint(query_id) + assert wait_until((lambda : kikimr.control_plane.get_task_count(1, query_id) > 0)), "TaskController not started" + allocated_at_start = kikimr.control_plane.get_mkql_allocated(1) + + for i in range(10000): + self.write_stream([format(i, "09x")]) + allocated = kikimr.control_plane.get_mkql_allocated(1) + if allocated > allocated_at_start + 1 * M: + break + else: + assert False, "Memory limit was not increased" + + # de-allocation doesn't work as expected + # assert wait_until((lambda : kikimr.get_mkql_allocated(1) < allocated), timeout=60), "Memory limit was not decreased" + + client.abort_query(query_id) + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) + assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + @pytest.mark.parametrize("kikimr", [(1 * M, 10 * M, 1 * G, None)], indirect=["kikimr"]) + def test_mkql_not_increased(self, kikimr): + + kikimr.control_plane.wait_bootstrap(1) + assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit" + assert kikimr.control_plane.get_mkql_allocated() == 0, "Incorrect Alloc" + + self.init_topics("test_mkql_not_increased") + + sql = R''' + INSERT INTO myyds.`{output_topic}` + SELECT Data + FROM myyds.`{input_topic}` + GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data + '''\ + .format( + input_topic=self.input_topic, + output_topic=self.output_topic, + ) + + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.control_plane.wait_zero_checkpoint(query_id) + assert wait_until((lambda : kikimr.control_plane.get_task_count(1, 
query_id) > 0)), "TaskController not started" + + for i in range(10000): + self.write_stream([format(i, "09x")]) + query = client.describe_query(query_id).result.query + issues = query.transient_issue + if len(issues) >= 1: + assert issues[0].message.startswith("Mkql memory limit exceeded, limit: 1048576"), "Incorrect message text" + assert issues[0].message.endswith("canAllocateExtraMemory: 1"), "Incorrect settings" + assert issues[0].issue_code == 2029, "Incorrect issue code" + issues[0].message + break + else: + assert False, "Memory limit was not reached" + + assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + @pytest.mark.parametrize("kikimr", [(350 * K, 100 * M, 1 * K, 400 * K)], indirect=["kikimr"]) + def test_hard_limit(self, kikimr): + + kikimr.control_plane.wait_bootstrap(1) + assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit" + assert kikimr.control_plane.get_mkql_allocated() == 0, "Incorrect Alloc" + + client = FederatedQueryClient("my_folder@my_cloud", streaming_over_kikimr=kikimr) + sql = R''' + SELECT ListLast(ListCollect(ListFromRange(0, {n} + 1))) + ''' + n = 1 + for i in range(0, 10): + query_id = client.create_query("simple", sql.format(n=n), type=fq.QueryContent.QueryType.STREAMING).result.query_id + status = client.wait_query_status(query_id, [fq.QueryMeta.COMPLETED, fq.QueryMeta.FAILED]) + if status == fq.QueryMeta.FAILED: + assert i > 1, "First queries must be successfull" + query = client.describe_query(query_id).result.query + describe_str = str(query) + assert "LIMIT_EXCEEDED" in describe_str + break + n = n * 10 + else: + assert False, "Limit was NOT exceeded" diff --git a/ydb/tests/fq/mem_alloc/test_dc_local.py b/ydb/tests/fq/mem_alloc/test_dc_local.py new file mode 100644 index 0000000000..2d5092604a --- /dev/null +++ b/ydb/tests/fq/mem_alloc/test_dc_local.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os + +import pytest + +from 
ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig + +import ydb.public.api.protos.draft.fq_pb2 as fq + +K = 1024 +M = 1024*1024 +G = 1024*1024*1024 +DEFAULT_LIMIT = 8*G +DEFAULT_DELTA = 30*M + + +@pytest.fixture +def kikimr(request): + kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True, node_count=4, dc_mapping={1: "DC1", 2: "DC2", 3: "DC1", 4: "DC2"}) + kikimr = StreamingOverKikimr(kikimr_conf) + kikimr.start_mvp_mock_server() + kikimr.start() + yield kikimr + kikimr.stop_mvp_mock_server() + kikimr.stop() + + +class TestAlloc(TestYdsBase): + @pytest.mark.parametrize("kikimr", [(None, None, None)], indirect=["kikimr"]) + def test_dc_locality(self, kikimr): + + self.init_topics("select_dc_locality", create_output=False) + + sql = R''' + SELECT * FROM myyds.`{input_topic}` + '''\ + .format( + input_topic=self.input_topic, + ) + + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + + kikimr.control_plane.wait_discovery() + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.control_plane.wait_zero_checkpoint(query_id) + + w1 = kikimr.control_plane.get_worker_count(1) + w2 = kikimr.control_plane.get_worker_count(2) + w3 = kikimr.control_plane.get_worker_count(3) + w4 = kikimr.control_plane.get_worker_count(4) + + assert ((w1 * w3) != 0) ^ ((w2 * w4) != 0), "Incorrect placement " + str([w1, w2, w3, w4]) + + client.abort_query(query_id) + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) diff --git a/ydb/tests/fq/mem_alloc/test_result_limits.py 
b/ydb/tests/fq/mem_alloc/test_result_limits.py new file mode 100755 index 0000000000..a2c9879e59 --- /dev/null +++ b/ydb/tests/fq/mem_alloc/test_result_limits.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import pytest +import time + +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +# Quota per cloud +QUOTA_ANALYTICS_COUNT_LIMIT = "yq.analyticsQuery.count" +QUOTA_STREAMING_COUNT_LIMIT = "yq.streamingQuery.count" +QUOTA_CPU_PERCENT_LIMIT = "yq.cpuPercent.count" +QUOTA_MEMORY_LIMIT = "yq.memory.size" +QUOTA_RESULT_LIMIT = "yq.result.size" + +# Quota per query +QUOTA_ANALYTICS_DURATION_LIMIT = "yq.analyticsQueryDurationMinutes.count" +QUOTA_STREAMING_DURATION_LIMIT = "yq.streamingQueryDurationMinutes.count" # internal, for preview purposes +QUOTA_QUERY_RESULT_LIMIT = "yq.queryResult.size" + + +@pytest.fixture +def kikimr(request): + kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True) + kikimr = StreamingOverKikimr(kikimr_conf) + if hasattr(request, "param"): + kikimr.control_plane.fq_config['quotas_manager'] = {} + kikimr.control_plane.fq_config['quotas_manager']['enabled'] = True + kikimr.control_plane.fq_config['quotas_manager']['quotas'] = [] + for cloud, limit in request.param.items(): + kikimr.control_plane.fq_config['quotas_manager']['quotas'].append({"subject_type": "cloud", "subject_id": cloud, "limit": [{"name": QUOTA_QUERY_RESULT_LIMIT, "limit": limit}]}) + kikimr.start_mvp_mock_server() + kikimr.start() + yield kikimr + kikimr.stop_mvp_mock_server() + kikimr.stop() + + +def wait_until(predicate, wait_time=10, wait_step=0.5): + deadline = time.time() + wait_time + while time.time() < deadline: + if predicate(): + return True + time.sleep(wait_step) + else: + return False + + +class 
TestResultLimits(object): + def test_many_rows(self, kikimr): + + kikimr.control_plane.wait_bootstrap(1) + assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + sql = R''' +SELECT * FROM AS_TABLE(()->(Yql::ToStream(ListReplicate(<|x: +"0123456789ABCDEF" +|>, 4000000000)))); +''' + client = FederatedQueryClient("my_folder@cloud", streaming_over_kikimr=kikimr) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + + client.wait_query_status(query_id, fq.QueryMeta.FAILED, timeout=600) + issue = client.describe_query(query_id).result.query.issue[0] + assert "LIMIT_EXCEEDED" in issue.message, "Incorrect issue " + issue.message + assert "Can not write results with size > 20971520 byte(s)" in issue.issues[0].message, "Incorrect issue " + issue.issues[0].message + + def test_large_row(self, kikimr): + + kikimr.control_plane.wait_bootstrap(1) + assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc" + + sql = R''' +SELECT ListReplicate("A", 10000000); +''' + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + + client.wait_query_status(query_id, fq.QueryMeta.FAILED, timeout=600) + issue = client.describe_query(query_id).result.query.issue[0] + assert "LIMIT_EXCEEDED" in issue.message, "Incorrect issue " + issue.message + assert "Can not write Row[0] with size" in issue.issues[0].message and "(> 10_MB)" in issue.issues[0].message, "Incorrect issue " + issue.issues[0].message + + @pytest.mark.parametrize("kikimr", [{"tiny": 2, "huge": 1000000}], indirect=["kikimr"]) + def test_quotas(self, kikimr): + huge_client = FederatedQueryClient("my_folder@huge", streaming_over_kikimr=kikimr) + huge_query_id = huge_client.create_query("simple", "select 1000", type=fq.QueryContent.QueryType.STREAMING).result.query_id + huge_client.wait_query_status(huge_query_id, 
fq.QueryMeta.COMPLETED) + + tiny_client = FederatedQueryClient("my_folder@tiny", streaming_over_kikimr=kikimr) + tiny_query_id = tiny_client.create_query("simple", "select 1000", type=fq.QueryContent.QueryType.STREAMING).result.query_id + tiny_client.wait_query_status(tiny_query_id, fq.QueryMeta.FAILED) + issue = tiny_client.describe_query(tiny_query_id).result.query.issue[0] + assert "LIMIT_EXCEEDED" in issue.message, "Incorrect issue " + issue.message + assert "Can not write results with size > 2 byte(s)" in issue.issues[0].message, "Incorrect issue " + issue.issues[0].message diff --git a/ydb/tests/fq/mem_alloc/test_scheduling.py b/ydb/tests/fq/mem_alloc/test_scheduling.py new file mode 100644 index 0000000000..b2a6e53c2a --- /dev/null +++ b/ydb/tests/fq/mem_alloc/test_scheduling.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os + +import pytest +import time + +import ydb.tests.library.common.yatest_common as yatest_common +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig + +import ydb.public.api.protos.draft.fq_pb2 as fq + +K = 1024 +M = 1024*1024 +G = 1024*1024*1024 +DEFAULT_LIMIT = 8*G +DEFAULT_DELTA = 30*M +DEFAULT_WAIT_TIME = yatest_common.plain_or_under_sanitizer(10, 50) +LONG_WAIT_TIME = yatest_common.plain_or_under_sanitizer(60, 300) + + +@pytest.fixture +def kikimr(request): + (initial, total, step) = request.param + kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True) + kikimr = StreamingOverKikimr(kikimr_conf) + kikimr.compute_plane.fq_config['resource_manager']['mkql_initial_memory_limit'] = initial + kikimr.compute_plane.fq_config['resource_manager']['mkql_total_memory_limit'] = total + kikimr.compute_plane.fq_config['resource_manager']['mkql_alloc_size'] = step + kikimr.start_mvp_mock_server() + kikimr.start() + yield kikimr + 
kikimr.stop_mvp_mock_server() + kikimr.stop() + + +def wait_until(predicate, wait_time=DEFAULT_WAIT_TIME, wait_step=yatest_common.plain_or_under_sanitizer(0.5, 2)): + deadline = time.time() + wait_time + while time.time() < deadline: + if predicate(): + return True + time.sleep(wait_step) + else: + return False + + +def feq(a, b): + if abs(a) <= 1*G: + return a == b + else: + return abs((a - b) / a) < 0.0000001 + + +class TestSchedule(object): + @pytest.mark.parametrize("kikimr", [(1 * M, 6 * M, 1 * M)], indirect=["kikimr"]) + @pytest.mark.skip(reason="Should be refactored") + def test_skip_busy(self, kikimr): + + kikimr.wait_bootstrap() + kikimr.kikimr_cluster.nodes[2].stop() + + self.init_topics("select_skip_busy", create_output=False) + + sql = R''' + SELECT COUNT(*) + FROM myyds.`{input_topic}` + GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data + LIMIT 1 + '''\ + .format( + input_topic=self.input_topic, + ) + + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + + nodes = [1, 3, 4, 5, 6, 7, 8] + + for node_index in nodes: + n = kikimr.get_sensors(node_index, "yq").find_sensor({"subsystem": "node_manager", "sensor": "NodesHealthCheckOk"}) + wait_until(lambda : kikimr.get_sensors(node_index, "yq").find_sensor({"subsystem": "node_manager", "sensor": "NodesHealthCheckOk"}) > n, wait_time=LONG_WAIT_TIME) + wait_until(lambda : kikimr.get_peer_count(node_index) == len(nodes) - 1, wait_time=LONG_WAIT_TIME) + + assert kikimr.get_mkql_limit(node_index) == kikimr.mkql_total_memory_limit, "Incorrect Limit" + assert kikimr.get_mkql_allocated(node_index) == 0, "Incorrect Alloc" + + queries = [] + task_count = 0 + memory_per_graph = None + tasks_per_graph = 0 + + while True: + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, 
fq.QueryMeta.RUNNING) + assert wait_until((lambda : sum(kikimr.get_task_count(n, query_id) for n in nodes) > 0)), "TaskController not started" + task_count += sum(kikimr.get_task_count(n, query_id) for n in nodes) + allocated = task_count * kikimr.mkql_initial_memory_limit + assert wait_until((lambda : sum(kikimr.get_mkql_allocated(n) for n in nodes) == allocated)), "Task memory was not allocated" + queries.append(query_id) + if memory_per_graph is None: + memory_per_graph = allocated + tasks_per_graph = task_count + if kikimr.mkql_total_memory_limit < allocated + memory_per_graph: + break + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + + def not_enough_memory(): + issues = client.describe_query(query_id).result.query.transient_issue + if len(issues) == 0: + return False + assert len(issues) == 1, "Too many issues " + str(issues) + assert issues[0].message == "Not enough memory to allocate tasks" and issues[0].issue_code == 6001, "Incorrect issue " + issues[0].message + return True + + assert wait_until(not_enough_memory), "Allocation was not failed" + assert sum(kikimr.get_mkql_allocated(n) for n in nodes) == task_count * kikimr.mkql_initial_memory_limit, "Incorrect allocation size" + client.abort_query(query_id) + # query is respawned every 30s, so wait with increased timeout + # we may be lucky to stop the query, or it is stopped automatically due to high failure rate + client.wait_query(query_id, 60, [fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ABORTED_BY_SYSTEM]) + + kikimr.kikimr_cluster.nodes[2].start() + for node_index in kikimr.kikimr_cluster.nodes: + wait_until(lambda : kikimr.get_peer_count(1) == 8 - 1, wait_time=LONG_WAIT_TIME) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + + wait_until(lambda : 
kikimr.get_task_count(None, query_id) == tasks_per_graph) + assert kikimr.get_mkql_allocated() == (task_count + tasks_per_graph) * kikimr.mkql_initial_memory_limit, "Incorrect allocation size" + + for q in queries: + client.abort_query(q) + client.abort_query(query_id) diff --git a/ydb/tests/fq/mem_alloc/ya.make b/ydb/tests/fq/mem_alloc/ya.make new file mode 100644 index 0000000000..e8c55b968e --- /dev/null +++ b/ydb/tests/fq/mem_alloc/ya.make @@ -0,0 +1,30 @@ +PY3TEST() + +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc) + +PEERDIR( + ydb/tests/tools/datastreams_helpers + ydb/tests/tools/fq_runner +) + +DEPENDS(ydb/tests/tools/pq_read) + +TEST_SRCS( + test_alloc_default.py + test_dc_local.py + test_result_limits.py + test_scheduling.py +) + +IF (SANITIZER_TYPE == "thread") + TIMEOUT(2400) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +REQUIREMENTS(ram:9) + +END() diff --git a/ydb/tests/fq/multi_plane/test_cp_ic.py b/ydb/tests/fq/multi_plane/test_cp_ic.py new file mode 100644 index 0000000000..dde77bd74d --- /dev/null +++ b/ydb/tests/fq/multi_plane/test_cp_ic.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import pytest + +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig + + +@pytest.fixture +def kikimr(): + kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True) + kikimr = StreamingOverKikimr(kikimr_conf) + kikimr.control_plane.fq_config['control_plane_storage']['mapping'] = { + "common_tenant_name": ["/alpha"] + } + kikimr.control_plane.fq_config['private_api']['loopback'] = True + kikimr.control_plane.fq_config['nodes_manager']['enabled'] = True + kikimr.start_mvp_mock_server() + kikimr.start() + yield kikimr + kikimr.stop() + kikimr.stop_mvp_mock_server() + + +class TestCpIc(object): + def test_discovery(self, kikimr): + kikimr.control_plane.wait_discovery() diff --git 
a/ydb/tests/fq/multi_plane/test_dispatch.py b/ydb/tests/fq/multi_plane/test_dispatch.py new file mode 100644 index 0000000000..1f6fdc9a9a --- /dev/null +++ b/ydb/tests/fq/multi_plane/test_dispatch.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import pytest + +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig +from ydb.tests.tools.fq_runner.kikimr_runner import TenantConfig +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +@pytest.fixture +def kikimr(): + kikimr_conf = StreamingOverKikimrConfig( + cloud_mode=True, + node_count={"/cp": TenantConfig(1), + "/alpha": TenantConfig(1), + "/beta": TenantConfig(1)}, + tenant_mapping={"alpha": "/alpha", "beta": "/beta"}, + cloud_mapping={"a_cloud": "alpha", "b_cloud": "beta"}) + kikimr = StreamingOverKikimr(kikimr_conf) + kikimr.start_mvp_mock_server() + kikimr.start() + yield kikimr + kikimr.stop() + kikimr.stop_mvp_mock_server() + + +class TestMapping(object): + def test_mapping(self, kikimr): + sql = "select 101" + a_client = FederatedQueryClient("a_folder@a_cloud", streaming_over_kikimr=kikimr) + b_client = FederatedQueryClient("b_folder@b_cloud", streaming_over_kikimr=kikimr) + a_query_id = a_client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + a_client.wait_query_status(a_query_id, fq.QueryMeta.COMPLETED) + kikimr.tenants["/alpha"].stop() + a_query_id = a_client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + for _ in range(10): + b_query_id = b_client.create_query("b", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + b_client.wait_query_status(b_query_id, fq.QueryMeta.COMPLETED) + assert a_client.describe_query(a_query_id).result.query.meta.status == fq.QueryMeta.STARTING + kikimr.tenants["/alpha"].start() + 
a_client.wait_query_status(a_query_id, fq.QueryMeta.COMPLETED) + + def test_idle(self, kikimr): + sql = "select 107" + a_client = FederatedQueryClient("a_folder@a_cloud", streaming_over_kikimr=kikimr) + b_client = FederatedQueryClient("b_folder@b_cloud", streaming_over_kikimr=kikimr) + a_query_id = a_client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + a_client.wait_query_status(a_query_id, fq.QueryMeta.COMPLETED) + kikimr.tenants["/alpha"].stop() + a_query_id = a_client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + for _ in range(10): + b_query_id = b_client.create_query("b", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + b_client.wait_query_status(b_query_id, fq.QueryMeta.COMPLETED) + assert a_client.describe_query(a_query_id).result.query.meta.status == fq.QueryMeta.STARTING + kikimr.exec_db_statement( + """--!syntax_v1 + PRAGMA TablePathPrefix("{}"); + UPDATE mappings SET vtenant = "beta" WHERE subject_id = "a_cloud"; + UPDATE tenants SET state = 2, state_time = CurrentUtcTimestamp() WHERE tenant = "/alpha"; + """ + ) + a_client.wait_query_status(a_query_id, fq.QueryMeta.COMPLETED) diff --git a/ydb/tests/fq/multi_plane/test_retry.py b/ydb/tests/fq/multi_plane/test_retry.py new file mode 100644 index 0000000000..789dd703a0 --- /dev/null +++ b/ydb/tests/fq/multi_plane/test_retry.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import pytest +import time + +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class Param(object): + def __init__( + self, + retry_limit=2, + retry_period=20, + task_lease_ttl=4, + ping_period=2, + ): + 
self.retry_limit = retry_limit + self.retry_period = retry_period + self.task_lease_ttl = task_lease_ttl + self.ping_period = ping_period + + +@pytest.fixture +def kikimr(request): + kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True) + kikimr = StreamingOverKikimr(kikimr_conf) + # control + kikimr.control_plane.fq_config['control_plane_storage']['mapping'] = {"common_tenant_name": ["/compute"]} + kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy'] = {} + kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_count'] = request.param.retry_limit + kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_period'] = "{}s".format(request.param.retry_period) + kikimr.control_plane.fq_config['control_plane_storage']['task_lease_ttl'] = "{}s".format(request.param.task_lease_ttl) + # compute + kikimr.compute_plane.fq_config['pinger']['ping_period'] = "{}s".format(request.param.ping_period) + kikimr.start_mvp_mock_server() + kikimr.start() + yield kikimr + kikimr.stop() + kikimr.stop_mvp_mock_server() + + +class TestRetry(TestYdsBase): + @pytest.mark.parametrize("kikimr", [Param(retry_limit=0, retry_period=1000)], indirect=["kikimr"]) + def test_fail_first(self, kikimr): + topic_name = "fail_first" + connection = "fail_first" + self.init_topics(topic_name) + sql = R'''SELECT * FROM {connection}.`{input_topic}`;'''\ + .format( + input_topic=self.input_topic, + connection=connection + ) + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.stop() + kikimr.compute_plane.start() + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_SYSTEM) + retry_count = 
kikimr.compute_plane.get_sensors(1, "yq").find_sensor({"query_id": query_id, "sensor": "RetryCount"}) + assert retry_count == 0, "Incorrect RetryCount" + + @pytest.mark.parametrize("kikimr", [Param(retry_limit=1, retry_period=2, task_lease_ttl=1, ping_period=0.5)], indirect=["kikimr"]) + def test_low_rate(self, kikimr): + topic_name = "low_rate" + connection = "low_rate" + self.init_topics(topic_name) + sql = R'''SELECT * FROM {connection}.`{input_topic}`;'''\ + .format( + input_topic=self.input_topic, + connection=connection + ) + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + deadline = time.time() + 2 * 2 + for _ in range(5): + delta = deadline - time.time() + if delta > 0: + time.sleep(delta) + deadline = time.time() + 2 * 2 + kikimr.compute_plane.stop() + kikimr.compute_plane.start() + kikimr.compute_plane.wait_bootstrap() + # client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + time.sleep(4) + client.abort_query(query_id) + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) + retry_count = kikimr.compute_plane.get_sensors(1, "yq").find_sensor({"query_id": query_id, "sensor": "RetryCount"}) + assert retry_count >= 1, "Incorrect RetryCount" + + @pytest.mark.parametrize("kikimr", [Param(retry_limit=3, task_lease_ttl=1, ping_period=0.5)], indirect=["kikimr"]) + def test_high_rate(self, kikimr): + topic_name = "high_rate" + connection = "high_rate" + self.init_topics(topic_name) + sql = R'''SELECT * FROM {connection}.`{input_topic}`;'''\ + .format( + input_topic=self.input_topic, + connection=connection + ) + client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr) + client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), 
os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + for _ in range(10): + deadline = time.time() + 1 + kikimr.compute_plane.stop() + kikimr.compute_plane.start() + kikimr.compute_plane.wait_bootstrap() + if client.describe_query(query_id).result.query.meta.status == fq.QueryMeta.ABORTED_BY_SYSTEM: + break + delta = deadline - time.time() + if delta > 0: + time.sleep(delta) + else: + assert False, "Query was NOT aborted" diff --git a/ydb/tests/fq/multi_plane/ya.make b/ydb/tests/fq/multi_plane/ya.make new file mode 100644 index 0000000000..42b3c4d553 --- /dev/null +++ b/ydb/tests/fq/multi_plane/ya.make @@ -0,0 +1,21 @@ +PY3TEST() + +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc) + +PEERDIR( + ydb/tests/tools/datastreams_helpers + ydb/tests/tools/fq_runner +) + +DEPENDS(ydb/tests/tools/pq_read) + +TEST_SRCS( + test_cp_ic.py + test_dispatch.py + test_retry.py +) + +TIMEOUT(600) +SIZE(MEDIUM) + +END() diff --git a/ydb/tests/fq/ya.make b/ydb/tests/fq/ya.make index 77586532cc..d18f304d22 100644 --- a/ydb/tests/fq/ya.make +++ b/ydb/tests/fq/ya.make @@ -1,4 +1,7 @@ RECURSE_FOR_TESTS( + mem_alloc + multi_plane plans s3 + yds ) diff --git a/ydb/tests/fq/yds/canondata/result.json b/ydb/tests/fq/yds/canondata/result.json new file mode 100644 index 0000000000..545124f3df --- /dev/null +++ b/ydb/tests/fq/yds/canondata/result.json @@ -0,0 +1,60 @@ +{ + "test_monitoring.TestSolomon.test_pq_read_solomon_write[v1]": [ + { + "labels": [ + [ + "label1", + "key1" + ], + [ + "name", + "sensor1" + ] + ], + "ts": "1970-01-01T00:00:01.000000Z", + "value": 1 + }, + { + "labels": [ + [ + "label1", + "key2" + ], + [ + "name", + "sensor1" + ] + ], + "ts": "1970-01-01T00:00:03.000000Z", + "value": 2 + }, + { + "labels": [ + [ + "label1", + "key2" + ], + [ + "name", + "sensor1" + ] + ], + "ts": "1970-01-01T00:00:04.000000Z", + 
"value": 3 + }, + { + "labels": [ + [ + "label1", + "key1" + ], + [ + "name", + "sensor1" + ] + ], + "ts": "1970-01-01T00:00:07.000000Z", + "value": 4 + } + ] +} diff --git a/ydb/tests/fq/yds/conftest.py b/ydb/tests/fq/yds/conftest.py new file mode 100644 index 0000000000..687e11205a --- /dev/null +++ b/ydb/tests/fq/yds/conftest.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import pytest + +from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient +from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support +from ydb.tests.tools.fq_runner.kikimr_utils import ExtensionPoint +from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension +from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension +from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension +from ydb.tests.tools.fq_runner.kikimr_utils import StatsModeExtension +from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr + + +@pytest.fixture +def stats_mode(): + return '' + + +@pytest.fixture +def kikimr(request: pytest.FixtureRequest, yq_version: str, stats_mode: str): + kikimr_extensions = [DefaultConfigExtension(""), + YQv2Extension(yq_version), + ComputeExtension(), + StatsModeExtension(stats_mode)] + with start_kikimr(request, kikimr_extensions) as kikimr: + yield kikimr + + +class ManyRetriesConfigExtension(ExtensionPoint): + def __init__(self): + super().__init__() + + def is_applicable(self, request): + return True + + def apply_to_kikimr(self, request, kikimr): + kikimr.compute_plane.fq_config['control_plane_storage']['retry_policy_mapping'] = [ + { + 'status_code': [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13], + 'policy': { + 'retry_count': 10000 + } + } + ] + + +@pytest.fixture +def kikimr_many_retries(request: pytest.FixtureRequest, yq_version: str): + kikimr_extensions = [DefaultConfigExtension(""), + ManyRetriesConfigExtension(), + YQv2Extension(yq_version), + ComputeExtension()] + 
with start_kikimr(request, kikimr_extensions) as kikimr: + yield kikimr + + +def create_client(kikimr, request): + return FederatedQueryClient(request.param["folder_id"] if request is not None else "my_folder", + streaming_over_kikimr=kikimr) + + +@pytest.fixture +def client(kikimr, request=None): + return create_client(kikimr, request) + + +@pytest.fixture +def client_many_retries(kikimr_many_retries, request=None): + return create_client(kikimr_many_retries, request) diff --git a/ydb/tests/fq/yds/test_2_selects_limit.py b/ydb/tests/fq/yds/test_2_selects_limit.py new file mode 100644 index 0000000000..abaf1d9301 --- /dev/null +++ b/ydb/tests/fq/yds/test_2_selects_limit.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import os +import pytest +import time + +from ydb.tests.tools.datastreams_helpers.control_plane import create_read_rule + +import ydb.public.api.protos.draft.fq_pb2 as fq +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 + + +class TestSelectLimit(object): + @yq_v1 + def test_select_same(self, kikimr, client): + pytest.skip("Skip until streaming disposition is implemented YQ-589") + + self.init_topics("select_same", create_output=False) + + sql = R''' + SELECT * FROM yds1.`{input_topic}` LIMIT 2; + SELECT * FROM yds1.`{input_topic}` LIMIT 2; + ''' \ + .format( + input_topic=self.input_topic, + ) + + client.create_yds_connection("yds1", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.wait_zero_checkpoint(query_id) + + time.sleep(2) # Workaround race between write and read "from now". Remove when YQ-589 will be done. 
+ messages = ["A", "B", "C", "D", "E"] + self.write_stream(messages) + + client.wait_query(query_id, 30) + + # by default selects use different consumers, so they will read the same + # messages - first 2 of them + + rs = client.get_result_data(query_id, result_set_index=0).result.result_set + logging.debug(str(rs)) + assert len(rs.rows) == 2 + assert len(rs.columns) == 1 + assert rs.rows[0].items[0].bytes_value == messages[0] + assert rs.rows[1].items[0].bytes_value == messages[1] + + rs = client.get_result_data(query_id, result_set_index=1).result.result_set + logging.debug(str(rs)) + assert len(rs.rows) == 2 + assert len(rs.columns) == 1 + assert rs.rows[0].items[0].bytes_value == messages[0] + assert rs.rows[1].items[0].bytes_value == messages[1] + + @yq_v1 + def test_select_sequence(self, kikimr, client): + pytest.skip("does not work as expected, need attention") + self.init_topics("select_sequence", create_output=False) + + create_read_rule(self.input_topic, self.consumer_name) + sql = R''' + PRAGMA pq.Consumer="{consumer_name}"; + SELECT * FROM yds2.`{input_topic}` LIMIT 2; + SELECT * FROM yds2.`{input_topic}` LIMIT 2; + ''' \ + .format( + consumer_name=self.consumer_name, + input_topic=self.input_topic, + ) + + client.create_yds_connection("yds2", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.wait_zero_checkpoint(query_id) + + time.sleep(2) # Workaround race between write and read "from now". Remove when YQ-589 will be done. 
+ messages = ["A", "B", "C", "D", "E"] + self.write_stream(messages) + + client.wait_query(query_id, 30) + + # explicit consumer will be used for each query, so second select will + # fetch different set of next messages + + rs = client.get_result_data(query_id, result_set_index=0).result.result_set + logging.debug(str(rs)) + assert len(rs.rows) == 2 + assert len(rs.columns) == 1 + assert rs.rows[0].items[0].bytes_value == messages[0] + assert rs.rows[1].items[0].bytes_value == messages[1] + + rs = client.get_result_data(query_id, result_set_index=1).result.result_set + logging.debug(str(rs)) + assert len(rs.rows) == 2 + assert len(rs.columns) == 1 + assert rs.rows[0].items[0].bytes_value == messages[2] + assert rs.rows[1].items[0].bytes_value == messages[3] diff --git a/ydb/tests/fq/yds/test_3_selects.py b/ydb/tests/fq/yds/test_3_selects.py new file mode 100644 index 0000000000..37ab65058f --- /dev/null +++ b/ydb/tests/fq/yds/test_3_selects.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging + +import ydb.public.api.protos.draft.fq_pb2 as fq +import ydb.public.api.protos.ydb_value_pb2 as ydb +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 + + +class TestSelects(object): + @yq_v1 + def test_3_selects(self, client): + sql = R''' + SELECT 1 AS SingleColumn; + SELECT "A" AS TextColumn; + SELECT 11 AS Column1, 22 AS Column2; + ''' + + client.create_yds_connection(name="myyds", database_id="FakeDatabaseId") + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + + client.wait_query(query_id, 30) + + rs = client.get_result_data(query_id, result_set_index=0).result.result_set + logging.debug(str(rs)) + assert len(rs.rows) == 1 + assert len(rs.columns) == 1 + assert rs.columns[0].name == "SingleColumn" + assert rs.columns[0].type.type_id == ydb.Type.INT32 + assert rs.rows[0].items[0].int32_value == 1 + + rs = client.get_result_data(query_id, result_set_index=1).result.result_set 
+ logging.debug(str(rs)) + assert len(rs.rows) == 1 + assert len(rs.columns) == 1 + assert rs.columns[0].name == "TextColumn" + assert rs.columns[0].type.type_id == ydb.Type.STRING + assert rs.rows[0].items[0].bytes_value == b"A" + + rs = client.get_result_data(query_id, result_set_index=2).result.result_set + logging.debug(str(rs)) + assert len(rs.rows) == 1 + assert len(rs.columns) == 2 + assert rs.columns[0].name == "Column1" + assert rs.columns[0].type.type_id == ydb.Type.INT32 + assert rs.rows[0].items[0].int32_value == 11 + assert rs.columns[1].name == "Column2" + assert rs.columns[1].type.type_id == ydb.Type.INT32 + assert rs.rows[0].items[1].int32_value == 22 diff --git a/ydb/tests/fq/yds/test_bad_syntax.py b/ydb/tests/fq/yds/test_bad_syntax.py new file mode 100644 index 0000000000..51ac943c84 --- /dev/null +++ b/ydb/tests/fq/yds/test_bad_syntax.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import pytest +import time + +import ydb.public.api.protos.draft.fq_pb2 as fq +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 + + +class TestBadSyntax(TestYdsBase): + @yq_v1 + @pytest.mark.parametrize( + "query_type", + [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING], + ids=["analytics", "streaming"] + ) + @pytest.mark.parametrize( + "after_modify", + [True, False], + ids=["modify", "create"] + ) + @pytest.mark.parametrize( + "with_read_rules", + [True, False], + ids=["with_created_read_rules", "without_created_read_rules"] + ) + def test_bad_syntax(self, kikimr, client, query_type, after_modify, with_read_rules): + if with_read_rules and not after_modify: + return + + correct_sql = "SELECT 42;" + incorrect_sql = "blablabla" + if with_read_rules: + self.init_topics("bad_syntax_{}".format(query_type)) + connection_name = "yds_{}".format(query_type) + sql_with_rr = "INSERT INTO {connection_name}.`{output_topic}` SELECT * 
FROM {connection_name}.`{input_topic}` LIMIT 1;" \ + .format(connection_name=connection_name, + output_topic=self.output_topic, + input_topic=self.input_topic) + + if after_modify: + if with_read_rules: + client.create_yds_connection(name=connection_name, database_id="FakeDatabaseId") + create_response = client.create_query("q", sql_with_rr, type=query_type) + else: + create_response = client.create_query("q", correct_sql, type=query_type) + else: + create_response = client.create_query("q", incorrect_sql, type=query_type) + + query_id = create_response.result.query_id + assert not create_response.issues, str(create_response.issues) + logging.debug(str(create_response.result)) + + if after_modify: + if with_read_rules: + client.wait_query(query_id, statuses=[fq.QueryMeta.RUNNING]) + if query_type == fq.QueryContent.QueryType.STREAMING: + kikimr.compute_plane.wait_zero_checkpoint(query_id) + else: + time.sleep(3) # TODO: remove it after streaming disposition will be supported + self.write_stream(["A"]) + client.wait_query(query_id, statuses=[fq.QueryMeta.COMPLETED]) + client.modify_query(query_id, "q", incorrect_sql, type=query_type) + client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED]) + else: + client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED]) + + describe_result = client.describe_query(query_id).result + logging.debug("Describe result: {}".format(describe_result)) + describe_string = "{}".format(describe_result) + assert "Internal Error" not in describe_string + assert "Failed to parse query" in describe_string + + @yq_v1 + def test_require_as(self, client): + bad_sql = "SELECT 42 a" + query_id = client.create_query("bad", bad_sql).result.query_id + + client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED]) + + describe_result = client.describe_query(query_id).result + logging.debug("Describe result: {}".format(describe_result)) + describe_string = "{}".format(describe_result) + assert "Expecting mandatory AS here" in describe_string + + 
@yq_v1 + def test_type_as_column(self, client): + bad_sql = "select max(DateTime) from AS_TABLE([<|x:1|>]);" + query_id = client.create_query("bad", bad_sql).result.query_id + + client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED]) + + describe_result = client.describe_query(query_id).result + logging.debug("Describe result: {}".format(describe_result)) + describe_string = "{}".format(describe_result) + assert "Member not found: DateTime" in describe_string diff --git a/ydb/tests/fq/yds/test_base.py b/ydb/tests/fq/yds/test_base.py new file mode 100644 index 0000000000..6b09f7a702 --- /dev/null +++ b/ydb/tests/fq/yds/test_base.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr +from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase + + +class TestBaseWithAbortingConfigParams(TestYdsBase): + + @classmethod + def setup_class(cls): + kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True) + cls.streaming_over_kikimr = StreamingOverKikimr(kikimr_conf) + cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_ttl'] = "2s" + cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy'] = {} + cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_count'] = 1 + cls.streaming_over_kikimr.compute_plane.fq_config['pinger']['ping_period'] = "1s" + cls.streaming_over_kikimr.start_mvp_mock_server() + cls.streaming_over_kikimr.start() + + @classmethod + def teardown_class(cls): + if hasattr(cls, "streaming_over_kikimr"): + cls.streaming_over_kikimr.stop_mvp_mock_server() + cls.streaming_over_kikimr.stop() diff --git a/ydb/tests/fq/yds/test_big_state.py b/ydb/tests/fq/yds/test_big_state.py new file mode 100644 index 0000000000..7b86f55e3f --- /dev/null +++ 
b/ydb/tests/fq/yds/test_big_state.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import time + +import ydb.tests.library.common.yatest_common as yatest_common + +import ydb.public.api.protos.draft.fq_pb2 as fq +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 + + +class TestBigState(TestYdsBase): + @yq_v1 + def test_gt_8mb(self, kikimr, client): + + self.init_topics("select_hop_8mb", create_output=False) + + sql = R''' + $s = SELECT ListConcat(ListReplicate(Data, 9000000)) as Data2 + FROM myyds.`{input_topic}` + WITH SCHEMA (Data String NOT NULL); + + SELECT * from $s + GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT1S", "PT1S", "PT1S"), Data2 + LIMIT 1 + ''' \ + .format( + input_topic=self.input_topic + ) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + kikimr.compute_plane.wait_zero_checkpoint(query_id) + + for _ in range(5): + messages = ["A", "B", "C"] + self.write_stream(messages) + time.sleep(1) + + deadline = time.time() + yatest_common.plain_or_under_sanitizer(60, 300) + while time.time() < deadline: + aborted_checkpoints = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id, "AbortedCheckpoints") + if not aborted_checkpoints: + time.sleep(0.5) + + assert aborted_checkpoints != 0 diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.br b/ydb/tests/fq/yds/test_compression_data/test.json.br Binary files differnew file mode 100644 index 0000000000..ad9f8da80b --- /dev/null +++ b/ydb/tests/fq/yds/test_compression_data/test.json.br diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.bz2 b/ydb/tests/fq/yds/test_compression_data/test.json.bz2 Binary files differnew file mode 100644 index 0000000000..f5b217efb4 --- /dev/null +++ 
b/ydb/tests/fq/yds/test_compression_data/test.json.bz2 diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.gz b/ydb/tests/fq/yds/test_compression_data/test.json.gz Binary files differnew file mode 100644 index 0000000000..ebd73dc216 --- /dev/null +++ b/ydb/tests/fq/yds/test_compression_data/test.json.gz diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.lz4 b/ydb/tests/fq/yds/test_compression_data/test.json.lz4 Binary files differnew file mode 100644 index 0000000000..f78d6a71ee --- /dev/null +++ b/ydb/tests/fq/yds/test_compression_data/test.json.lz4 diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.xz b/ydb/tests/fq/yds/test_compression_data/test.json.xz Binary files differnew file mode 100644 index 0000000000..588fb1dd01 --- /dev/null +++ b/ydb/tests/fq/yds/test_compression_data/test.json.xz diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.zst b/ydb/tests/fq/yds/test_compression_data/test.json.zst Binary files differnew file mode 100644 index 0000000000..3f0bf6862f --- /dev/null +++ b/ydb/tests/fq/yds/test_compression_data/test.json.zst diff --git a/ydb/tests/fq/yds/test_continue_mode.py b/ydb/tests/fq/yds/test_continue_mode.py new file mode 100644 index 0000000000..cb7768a947 --- /dev/null +++ b/ydb/tests/fq/yds/test_continue_mode.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import logging +import time + +import ydb.public.api.protos.draft.fq_pb2 as fq +import ydb.tests.library.common.yatest_common as yatest_common +from ydb.tests.tools.fq_runner.fq_client import StreamingDisposition +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.datastreams_helpers.data_plane import write_stream, read_stream + + +def assert_issues_contain(text, issues): + for issue in issues: + if text in issue.message: + return + assert False, f"Text [{text}] is expected to be among issues:\n{issues}" + + +class 
TestContinueMode(TestYdsBase): + @yq_v1 + def test_continue_from_offsets(self, kikimr, client): + client.create_yds_connection(name="yds", database_id="FakeDatabaseId") + + self.init_topics("continue_1", partitions_count=2) + input_topic_1 = self.input_topic + output_topic_1 = self.output_topic + consumer_1 = self.consumer_name + self.init_topics("continue_2", partitions_count=2) + input_topic_2 = self.input_topic + output_topic_2 = self.output_topic + consumer_2 = self.consumer_name + + partition_key = "trololo" + + sql = Rf''' + PRAGMA dq.MaxTasksPerStage="2"; + + INSERT INTO yds.`{output_topic_1}` + SELECT Data FROM yds.`{input_topic_1}`;''' + + query_id = client.create_query("continue-from-offsets-query", sql, + type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + + data = ["1", "2"] + write_stream(input_topic_1, data, partition_key=partition_key) + + assert read_stream(output_topic_1, len(data), consumer_name=consumer_1) == data + + kikimr.compute_plane.wait_completed_checkpoints(query_id, + kikimr.compute_plane.get_completed_checkpoints(query_id) + 1) + + client.abort_query(query_id) + client.wait_query(query_id) + + # Write to first topic. 
+ # These offsets wouldn't be fresh but we should get this data in second output topic + # because offsets should be restored from checkpoint + data_2 = ["3", "4"] + write_stream(input_topic_1, data_2, partition_key=partition_key) + + sql2 = Rf''' + PRAGMA dq.MaxTasksPerStage="1"; + + INSERT INTO yds.`{output_topic_2}` + SELECT Data FROM yds.`{input_topic_1}`; + + INSERT INTO yds.`{output_topic_2}` + SELECT Data FROM yds.`{input_topic_2}`;''' + + def assert_has_saved_checkpoints(): + describe_result = client.describe_query(query_id).result + logging.debug("Describe result: {}".format(describe_result)) + assert describe_result.query.meta.has_saved_checkpoints, "Expected has_saved_checkpoints flag: {}".format( + describe_result.query.meta) + + def find_text(issues, text): + for issue in issues: + if text in issue.message: + return True + if len(issue.issues) > 0 and find_text(issue.issues, text): + return True + return False + + def assert_has_issues(text, transient=False, deadline=0): + describe_result = client.describe_query(query_id).result + logging.debug("Describe result: {}".format(describe_result)) + if find_text(describe_result.query.transient_issue if transient else describe_result.query.issue, text): + return True + assert deadline and deadline > time.time(), "Text \"{}\" is expected to be found in{} issues, but was not found. Describe query result:\n{}" \ + .format(text, " transient" if transient else "", describe_result) + return False + + assert_has_saved_checkpoints() + + # 1. Not forced mode. Expect to fail + client.modify_query(query_id, "continue-from-offsets-query", sql2, + type=fq.QueryContent.QueryType.STREAMING, + state_load_mode=fq.StateLoadMode.EMPTY, + streaming_disposition=StreamingDisposition.from_last_checkpoint()) + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + assert_has_issues( + "Topic `continue_2_input` is not found in previous query. Use force mode to ignore this issue") + + # 2. Forced mode. Expect to run. 
+ client.modify_query(query_id, "continue-from-offsets-query", sql2, + type=fq.QueryContent.QueryType.STREAMING, + state_load_mode=fq.StateLoadMode.EMPTY, + streaming_disposition=StreamingDisposition.from_last_checkpoint(True)) + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + transient_issues_deadline = time.time() + yatest_common.plain_or_under_sanitizer(60, 300) + msg = "Topic `continue_2_input` is not found in previous query. Query will use fresh offsets for its partitions" + while True: + if assert_has_issues(msg, True, transient_issues_deadline): + break + else: + time.sleep(0.3) + kikimr.compute_plane.wait_completed_checkpoints(query_id, + kikimr.compute_plane.get_completed_checkpoints(query_id) + 1) + + restored_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id, + "RestoredStreamingOffsetsFromCheckpoint") + assert restored_metric == 1 + + data_3 = ["5", "6"] + write_stream(input_topic_2, data_3, partition_key=partition_key) + + assert sorted(read_stream(output_topic_2, 4, consumer_name=consumer_2)) == ["3", "4", "5", "6"] + + kikimr.compute_plane.wait_completed_checkpoints(query_id, + kikimr.compute_plane.get_completed_checkpoints(query_id) + 1) + + assert_has_saved_checkpoints() + + # Check that after node failure the query will restore from its usual checkpoint + kikimr.compute_plane.kikimr_cluster.nodes[1].stop() + kikimr.compute_plane.kikimr_cluster.nodes[1].start() + kikimr.compute_plane.wait_bootstrap(1) + + # sleep task lease ttl / 2 + time.sleep(2.5) + + # Wait while graph is restored from checkpoint + n = 100 + while n: + n -= 1 + restored_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id, + "RestoredFromSavedCheckpoint", + expect_counters_exist=False) + if restored_metric >= 1: + break + time.sleep(0.3) + assert restored_metric >= 1 + + assert_has_saved_checkpoints() + + @yq_v1 + def test_deny_disposition_from_checkpoint_in_create_query(self, client): + 
client.create_yds_connection(name="yds_create", database_id="FakeDatabaseId") + + self.init_topics("deny_disposition_from_checkpoint_in_create_query") + + sql = Rf''' + PRAGMA dq.MaxTasksPerStage="2"; + + INSERT INTO yds_create.`{self.output_topic}` + SELECT Data FROM yds_create.`{self.input_topic}`;''' + + response = client.create_query( + "deny_disposition_from_checkpoint_in_create_query", + sql, + type=fq.QueryContent.QueryType.STREAMING, + streaming_disposition=StreamingDisposition.from_last_checkpoint(), + check_issues=False) + assert response.issues + assert_issues_contain("Streaming disposition \"from_last_checkpoint\" is not allowed in CreateQuery request", + response.issues) + + @yq_v1 + def test_deny_state_load_mode_from_checkpoint_in_modify_query(self, kikimr, client): + client.create_yds_connection(name="yds_modify", database_id="FakeDatabaseId") + + self.init_topics("deny_state_load_mode_from_checkpoint_in_modify_query") + + sql = Rf''' + PRAGMA dq.MaxTasksPerStage="2"; + + INSERT INTO yds_modify.`{self.output_topic}` + SELECT Data FROM yds_modify.`{self.input_topic}`;''' + + response = client.create_query( + "deny_state_load_mode_from_checkpoint_in_modify_query", + sql, + type=fq.QueryContent.QueryType.STREAMING) + + query_id = response.result.query_id + + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + + client.abort_query(query_id) + client.wait_query(query_id) + + response = client.modify_query(query_id, "deny_state_load_mode_from_checkpoint_in_modify_query", sql, + type=fq.QueryContent.QueryType.STREAMING, + state_load_mode=fq.StateLoadMode.FROM_LAST_CHECKPOINT, + streaming_disposition=StreamingDisposition.from_last_checkpoint(), + check_issues=False) + + assert response.issues + assert_issues_contain("State load mode \"FROM_LAST_CHECKPOINT\" is not supported", response.issues) diff --git a/ydb/tests/fq/yds/test_cpu_quota.py b/ydb/tests/fq/yds/test_cpu_quota.py new file mode 100644 
index 0000000000..60a133cebf
--- /dev/null
+++ b/ydb/tests/fq/yds/test_cpu_quota.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import time
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class TestCpuQuota(TestYdsBase):
+    @yq_v1
+    def test_cpu_quota(self, kikimr, client):
+        # A streaming query created with vcpu_time_limit=1 and a CPU-heavy
+        # projection (100000x string replication) must hit CPU quota
+        # throttling; this is observed through the dq_tasks histograms below.
+        self.init_topics("cpu_quota")
+
+        sql = R'''
+            PRAGMA dq.MaxTasksPerStage="2";
+
+            INSERT INTO yds.`{output_topic}`
+            SELECT Unwrap(ListConcat(ListReplicate(Data, 100000))) AS Data
+            FROM yds.`{input_topic}`;''' \
+            .format(
+                input_topic=self.input_topic,
+                output_topic=self.output_topic,
+            )
+
+        client.create_yds_connection(name="yds", database_id="FakeDatabaseId")
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING,
+                                       vcpu_time_limit=1).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+        kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+        data = [
+            'ABC',
+            'DEF',
+            'GHI',
+        ]
+        self.write_stream(data)
+        self.read_stream(len(data))
+
+        # One more time after pause
+        time.sleep(0.5)
+        self.write_stream(data)
+        self.read_stream(len(data))
+
+        time.sleep(0.5)
+        sensors = kikimr.compute_plane.get_sensors(1, "dq_tasks")
+
+        def calc_histogram_counter(sensor):
+            # Total observation count = sum of all finite buckets plus the
+            # overflow ("inf") bucket.
+            s = 0
+            for i in sensor["hist"]["buckets"]:
+                s += i
+            s += sensor["hist"]["inf"]
+            return s
+
+        get_quota_latency = 0
+        quota_wait_delay = 0
+
+        for sensor in sensors.data:
+            labels = sensor["labels"]
+            if labels.get("operation") != query_id:
+                continue
+
+            if labels.get("sensor") == "CpuTimeGetQuotaLatencyMs":
+                get_quota_latency += calc_histogram_counter(sensor)
+            if labels.get("sensor") == "CpuTimeQuotaWaitDelayMs":
+                quota_wait_delay += calc_histogram_counter(sensor)
+
+        # Non-zero counters prove the quota path was actually exercised.
+        assert get_quota_latency > 0
+        assert quota_wait_delay > 0
+
+        client.abort_query(query_id)
+        client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
+
+        logging.debug("Sensors: {}".format(sensors))
diff --git a/ydb/tests/fq/yds/test_delete_read_rules_after_abort_by_system.py b/ydb/tests/fq/yds/test_delete_read_rules_after_abort_by_system.py
new file mode 100644
index 0000000000..e37551216d
--- /dev/null
+++ b/ydb/tests/fq/yds/test_delete_read_rules_after_abort_by_system.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+
+from test_base import TestBaseWithAbortingConfigParams
+
+from ydb.tests.tools.datastreams_helpers.control_plane import list_read_rules
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class TestDeleteReadRulesAfterAbortBySystem(TestBaseWithAbortingConfigParams):
+    @yq_v1
+    def test_delete_read_rules_after_abort_by_system(self):
+        # Repeatedly restarts the compute plane until the control plane
+        # aborts the query (ABORTED_BY_SYSTEM), then checks the topic's
+        # read rule was cleaned up and did not leak.
+        topic_name = "read_rules_leaking"
+        conn = "yds_0"
+        self.init_topics(topic_name)
+
+        sql = R'''
+            PRAGMA dq.MaxTasksPerStage="5";
+
+            INSERT INTO {conn}.`{output_topic}`
+            SELECT * FROM {conn}.`{input_topic}`;'''\
+            .format(
+                input_topic=self.input_topic,
+                output_topic=self.output_topic,
+                conn=conn
+            )
+
+        client = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
+
+        client.create_yds_connection(conn, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+
+        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+        read_rules = list_read_rules(self.input_topic)
+        assert len(read_rules) == 1, read_rules
+
+        for _ in range(5):
+            self.streaming_over_kikimr.compute_plane.stop()
+            self.streaming_over_kikimr.compute_plane.start()
+            # Busy-wait until the control plane serves at least one more
+            # GetTask request after the restart.
+            get_task_count = self.streaming_over_kikimr.control_plane.get_request_count(1, "GetTask")
+            while get_task_count ==
self.streaming_over_kikimr.control_plane.get_request_count(1, "GetTask"):
+                pass
+            if client.describe_query(query_id).result.query.meta.status == fq.QueryMeta.ABORTED_BY_SYSTEM:
+                break
+        else:
+            assert False, "Query was NOT aborted"
+        # client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_SYSTEM)
+
+        read_rules = list_read_rules(self.input_topic)
+        assert len(read_rules) == 0, read_rules
diff --git a/ydb/tests/fq/yds/test_eval.py b/ydb/tests/fq/yds/test_eval.py
new file mode 100644
index 0000000000..56e6de416c
--- /dev/null
+++ b/ydb/tests/fq/yds/test_eval.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+import ydb.public.api.protos.ydb_value_pb2 as ydb
+
+
+class TestEval(object):
+    @yq_v1
+    def test_eval_2_2(self, client):
+        # EvaluateExpr(2+2) is evaluated at query-compile time; the result
+        # must be a single INT32 column C1 with one row equal to 4.
+        sql = "SELECT EvaluateExpr(2+2) AS C1;"
+        query_id = client.create_query("simple1", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id)
+
+        result_set = data.result.result_set
+        logging.debug(str(result_set))
+        assert len(result_set.columns) == 1
+        assert result_set.columns[0].name == "C1"
+        assert result_set.columns[0].type.type_id == ydb.Type.INT32
+        assert len(result_set.rows) == 1
+        assert result_set.rows[0].items[0].int32_value == 4
diff --git a/ydb/tests/fq/yds/test_mem_alloc.py b/ydb/tests/fq/yds/test_mem_alloc.py
new file mode 100644
index 0000000000..8a7deeea43
--- /dev/null
+++ b/ydb/tests/fq/yds/test_mem_alloc.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import json
+import logging
+import os
+import pytest
+import time
+
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+from 
ydb.tests.tools.datastreams_helpers.control_plane import create_stream +from ydb.tests.tools.datastreams_helpers.data_plane import write_stream + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class TestMemAlloc(TestYdsBase): + @yq_v1 + def test_join_alloc(self, kikimr, client): + pytest.skip("This test is not ready yet") + + self.init_topics("select_join_alloc", create_output=False) + create_stream("joined_topic") + + sql = R''' + SELECT S1.Data as Data1, S2.Data as Data2, ListReplicate(S2.Data, 1000000) as Data20 + FROM myyds.`{input_topic}` AS S1 + INNER JOIN (SELECT * FROM myyds.`{joined_topic}` LIMIT 2) AS S2 + ON S1.Data = S2.Data + LIMIT 2 + ''' \ + .format( + input_topic=self.input_topic, + joined_topic="joined_topic" + ) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + + time.sleep(2) # Workaround race between write and read "from now". Remove when YQ-589 will be done. 
+ messages = ["A", "B", "C"] + self.write_stream(messages) + write_stream("joined_topic", messages) + + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + logging.debug(client.get_result_data(query_id)) + st = json.loads(client.describe_query(query_id).result.query.statistics.json) + logging.debug(st["Graph=1"]["TaskRunner"]["Stage=Total"]["MkqlMaxMemoryUsage"]["count"]) + + @yq_v1 + def test_hop_alloc(self, kikimr, client): + pytest.skip("This test is not ready yet") + + self.init_topics("select_hop_alloc", create_output=False) + + sql = R''' + --SELECT COUNT(*) + --FROM myyds.`{input_topic}` + --GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data + --LIMIT 1 + SELECT 1 + ''' \ + .format( + input_topic=self.input_topic, + ) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + + time.sleep(2) # Workaround race between write and read "from now". Remove when YQ-589 will be done. 
+        for i in range(1):
+            self.write_stream([format(i, "0x")])
+        time.sleep(yatest_common.plain_or_under_sanitizer(15, 60))
+
+        client.abort_query(query_id)
+        client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
+
+        logging.debug(client.get_result_data(query_id))
+        st = json.loads(client.describe_query(query_id).result.query.statistics.json)
+        logging.debug(st["Graph=1"]["TaskRunner"]["Stage=Total"]["MkqlMaxMemoryUsage"]["count"])
diff --git a/ydb/tests/fq/yds/test_metrics_cleanup.py b/ydb/tests/fq/yds/test_metrics_cleanup.py
new file mode 100644
index 0000000000..2858dc834c
--- /dev/null
+++ b/ydb/tests/fq/yds/test_metrics_cleanup.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import time
+
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class TestCleanup(TestYdsBase):
+    @yq_v1
+    def test_cleanup(self, kikimr, client):
+        # The task_controller TaskCount sensor of a completed query must be
+        # removed by the metrics cleanup; poll until it disappears or the
+        # deadline elapses.
+        sql = "SELECT 1;"
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+        assert kikimr.compute_plane.get_task_count(1, query_id) == 0
+        deadline = time.time() + yatest_common.plain_or_under_sanitizer(120, 500)
+        while True:
+            value = kikimr.compute_plane.get_sensors(1, "yq").find_sensor(
+                {"query_id": query_id, "subsystem": "task_controller", "Stage": "Total", "sensor": "TaskCount"})
+            if value is None:
+                break
+            assert time.time() < deadline, "TaskCount was not cleaned"
+            time.sleep(0.5)
+
+    @yq_v1
+    def test_keep(self, kikimr, client):
+        # Counterpart of test_cleanup: while the (modified) query keeps
+        # running, its sensors must NOT be cleaned up.
+        self.init_topics("cleanup_keep", create_output=False)
+        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+
+        query_id = client.create_query("simple", "SELECT 1;", 
type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + sql = R''' + SELECT * FROM myyds.`{input_topic}`; + ''' \ + .format( + input_topic=self.input_topic, + ) + client.modify_query(query_id, "simple", sql, type=fq.QueryContent.QueryType.STREAMING) + + deadline = time.time() + 90 # x1.5 of 60 sec + while True: + value = kikimr.compute_plane.get_sensors(1, "yq").find_sensor( + {"query_id": query_id, "subsystem": "task_controller", "Stage": "Total", "sensor": "TaskCount"}) + assert value is not None, "TaskCount was cleaned" + if time.time() > deadline: + break + else: + time.sleep(0.5) diff --git a/ydb/tests/fq/yds/test_pq_read_write.py b/ydb/tests/fq/yds/test_pq_read_write.py new file mode 100644 index 0000000000..1ee91ca138 --- /dev/null +++ b/ydb/tests/fq/yds/test_pq_read_write.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase + +from ydb.tests.tools.datastreams_helpers.control_plane import list_read_rules + +import ydb.public.api.protos.draft.fq_pb2 as fq + +YDS_CONNECTION = "yds" + + +def start_yds_query(kikimr, client, sql) -> str: + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + return query_id + + +def stop_yds_query(client, query_id): + client.abort_query(query_id) + client.wait_query(query_id) + + +class TestPqReadWrite(TestYdsBase): + @yq_v1 + def test_pq_read_write(self, kikimr, client): + client.create_yds_connection(name=YDS_CONNECTION, database_id="FakeDatabaseId") + self.init_topics("pq_test_pq_read_write") + sql = Rf''' + PRAGMA dq.MaxTasksPerStage="2"; + + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT STREAM + 
Yson::SerializeText(Yson::From(TableRow())) + FROM ( + SELECT STREAM + k, + Sum(v) as sum + FROM ( + SELECT STREAM + Yson::LookupUint64(ys, "time") as t, + Yson::LookupInt64(ys, "key") as k, + Yson::LookupInt64(ys, "val") as v + FROM ( + SELECT STREAM + Yson::Parse(Data) AS ys + FROM {YDS_CONNECTION}.`{self.input_topic}`)) + GROUP BY + k, + HOP(DateTime::FromMilliseconds(CAST(Unwrap(t) as Uint32)), "PT0.005S", "PT0.01S", "PT0.01S"));''' + + query_id = start_yds_query(kikimr, client, sql) + + # 100 105 110 115 120 125 130 135 140 (ms) + # [ Bucket1 )<--Delay1--> + # [ Bucket2 )<--Delay2--> + # [ Bucket3 )<--Delay3--> + # [ Bucket4 )<--Delay4--> + # [ Bucket5 )<--Delay5--> + data = [ + '{"time" = 105; "key" = 1; "val" = 1;}', # Group 1 Bucket 1, 2 + '{"time" = 107; "key" = 1; "val" = 4;}', # Group 1 Bucket 1, 2 + '{"time" = 106; "key" = 2; "val" = 3;}', # Group 2 Bucket 1, 2 + '{"time" = 111; "key" = 1; "val" = 7;}', # Group 1 Bucket 2, 3 + '{"time" = 117; "key" = 1; "val" = 3;}', # Group 1 Bucket 3, 4 + '{"time" = 110; "key" = 2; "val" = 2;}', # Group 2 Bucket 2, 3 + '{"time" = 108; "key" = 1; "val" = 9;}', # Group 1 Bucket 1, 2 (delayed) + '{"time" = 121; "key" = 1; "val" = 4;}', # Group 1 Bucket 4, 5 (close bucket 1) + '{"time" = 107; "key" = 2; "val" = 2;}', # Group 2 Bucket 1, 2 (delayed) + '{"time" = 141; "key" = 2; "val" = 5;}', # Group 2 Close all buckets + '{"time" = 141; "key" = 1; "val" = 10;}', # Group 1 Close all buckets + ] + + self.write_stream(data) + + expected = [ + '{"k" = 1; "sum" = 14}', # Group 1 Bucket 1 + '{"k" = 2; "sum" = 3}', # Group 2 Bucket 1 + '{"k" = 2; "sum" = 7}', # Group 2 Bucket 2 + '{"k" = 2; "sum" = 2}', # Group 2 Bucket 3 + '{"k" = 1; "sum" = 21}', # Group 1 Bucket 2 + '{"k" = 1; "sum" = 10}', # Group 1 Bucket 3 + '{"k" = 1; "sum" = 7}', # Group 1 Bucket 4 + '{"k" = 1; "sum" = 4}', # Group 1 Bucket 5 + ] + + assert self.read_stream(len(expected)) == expected + + read_rules = list_read_rules(self.input_topic) + assert 
len(read_rules) == 1, read_rules + + stop_yds_query(client, query_id) + + # Assert that all read rules were removed after query stops + read_rules = list_read_rules(self.input_topic) + assert len(read_rules) == 0, read_rules + + @yq_v1 + def test_pq_read_schema_metadata(self, kikimr, client): + client.create_yds_connection(name=YDS_CONNECTION, database_id="FakeDatabaseId") + self.init_topics("pq_test_pq_read_schema_metadata") + sql = Rf''' + PRAGMA dq.MaxTasksPerStage="2"; + + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT UNWRAP(Yson::SerializeJson(Yson::From(TableRow()))) + FROM ( + SELECT field1, field2, SystemMetadata("offset") as field3 + FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH ( + format=json_each_row, + SCHEMA ( + field1 String, + field2 Int32 + ) + ) + )''' + + query_id = start_yds_query(kikimr, client, sql) + + data1 = [ + '{"field1": "value1", "field2": 105}', + '{"field1": "value2", "field2": 106}', + ] + self.write_stream(data1) + + expected = [ + '{"field1":"value1","field2":105,"field3":0}', + '{"field1":"value2","field2":106,"field3":1}' + ] + + assert self.read_stream(len(expected)) == expected + + stop_yds_query(client, query_id) diff --git a/ydb/tests/fq/yds/test_public_metrics.py b/ydb/tests/fq/yds/test_public_metrics.py new file mode 100644 index 0000000000..b4edbda818 --- /dev/null +++ b/ydb/tests/fq/yds/test_public_metrics.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import pytest +import time + +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class TestPublicMetrics(TestYdsBase): + @yq_v1 + def test_select_limit(self, kikimr, client): + self.init_topics("public_select_limit", create_output=False) + + sql = R''' + SELECT * FROM myyds1.`{input_topic}` LIMIT 2; + '''.format( + input_topic=self.input_topic, + ) + + cloud_id = "mock_cloud" + folder_id 
= "my_folder" + + client.create_yds_connection("myyds1", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + + self.write_stream(["A", "B"]) + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + metrics = kikimr.compute_plane.get_sensors(1, "yq_public") + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}) == 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}) > 0 + assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, + "name": "query.memory_usage_bytes"}) > 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}) > 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}) is None + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}) >= 0 + + @yq_v1 + @pytest.mark.parametrize("stats_mode", ["STATS_MODE_FULL"]) + def test_select_unlimited(self, kikimr, client): + self.init_topics("public_select_unlimited") + + sql = R''' + INSERT INTO myyds2.`{output_topic}` + SELECT * FROM myyds2.`{input_topic}`; + '''.format( + input_topic=self.input_topic, + output_topic=self.output_topic, + ) + + cloud_id = "mock_cloud" + folder_id = "my_folder" + + client.create_yds_connection("myyds2", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + 
kikimr.compute_plane.wait_zero_checkpoint(query_id) + + data = ["A", "B", "C", "D"] + self.write_stream(data) + assert self.read_stream(len(data)) == data + + time.sleep(5) # TODO: fix and remove + client.abort_query(query_id) + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) + + metrics = kikimr.compute_plane.get_sensors(1, "yq_public") + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}) == 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}) > 0 + assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, + "name": "query.memory_usage_bytes"}) > 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}) > 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}) > 0 + assert metrics.find_sensor( + {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}) >= 0 + # TODO: rows must be fixed in YQ-2592 + # assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, + # "name": "query.source_input_records"}) > 0 + # assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, + # "name": "query.sink_output_records"}) > 0 + # assert metrics.find_sensor( + # {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.late_events"}) == 0 diff --git a/ydb/tests/fq/yds/test_read_rules_deletion.py b/ydb/tests/fq/yds/test_read_rules_deletion.py new file mode 100644 index 0000000000..da4678887c --- /dev/null +++ b/ydb/tests/fq/yds/test_read_rules_deletion.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import pytest + +from 
ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+from ydb.tests.tools.datastreams_helpers.control_plane import list_read_rules
+import ydb.tests.library.common.yatest_common as yatest_common
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class TestReadRulesDeletion(TestYdsBase):
+    @yq_v1
+    @pytest.mark.parametrize(
+        "with_recovery",
+        [False, True],
+        ids=["simple", "with_recovery"]
+    )
+    def test_delete_read_rules(self, kikimr, client, with_recovery):
+        # The read rule created for a streaming query must be removed once
+        # the query is aborted — including the case where a compute node
+        # was restarted (recovered) while the query was running.
+        topic_name = "delete_read_rules_{}".format(1 if with_recovery else 0)
+        conn = "yds_{}".format(1 if with_recovery else 0)
+        self.init_topics(topic_name)
+
+        sql = R'''
+            PRAGMA dq.MaxTasksPerStage="5";
+
+            INSERT INTO {conn}.`{output_topic}`
+            SELECT * FROM {conn}.`{input_topic}`;'''.format(
+            input_topic=self.input_topic,
+            output_topic=self.output_topic,
+            conn=conn
+        )
+
+        client.create_yds_connection(conn, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+        kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+        data = [
+            '42',
+            '42',
+            '42',
+            '42',
+            '42',
+            '42',
+            '42',
+        ]
+
+        self.write_stream(data)
+
+        self.read_stream(len(data))
+
+        # Exactly one read rule while the query is running.
+        read_rules = list_read_rules(self.input_topic)
+        assert len(read_rules) == 1, read_rules
+
+        if with_recovery:
+            kikimr.compute_plane.kikimr_cluster.nodes[1].stop()
+            kikimr.compute_plane.kikimr_cluster.nodes[1].start()
+            kikimr.compute_plane.wait_bootstrap(1)
+
+        client.abort_query(query_id)
+        client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER,
+                                 timeout=yatest_common.plain_or_under_sanitizer(60, 300))
+
+        # Assert that all read rules were removed after query stops
+        read_rules = list_read_rules(self.input_topic)
+        assert len(read_rules) == 0, read_rules
diff --git a/ydb/tests/fq/yds/test_restart_query.py 
b/ydb/tests/fq/yds/test_restart_query.py new file mode 100644 index 0000000000..c1cdc4edf6 --- /dev/null +++ b/ydb/tests/fq/yds/test_restart_query.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import logging +import pytest +import time + +import ydb.public.api.protos.draft.fq_pb2 as fq +import ydb.tests.library.common.yatest_common as yatest_common +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +from ydb.tests.tools.datastreams_helpers.control_plane import delete_stream + + +class TestRestartQuery(TestYdsBase): + @yq_v1 + @pytest.mark.parametrize( + "query_type", + [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING], + ids=["analytics", "streaming"] + ) + def test_restart_runtime_errors(self, kikimr_many_retries, client_many_retries, query_type): + streaming_query = query_type == fq.QueryContent.QueryType.STREAMING + unique_suffix = "_1" if streaming_query else "_2" + + connection_name = "yds" + unique_suffix + client_many_retries.create_yds_connection(name=connection_name, database_id="FakeDatabaseId") + + self.init_topics("restart" + unique_suffix) + + sql = Rf''' + INSERT INTO {connection_name}.`{self.output_topic}` + SELECT Data FROM {connection_name}.`{self.input_topic}`;''' + + query_id = client_many_retries.create_query("restart-query", sql, type=query_type).result.query_id + client_many_retries.wait_query_status(query_id, fq.QueryMeta.RUNNING) + if streaming_query: + kikimr_many_retries.compute_plane.wait_zero_checkpoint(query_id) + else: + # this ugly delay is needed to wait for ca source init until we pass correct read disposition param + time.sleep(3) + + # Start work + data = ["data"] + self.write_stream(["data"]) + assert self.read_stream(len(data)) == data + + delete_stream(self.output_topic) + + def assert_has_transient_issues(text, deadline): + describe_result = client_many_retries.describe_query(query_id).result + 
logging.debug("Describe result: {}".format(describe_result)) + for issue in describe_result.query.transient_issue: + if text in issue.message: + return True + assert deadline and deadline > time.time(), f'Text "{text}" is expected to be found in transient issues, but was not found. Describe query result:\n{describe_result}' + return False + + deadline = time.time() + yatest_common.plain_or_under_sanitizer(60, 300) + while not assert_has_transient_issues("SCHEME_ERROR", deadline): + time.sleep(0.3) + + describe_result = client_many_retries.describe_query(query_id).result + assert describe_result.query.meta.status == fq.QueryMeta.RUNNING diff --git a/ydb/tests/fq/yds/test_select_1.py b/ydb/tests/fq/yds/test_select_1.py new file mode 100644 index 0000000000..6c43b72ffa --- /dev/null +++ b/ydb/tests/fq/yds/test_select_1.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging + +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_all + +import ydb.public.api.protos.draft.fq_pb2 as fq +import ydb.public.api.protos.ydb_value_pb2 as ydb + + +class TestSelect1(object): + @yq_all + def test_select_1(self, kikimr, client): + sql = 'SELECT 1 AS SingleColumn;' + query_id = client.create_query("simple1", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + describe_result = client.describe_query(query_id).result + + assert len(describe_result.query.plan.json) > 0, "plan must not be empty" + assert len(describe_result.query.ast.data) > 0, "ast must not be empty" + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 1 + assert result_set.columns[0].name == "SingleColumn" + assert result_set.columns[0].type.type_id == ydb.Type.INT32 + assert len(result_set.rows) == 1 + assert result_set.rows[0].items[0].int32_value == 1 + assert sum(kikimr.control_plane.get_metering()) == 10 
+ + @yq_all + def test_select_z_x_y(self, client): + sql = "select 1 as z, 2 as x, 3 as y;" + query_id = client.create_query("simple2", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id) + + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 3 + assert result_set.columns[0].name == "z" + assert result_set.columns[1].name == "x" + assert result_set.columns[2].name == "y" + assert len(result_set.rows) == 1 + assert result_set.rows[0].items[0].int32_value == 1 + assert result_set.rows[0].items[1].int32_value == 2 + assert result_set.rows[0].items[2].int32_value == 3 + + @yq_v1 + def test_unwrap_null(self, client): + sql = "select unwrap(1/0);" + query_id = client.create_query("simple3", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + describe_result = client.describe_query(query_id).result + assert "Failed to unwrap empty optional" in describe_result.query.issue[0].issues[0].message + + @yq_all + def test_select_pg(self, client): + sql = R'''select ARRAY[ARRAY[1,2,3]], null, 'null', 1, true, null::int4''' + + query_id = client.create_query("simple4", sql, type=fq.QueryContent.QueryType.ANALYTICS, + pg_syntax=True).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id) + + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 6 + assert result_set.columns[0].name == "column0" + assert result_set.columns[0].type.pg_type.oid == 1007 + assert result_set.columns[0].type.pg_type.typlen == -1 + assert result_set.columns[0].type.pg_type.typmod == -1 + assert result_set.columns[1].name == "column1" + assert result_set.columns[1].type.null_type is not None + assert result_set.columns[2].name == "column2" + assert 
result_set.columns[2].type.pg_type.oid == 25 + assert result_set.columns[2].type.pg_type.typlen == -1 + assert result_set.columns[2].type.pg_type.typmod == -1 + assert result_set.columns[3].name == "column3" + assert result_set.columns[3].type.pg_type.oid == 23 + assert result_set.columns[3].type.pg_type.typlen == 4 + assert result_set.columns[3].type.pg_type.typmod == -1 + assert result_set.columns[4].name == "column4" + assert result_set.columns[4].type.pg_type.oid == 16 + assert result_set.columns[4].type.pg_type.typlen == 1 + assert result_set.columns[4].type.pg_type.typmod == -1 + assert result_set.columns[5].name == "column5" + assert result_set.columns[5].type.pg_type.oid == 23 + assert result_set.columns[5].type.pg_type.typlen == 4 + assert result_set.columns[5].type.pg_type.typmod == -1 + + assert len(result_set.rows) == 1 + assert result_set.rows[0].items[0].text_value == "{{1,2,3}}" + assert result_set.rows[0].items[1].WhichOneof("value") is None + assert result_set.rows[0].items[2].text_value == "null" + assert result_set.rows[0].items[3].text_value == "1" + assert result_set.rows[0].items[4].text_value == "t" + assert result_set.rows[0].items[5].WhichOneof("value") is None + + @yq_all + def test_select_10_p_19_plus_1(self, client): + sql = "SELECT 10000000000000000000+1 AS LargeColumn;" + query_id = client.create_query("simple1", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + describe_string = str(client.describe_query(query_id).result) + assert "Integral type implicit bitcast: Uint64 and Int32" in describe_string, describe_string + + @yq_all + def test_compile_error(self, client, yq_version): + sql = "SUPERSELECT;" + query_id = client.create_query("simple1", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + describe_string = str(client.describe_query(query_id).result) + assert "Query failed with code " 
+ ("ABORTED" if yq_version == "v1" else "GENERIC_ERROR") in describe_string, describe_string + assert "Unexpected token" in describe_string, describe_string + # Failed to parse query is added in YQv1 only + if yq_version == "v1": + assert "Failed to parse query" in describe_string, describe_string + + @yq_all + def test_ast_in_failed_query(self, client): + sql = "SELECT unwrap(1 / 0)" + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + + ast = str(client.describe_query(query_id).result.query.ast) + assert ast != "", "Query ast not found" diff --git a/ydb/tests/fq/yds/test_select_limit.py b/ydb/tests/fq/yds/test_select_limit.py new file mode 100644 index 0000000000..ce5dd8ffe6 --- /dev/null +++ b/ydb/tests/fq/yds/test_select_limit.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import os + +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +from ydb.tests.tools.datastreams_helpers.control_plane import list_read_rules + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class TestSelectLimit(TestYdsBase): + @yq_v1 + def test_select_limit(self, kikimr, client): + self.init_topics("select_limit", create_output=False) + + sql = R''' + SELECT * FROM myyds.`{input_topic}` LIMIT 2; + '''.format( + input_topic=self.input_topic, + ) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + + messages = [b'A', b'B', b'C'] + self.write_stream(messages) + + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + + result_set = 
data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.rows) == 2 + assert len(result_set.columns) == 1 + assert result_set.rows[0].items[0].bytes_value == messages[0] + assert result_set.rows[1].items[0].bytes_value == messages[1] + + # Assert that all read rules were removed after query stops + read_rules = list_read_rules(self.input_topic) + assert len(read_rules) == 0, read_rules diff --git a/ydb/tests/fq/yds/test_select_limit_db_id.py b/ydb/tests/fq/yds/test_select_limit_db_id.py new file mode 100644 index 0000000000..d8b75b7577 --- /dev/null +++ b/ydb/tests/fq/yds/test_select_limit_db_id.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import pytest + +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class TestSelectLimitWithDbId(TestYdsBase): + @yq_v1 + def test_select_same_with_id(self, kikimr, client): + pytest.skip("Skip until streaming disposition is implemented YQ-589") + + self.init_topics("select_same_with_id", create_output=False) + + sql = R''' + SELECT * FROM yds1.`{input_topic}` LIMIT 2; + SELECT * FROM yds1.`{input_topic}` LIMIT 2; + '''.format( + input_topic=self.input_topic, + ) + + client.create_yds_connection(name="yds1", database_id="FakeDatabaseId") + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.wait_zero_checkpoint(query_id) + + messages = ["A", "B", "C", "D", "E"] + self.write_stream(messages) + + client.wait_query(query_id, 30) + + # by default selects use different consumers, so they will read the same + # messages - first 2 of them + + rs = client.get_result_data(query_id, result_set_index=0).result.result_set + logging.debug(str(rs)) + assert len(rs.rows) == 2 + assert len(rs.columns) == 1 + assert 
rs.rows[0].items[0].bytes_value == messages[0] + assert rs.rows[1].items[0].bytes_value == messages[1] + + rs = client.get_result_data(query_id, result_set_index=1).result.result_set + logging.debug(str(rs)) + assert len(rs.rows) == 2 + assert len(rs.columns) == 1 + assert rs.rows[0].items[0].bytes_value == messages[0] + assert rs.rows[1].items[0].bytes_value == messages[1] diff --git a/ydb/tests/fq/yds/test_select_timings.py b/ydb/tests/fq/yds/test_select_timings.py new file mode 100644 index 0000000000..2f62fdb639 --- /dev/null +++ b/ydb/tests/fq/yds/test_select_timings.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import pytest +import os +import time + +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +import ydb.tests.library.common.yatest_common as yatest_common +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class TestSelectTimings(TestYdsBase): + @yq_v1 + @pytest.mark.parametrize( + "success", + [True, False], + ids=["finished", "aborted"] + ) + @pytest.mark.parametrize( + "query_type", + [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING], + ids=["analytics", "streaming"] + ) + def test_select_timings(self, kikimr, client, success, query_type): + suffix = (str(query_type)[0] + str(success)[0]).lower() + self.init_topics("select_timings_" + suffix, create_output=False) + connection = "myyds_" + suffix + + # TBD auto-create consumer and read_rule in analytics query + sql = "SELECT * FROM {connection}.`{input_topic}` LIMIT 1;".format( + connection=connection, + input_topic=self.input_topic, + ) + + client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=query_type).result.query_id + + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + + meta = client.describe_query(query_id).result.query.meta + assert meta.started_at.seconds 
+ != 0, "Must set started_at" + assert meta.finished_at.seconds == 0, "Must not set finished_at" + seconds = meta.started_at.seconds + + # streaming query should continue after cluster restart + if query_type == fq.QueryContent.QueryType.STREAMING: + kikimr.compute_plane.kikimr_cluster.nodes[1].stop() + kikimr.compute_plane.kikimr_cluster.nodes[1].start() + kikimr.compute_plane.wait_bootstrap(1) + + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id, expect_counters_exist=False) + + meta = client.describe_query(query_id).result.query.meta + assert meta.started_at.seconds == seconds, "Must not change started_at" + assert meta.finished_at.seconds == 0, "Must not set finished_at" + + # will wait 30 sec for lease expiration, reduce timeout when it is configurable + wait_query_timeout = yatest_common.plain_or_under_sanitizer(120, 300) + if success: + time.sleep(2) # Workaround race between write and read "from now". Remove when YQ-589 will be done. 
+ self.write_stream(["Message"]) + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED, timeout=wait_query_timeout) + else: + client.abort_query(query_id) + client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER, timeout=wait_query_timeout) + + meta = client.describe_query(query_id).result.query.meta + assert meta.started_at.seconds == seconds, "Must not change started_at (meta={})".format(meta) + assert meta.finished_at.seconds >= seconds, "Must set finished_at (meta={})".format(meta) diff --git a/ydb/tests/fq/yds/test_stop.py b/ydb/tests/fq/yds/test_stop.py new file mode 100644 index 0000000000..b9074f6dd1 --- /dev/null +++ b/ydb/tests/fq/yds/test_stop.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import os +import pytest +import time + +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 + +import ydb.tests.library.common.yatest_common as yatest_common +from ydb.tests.tools.datastreams_helpers.control_plane import create_read_rule, list_read_rules + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class TestStop(TestYdsBase): + @yq_v1 + @pytest.mark.parametrize( + "query_type", + [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING], + ids=["analytics", "streaming"] + ) + def test_stop_query(self, kikimr, client, query_type): + self.init_topics("select_stop_" + str(query_type), create_output=False) + + connection = "myyds_" + str(query_type) + # TBD auto-create consumer and read_rule in analytics query + if type == fq.QueryContent.QueryType.STREAMING: + sql = R''' + SELECT * FROM {connection}.`{input_topic}` LIMIT 3; + '''.format( + connection=connection, + input_topic=self.input_topic, + ) + else: + create_read_rule(self.input_topic, self.consumer_name) + sql = R''' + PRAGMA pq.Consumer="{consumer_name}"; + SELECT * FROM {connection}.`{input_topic}` LIMIT 3; + '''.format( + 
consumer_name=self.consumer_name, + connection=connection, + input_topic=self.input_topic, + ) + + assert self.wait_until((lambda: kikimr.compute_plane.get_actor_count(1, "YQ_PINGER") == 0)) + + client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=query_type).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + + assert self.wait_until((lambda: kikimr.compute_plane.get_actor_count(1, "YQ_PINGER") == 1)) + + time.sleep(2) # Workaround race between write and read "from now". Remove when YQ-589 will be done. + messages = ["A", "B"] + self.write_stream(messages) + + deadline = time.time() + yatest_common.plain_or_under_sanitizer(30, 150) + while True: + if time.time() > deadline: + break + describe_response = client.describe_query(query_id) + status = describe_response.result.query.meta.status + assert status in [fq.QueryMeta.STARTING, fq.QueryMeta.RUNNING], fq.QueryMeta.ComputeStatus.Name(status) + + client.abort_query(query_id) + + client.wait_query(query_id) + + # TODO: remove this if + if type == fq.QueryContent.QueryType.STREAMING: + # Assert that all read rules were removed after query stops + read_rules = list_read_rules(self.input_topic) + assert len(read_rules) == 0, read_rules + + describe_response = client.describe_query(query_id) + status = describe_response.result.query.meta.status + assert status == fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ComputeStatus.Name(status) + + assert self.wait_until((lambda: kikimr.compute_plane.get_actor_count(1, "YQ_PINGER") == 0)) + + # TBD implement and check analytics partial results as well + if type == fq.QueryContent.QueryType.STREAMING: + data = client.get_result_data(query_id) + + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.rows) == 2 + assert len(result_set.columns) == 1 + assert result_set.rows[0].items[0].bytes_value == messages[0] + assert 
result_set.rows[1].items[0].bytes_value == messages[1] diff --git a/ydb/tests/fq/yds/test_watermarks.py b/ydb/tests/fq/yds/test_watermarks.py new file mode 100644 index 0000000000..e9488c2075 --- /dev/null +++ b/ydb/tests/fq/yds/test_watermarks.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import time + +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.tools.fq_runner.fq_client import StreamingDisposition + +import ydb.public.api.protos.draft.fq_pb2 as fq + + +def start_yds_query(kikimr, client, sql, streaming_disposition) -> str: + query_id = client.create_query("simple", sql, streaming_disposition=streaming_disposition, + type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + return query_id + + +def stop_yds_query(client, query_id): + client.abort_query(query_id) + client.wait_query(query_id) + + +class TestWatermarks(TestYdsBase): + @yq_v1 + def test_pq_watermarks(self, kikimr, client): + client.create_yds_connection(name="yds", database_id="FakeDatabaseId") + self.init_topics("pq_test_pq_watermarks") + + sql = Rf''' + PRAGMA dq.WatermarksMode="default"; + PRAGMA dq.WatermarksGranularityMs="500"; + PRAGMA dq.WatermarksLateArrivalDelayMs="100"; + + INSERT INTO yds.`{self.output_topic}` + SELECT STREAM + Yson::SerializeText(Yson::From(TableRow())) + FROM ( + SELECT AGGREGATE_LIST(field1) as data + FROM (SELECT field1 + FROM yds.`{self.input_topic}` + WITH ( + format=json_each_row, + schema=( + field1 String, + field2 String + ) + ) + WHERE field2 == "abc1" + ) + GROUP BY HoppingWindow("PT0.5S", "PT1S") + );''' + + query_id = start_yds_query(kikimr, client, sql, streaming_disposition=StreamingDisposition.fresh()) + + data1 = [ + '{"field1": "row1", "field2": "abc1"}', + '{"field1": "row2", "field2": "abc2"}', + ] + 
self.write_stream(data1) + + time.sleep(2) # wait for the write time in lb is moved forward + + data2 = [ + '{"field1": "row3", "field2": "abc3"}', + ] + self.write_stream(data2) + + expected = [ + '{"data" = ["row1"]}', + '{"data" = ["row1"]}' + ] + + assert self.read_stream(len(expected)) == expected + + stop_yds_query(client, query_id) + + @yq_v1 + def test_idle_watermarks(self, kikimr, client): + client.create_yds_connection(name="yds", database_id="FakeDatabaseId") + self.init_topics("pq_test_idle_watermarks") + + sql = Rf''' + PRAGMA dq.WatermarksMode="default"; + PRAGMA dq.WatermarksGranularityMs="500"; + PRAGMA dq.WatermarksLateArrivalDelayMs="2000"; + PRAGMA dq.WatermarksEnableIdlePartitions="true"; + + INSERT INTO yds.`{self.output_topic}` + SELECT STREAM + Yson::SerializeText(Yson::From(TableRow())) + FROM ( + SELECT AGGREGATE_LIST(field1) as data + FROM (SELECT field1 + FROM yds.`{self.input_topic}` + WITH ( + format=json_each_row, + schema=( + field1 String, + field2 String + ) + ) + WHERE field2 == "abc1" + ) + GROUP BY HoppingWindow("PT0.5S", "PT1S") + );''' + + query_id = start_yds_query(kikimr, client, sql, streaming_disposition=StreamingDisposition.fresh()) + + data1 = [ + '{"field1": "row1", "field2": "abc1"}', + ] + self.write_stream(data1) + + expected = [ + '{"data" = ["row1"]}', + '{"data" = ["row1"]}' + ] + + assert self.read_stream(len(expected)) == expected + + stop_yds_query(client, query_id) diff --git a/ydb/tests/fq/yds/test_yds_bindings.py b/ydb/tests/fq/yds/test_yds_bindings.py new file mode 100644 index 0000000000..4b64cd6482 --- /dev/null +++ b/ydb/tests/fq/yds/test_yds_bindings.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import pytest + +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase + +import ydb.public.api.protos.ydb_value_pb2 as ydb +import ydb.public.api.protos.draft.fq_pb2 as fq + + +class 
TestBindings(TestYdsBase): + @yq_v1 + @pytest.mark.skip(reason="Is not implemented in YDS yet") + def test_yds_insert(self, client): + self.init_topics("yds_insert") + + connection_id = client.create_yds_connection("ydsconnection", os.getenv("YDB_DATABASE"), + os.getenv("YDB_ENDPOINT")).result.connection_id + + foo_type = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32)) + bar_type = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8)) + client.create_yds_binding(name="ydsbinding", + stream=self.input_topic, + format="json_each_row", + connection_id=connection_id, + columns=[foo_type, bar_type]) + + sql = R''' + insert into bindings.`ydsbinding` + select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + sql = R''' + select foo, bar from bindings.`ydsbinding` limit 2; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + assert len(result_set.columns) == 2 + assert result_set.columns[0].name == "foo" + assert result_set.columns[0].type.type_id == ydb.Type.INT32 + assert result_set.columns[1].name == "bar" + assert result_set.columns[1].type.type_id == ydb.Type.UTF8 + assert len(result_set.rows) == 2 + assert result_set.rows[0].items[0].int32_value == 123 + assert result_set.rows[0].items[1].text_value == 'xxx' + assert result_set.rows[1].items[0].int32_value == 456 + assert result_set.rows[1].items[1].text_value == 'yyy' diff --git a/ydb/tests/fq/yds/test_yq_streaming.py b/ydb/tests/fq/yds/test_yq_streaming.py new file mode 100644 index 0000000000..3c9a7d5067 --- /dev/null +++ b/ydb/tests/fq/yds/test_yq_streaming.py @@ 
-0,0 +1,329 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import os +import time + +from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase + +import ydb.public.api.protos.draft.fq_pb2 as fq +import ydb.public.api.protos.ydb_value_pb2 as ydb_value + + +class TestYqStreaming(TestYdsBase): + @yq_v1 + def test_yq_streaming(self, kikimr, client, yq_version): + self.init_topics(f"pq_yq_streaming_{yq_version}") + + sql = R''' + PRAGMA dq.MaxTasksPerStage="2"; + INSERT INTO myyds.`{output_topic}` + SELECT STREAM (Data || Data) ?? "" As Data FROM myyds.`{input_topic}`; + ''' \ + .format( + input_topic=self.input_topic, + output_topic=self.output_topic, + ) + + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + + data = [ + '{"Data" = "hello";}', + ] + self.write_stream(data) + + expected = [ + '{"Data" = "hello";}{"Data" = "hello";}' + ] + assert self.read_stream(len(expected)) == expected + + client.describe_query(query_id) + + client.abort_query(query_id) + + client.wait_query(query_id) + + describe_response = client.describe_query(query_id) + status = describe_response.result.query.meta.status + assert not describe_response.issues, str(describe_response.issues) + assert status == fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ComputeStatus.Name(status) + + @yq_v1 + def test_yq_streaming_read_from_binding(self, kikimr, client, yq_version): + self.init_topics(f"pq_yq_read_from_binding_{yq_version}") + + # Consumer and topics to create are written in ya.make file. + sql = R''' + PRAGMA dq.MaxTasksPerStage="2"; + + INSERT INTO myyds2.`{output_topic}` + SELECT STREAM key ?? 
"" FROM bindings.my_binding; + ''' \ + .format( + output_topic=self.output_topic + ) + + connection_response = client.create_yds_connection("myyds2", os.getenv("YDB_DATABASE"), + os.getenv("YDB_ENDPOINT")) + logging.debug(str(connection_response)) + assert not connection_response.issues, str(connection_response.issues) + + keyColumn = ydb_value.Column(name="key", type=ydb_value.Type( + optional_type=ydb_value.OptionalType(item=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.STRING)))) + + binding_response = client.create_yds_binding(name="my_binding", + stream=self.input_topic, + format="json_each_row", + connection_id=connection_response.result.connection_id, + columns=[keyColumn]) + logging.debug(str(binding_response)) + assert not binding_response.issues, str(binding_response.issues) + + response = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING) + assert not response.issues, str(response.issues) + logging.debug(str(response.result)) + client.wait_query_status(response.result.query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(response.result.query_id) + + # Write to input. + data = [ + '{"key": "abc"}', + '{"key": "xxx"}' + ] + + self.write_stream(data) + logging.info("Data was written: {}".format(data)) + + # Read from output. + expected = [ + 'abc', + 'xxx' + ] + + read_data = self.read_stream(len(expected)) + logging.info("Data was read: {}".format(read_data)) + + assert read_data == expected + + describe_response = client.describe_query(response.result.query_id) + assert not describe_response.issues, str(describe_response.issues) + logging.debug(str(describe_response.result)) + + client.abort_query(response.result.query_id) + + client.wait_query(response.result.query_id) + + @yq_v1 + def test_yq_streaming_read_from_binding_date_time(self, kikimr, client, yq_version): + self.init_topics(f"pq_yq_read_from_binding_date_time_{yq_version}") + + # Consumer and topics to create are written in ya.make file. 
+ sql = R''' + PRAGMA dq.MaxTasksPerStage="2"; + + INSERT INTO myyds4.`{output_topic}` + SELECT STREAM Unwrap(key || CAST(value as String)) as data FROM bindings.my_binding4; + ''' \ + .format( + output_topic=self.output_topic + ) + + connection_response = client.create_yds_connection("myyds4", os.getenv("YDB_DATABASE"), + os.getenv("YDB_ENDPOINT")) + logging.debug(str(connection_response)) + assert not connection_response.issues, str(connection_response.issues) + + keyColumn = ydb_value.Column(name="key", type=ydb_value.Type( + optional_type=ydb_value.OptionalType(item=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.STRING)))) + + valueColumn = ydb_value.Column(name="value", + type=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.DATETIME)) + + binding_response = client.create_yds_binding(name="my_binding4", + stream=self.input_topic, + format="json_each_row", + connection_id=connection_response.result.connection_id, + columns=[keyColumn, valueColumn]) + logging.debug(str(binding_response)) + assert not binding_response.issues, str(binding_response.issues) + + response = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING) + assert not response.issues, str(response.issues) + logging.debug(str(response.result)) + client.wait_query_status(response.result.query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(response.result.query_id) + + # Write to input. + data = [ + '{"key": "abc", "value": "2022-10-19 16:40:47"}', + '{"key": "xxx", "value": "2022-10-19 16:40:48"}' + ] + + self.write_stream(data) + logging.info("Data was written: {}".format(data)) + + # Read from output. 
+ expected = [ + 'abc2022-10-19T16:40:47Z', + 'xxx2022-10-19T16:40:48Z' + ] + + read_data = self.read_stream(len(expected)) + logging.info("Data was read: {}".format(read_data)) + + assert read_data == expected + + describe_response = client.describe_query(response.result.query_id) + assert not describe_response.issues, str(describe_response.issues) + logging.debug(str(describe_response.result)) + + client.abort_query(response.result.query_id) + + client.wait_query(response.result.query_id) + + @yq_v1 + def test_yq_streaming_read_date_time_format(self, kikimr, client, yq_version): + self.init_topics(f"pq_yq_read_from_binding_dt_format_settings_{yq_version}") + + # Consumer and topics to create are written in ya.make file. + sql = R''' + PRAGMA dq.MaxTasksPerStage="2"; + + INSERT INTO myyds3.`{output_topic}` + SELECT STREAM Unwrap(key || CAST(value as String)) as data FROM bindings.my_binding3; + ''' \ + .format( + output_topic=self.output_topic + ) + + connection_response = client.create_yds_connection("myyds3", os.getenv("YDB_DATABASE"), + os.getenv("YDB_ENDPOINT")) + logging.debug(str(connection_response)) + assert not connection_response.issues, str(connection_response.issues) + + keyColumn = ydb_value.Column(name="key", type=ydb_value.Type( + optional_type=ydb_value.OptionalType(item=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.STRING)))) + + valueColumn = ydb_value.Column(name="value", + type=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.DATETIME)) + + binding_response = client.create_yds_binding(name="my_binding3", + stream=self.input_topic, + format="json_each_row", + connection_id=connection_response.result.connection_id, + columns=[keyColumn, valueColumn], + format_setting={"data.datetime.format_name": "ISO"}) + logging.debug(str(binding_response)) + assert not binding_response.issues, str(binding_response.issues) + + response = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING) + assert not response.issues, 
str(response.issues) + logging.debug(str(response.result)) + client.wait_query_status(response.result.query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(response.result.query_id) + + # Write to input. + data = [ + '{"key": "abc", "value": "2022-10-19T16:40:47Z"}', + '{"key": "xxx", "value": "2022-10-19T16:40:48Z"}' + ] + + self.write_stream(data) + logging.info("Data was written: {}".format(data)) + + # Read from output. + expected = [ + 'abc2022-10-19T16:40:47Z', + 'xxx2022-10-19T16:40:48Z' + ] + + read_data = self.read_stream(len(expected)) + logging.info("Data was read: {}".format(read_data)) + + assert read_data == expected + + describe_response = client.describe_query(response.result.query_id) + assert not describe_response.issues, str(describe_response.issues) + logging.debug(str(describe_response.result)) + + client.abort_query(response.result.query_id) + + client.wait_query(response.result.query_id) + + @yq_v1 + def test_state_load_mode(self, kikimr, client, yq_version): + self.init_topics(f"pq_test_state_load_mode_{yq_version}") + + sql = R''' + INSERT INTO myyds1.`{output_topic}` + SELECT STREAM (Data || Data) ?? 
"" As Data FROM myyds1.`{input_topic}`; + ''' \ + .format( + input_topic=self.input_topic, + output_topic=self.output_topic, + ) + + client.create_yds_connection("myyds1", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + name = "simple" + create_query_result = client.create_query(name, sql, type=fq.QueryContent.QueryType.STREAMING).result + query_id = create_query_result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + + data = [ + '{"Data" = "hello";}', + ] + self.write_stream(data) + + expected = [ + '{"Data" = "hello";}{"Data" = "hello";}' + ] + assert self.read_stream(len(expected)) == expected + + client.describe_query(query_id) + + client.abort_query(query_id) + + client.wait_query(query_id) + + describe_response = client.describe_query(query_id) + status = describe_response.result.query.meta.status + assert not describe_response.issues, str(describe_response.issues) + assert status == fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ComputeStatus.Name(status) + + self.init_topics("test_state_load_mode2") + new_sql = R''' + INSERT INTO myyds1.`{output_topic}` + SELECT STREAM (Data || CAST(COUNT(*) as string)) ?? 
"" as cnt + FROM myyds1.`{input_topic}` + GROUP BY Data, HOP(Just(CurrentUtcTimestamp(TableRow())), "PT1S", "PT1S", "PT1S") + ; + ''' \ + .format( + input_topic=self.input_topic, + output_topic=self.output_topic, + ) + client.modify_query(query_id, name, new_sql, type=fq.QueryContent.QueryType.STREAMING, + state_load_mode=fq.StateLoadMode.EMPTY) + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_completed_checkpoints(query_id, + kikimr.compute_plane.get_completed_checkpoints(query_id) + 1) + + modified_data = [ + "hello new query" + ] + modified_expected = [ + "hello new query1" + ] + self.write_stream(modified_data) + time.sleep(5) + self.write_stream(modified_data) + assert self.read_stream(len(modified_expected)) == modified_expected + + client.abort_query(query_id) + client.wait_query(query_id) diff --git a/ydb/tests/fq/yds/ya.make b/ydb/tests/fq/yds/ya.make new file mode 100644 index 0000000000..da9713bcea --- /dev/null +++ b/ydb/tests/fq/yds/ya.make @@ -0,0 +1,58 @@ +PY3TEST() + +FORK_SUBTESTS() +SPLIT_FACTOR(50) + +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc) + +PEERDIR( + ydb/public/api/protos + ydb/public/api/grpc + ydb/tests/tools/datastreams_helpers + ydb/tests/tools/fq_runner +) + +DEPENDS(ydb/tests/tools/pq_read) + +PY_SRCS( + conftest.py + test_base.py +) + +TEST_SRCS( + test_2_selects_limit.py + test_3_selects.py + test_bad_syntax.py + test_big_state.py + test_continue_mode.py + test_cpu_quota.py + test_delete_read_rules_after_abort_by_system.py + test_eval.py + test_mem_alloc.py + test_metrics_cleanup.py + test_pq_read_write.py + test_public_metrics.py + test_read_rules_deletion.py + test_restart_query.py + test_select_1.py + test_select_limit_db_id.py + test_select_limit.py + test_select_timings.py + test_stop.py + test_watermarks.py + test_yds_bindings.py + test_yq_streaming.py +) + +IF (SANITIZER_TYPE == "thread") + TIMEOUT(2400) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + 
TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +REQUIREMENTS(ram:16) + +END() |