about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-11-13 10:42:42 +0300
committerhor911 <hor911@ydb.tech>2023-11-13 11:29:33 +0300
commit1e8ae22002dcb5248f29a4dca925e9708af9bc31 (patch)
treeb0dde5b6701be458c198d5a16d3fea6b837fdce6
parent005735009aadf431092af0ecbb430dafee3f58ea (diff)
downloadydb-1e8ae22002dcb5248f29a4dca925e9708af9bc31.tar.gz
Move more tests to OSS
-rw-r--r--ydb/tests/fq/mem_alloc/test_alloc_default.py373
-rw-r--r--ydb/tests/fq/mem_alloc/test_dc_local.py65
-rwxr-xr-xydb/tests/fq/mem_alloc/test_result_limits.py102
-rw-r--r--ydb/tests/fq/mem_alloc/test_scheduling.py140
-rw-r--r--ydb/tests/fq/mem_alloc/ya.make30
-rw-r--r--ydb/tests/fq/multi_plane/test_cp_ic.py28
-rw-r--r--ydb/tests/fq/multi_plane/test_dispatch.py66
-rw-r--r--ydb/tests/fq/multi_plane/test_retry.py125
-rw-r--r--ydb/tests/fq/multi_plane/ya.make21
-rw-r--r--ydb/tests/fq/ya.make3
-rw-r--r--ydb/tests/fq/yds/canondata/result.json60
-rw-r--r--ydb/tests/fq/yds/conftest.py71
-rw-r--r--ydb/tests/fq/yds/test_2_selects_limit.py100
-rw-r--r--ydb/tests/fq/yds/test_3_selects.py50
-rw-r--r--ydb/tests/fq/yds/test_bad_syntax.py99
-rw-r--r--ydb/tests/fq/yds/test_base.py26
-rw-r--r--ydb/tests/fq/yds/test_big_state.py48
-rw-r--r--ydb/tests/fq/yds/test_compression_data/test.json.brbin0 -> 66 bytes
-rw-r--r--ydb/tests/fq/yds/test_compression_data/test.json.bz2bin0 -> 92 bytes
-rw-r--r--ydb/tests/fq/yds/test_compression_data/test.json.gzbin0 -> 81 bytes
-rw-r--r--ydb/tests/fq/yds/test_compression_data/test.json.lz4bin0 -> 79 bytes
-rw-r--r--ydb/tests/fq/yds/test_compression_data/test.json.xzbin0 -> 116 bytes
-rw-r--r--ydb/tests/fq/yds/test_compression_data/test.json.zstbin0 -> 71 bytes
-rw-r--r--ydb/tests/fq/yds/test_continue_mode.py214
-rw-r--r--ydb/tests/fq/yds/test_cpu_quota.py76
-rw-r--r--ydb/tests/fq/yds/test_delete_read_rules_after_abort_by_system.py54
-rw-r--r--ydb/tests/fq/yds/test_eval.py26
-rw-r--r--ydb/tests/fq/yds/test_mem_alloc.py84
-rw-r--r--ydb/tests/fq/yds/test_metrics_cleanup.py55
-rw-r--r--ydb/tests/fq/yds/test_pq_read_write.py136
-rw-r--r--ydb/tests/fq/yds/test_public_metrics.py98
-rw-r--r--ydb/tests/fq/yds/test_read_rules_deletion.py70
-rw-r--r--ydb/tests/fq/yds/test_restart_query.py63
-rw-r--r--ydb/tests/fq/yds/test_select_1.py130
-rw-r--r--ydb/tests/fq/yds/test_select_limit.py46
-rw-r--r--ydb/tests/fq/yds/test_select_limit_db_id.py52
-rw-r--r--ydb/tests/fq/yds/test_select_timings.py72
-rw-r--r--ydb/tests/fq/yds/test_stop.py93
-rw-r--r--ydb/tests/fq/yds/test_watermarks.py124
-rw-r--r--ydb/tests/fq/yds/test_yds_bindings.py57
-rw-r--r--ydb/tests/fq/yds/test_yq_streaming.py329
-rw-r--r--ydb/tests/fq/yds/ya.make58
42 files changed, 3244 insertions, 0 deletions
diff --git a/ydb/tests/fq/mem_alloc/test_alloc_default.py b/ydb/tests/fq/mem_alloc/test_alloc_default.py
new file mode 100644
index 0000000000..4580110a42
--- /dev/null
+++ b/ydb/tests/fq/mem_alloc/test_alloc_default.py
@@ -0,0 +1,373 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+
+import pytest
+import six
+import time
+
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+K = 1024
+M = 1024*1024
+G = 1024*1024*1024
+DEFAULT_LIMIT = 8*G
+DEFAULT_DELTA = 30*M
+
+
+@pytest.fixture
+def kikimr(request):
+ (initial, total, step, hard_limit) = request.param
+ kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True)
+ kikimr = StreamingOverKikimr(kikimr_conf)
+ kikimr.mkql_initial_memory_limit = initial
+ kikimr.mkql_total_memory_limit = total
+ kikimr.mkql_alloc_size = step
+ kikimr.mkql_task_hard_memory_limit = hard_limit
+ kikimr.compute_plane.fq_config['resource_manager']['mkql_initial_memory_limit'] = kikimr.mkql_initial_memory_limit
+ kikimr.compute_plane.fq_config['resource_manager']['mkql_total_memory_limit'] = kikimr.mkql_total_memory_limit
+ kikimr.compute_plane.fq_config['resource_manager']['mkql_alloc_size'] = kikimr.mkql_alloc_size
+ kikimr.compute_plane.fq_config['resource_manager']['mkql_task_hard_memory_limit'] = kikimr.mkql_task_hard_memory_limit
+ kikimr.control_plane.fq_config['quotas_manager']['quotas'] = []
+ kikimr.control_plane.fq_config['quotas_manager']['quotas'].append({"subject_type": "cloud",
+ "subject_id": "my_cloud",
+ "limit": [{"name": "yq.streamingQuery.count", "limit": 100}]})
+ kikimr.start_mvp_mock_server()
+ kikimr.start()
+ yield kikimr
+ kikimr.stop_mvp_mock_server()
+ kikimr.stop()
+
+
+def wait_until(predicate, wait_time=yatest_common.plain_or_under_sanitizer(10, 50), wait_step=yatest_common.plain_or_under_sanitizer(0.5, 2)):
+ deadline = time.time() + wait_time
+ while time.time() < deadline:
+ if predicate():
+ return True
+ time.sleep(wait_step)
+ else:
+ return False
+
+
+def feq(a, b):
+ if abs(a) <= 1*G:
+ return a == b
+ else:
+ return abs((a - b) / a) < 0.0000001
+
+
+class TestAlloc(TestYdsBase):
+ @pytest.mark.parametrize("kikimr", [(None, None, None, None)], indirect=["kikimr"])
+ def test_default_limits(self, kikimr):
+
+ kikimr.control_plane.wait_bootstrap(1)
+ assert kikimr.control_plane.get_mkql_limit(1) == 0, "Incorrect Limit"
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ self.init_topics("select_default_limits", create_output=False)
+
+ sql = R'''
+ SELECT * FROM myyds.`{input_topic}`
+ '''\
+ .format(
+ input_topic=self.input_topic,
+ )
+
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.control_plane.wait_zero_checkpoint(query_id)
+
+ task_count = kikimr.control_plane.get_task_count(1, query_id)
+ assert feq(kikimr.control_plane.get_mkql_allocated(1), task_count * DEFAULT_LIMIT), "Incorrect Alloc"
+
+ limit = sum(v for k, v in six.iteritems(kikimr.control_plane.get_sensors(1, "dq_tasks").find_sensors({"operation": query_id, "subsystem": "mkql", "sensor": "MemoryLimit"}, "id")))
+ usage = sum(v for k, v in six.iteritems(kikimr.control_plane.get_sensors(1, "dq_tasks").find_sensors({"operation": query_id, "subsystem": "mkql", "sensor": "MemoryUsage"}, "id")))
+ assert limit is not None
+ assert usage is not None
+ # assert limit == task_count * DEFAULT_LIMIT, "Incorrect Alloc"
+
+ client.abort_query(query_id)
+ client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ @pytest.mark.parametrize("kikimr", [(1 * M, 8 * G, None, None)], indirect=["kikimr"])
+ def test_default_delta(self, kikimr):
+
+ kikimr.control_plane.wait_bootstrap(1)
+ assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit"
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ self.init_topics("select_default_delta", create_output=False)
+
+ sql = R'''
+ SELECT COUNT(*)
+ FROM myyds.`{input_topic}`
+ GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data
+ LIMIT 1
+ '''\
+ .format(
+ input_topic=self.input_topic,
+ )
+
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.control_plane.wait_zero_checkpoint(query_id)
+
+ task_count = kikimr.control_plane.get_task_count(1, query_id)
+ initially_allocated = kikimr.control_plane.get_mkql_allocated(1)
+ assert initially_allocated == task_count * kikimr.mkql_initial_memory_limit, "Incorrect Alloc"
+
+ for i in range(10000):
+ self.write_stream([format(i, "0x")])
+ last_allocated = kikimr.control_plane.get_mkql_allocated(1)
+ if last_allocated != initially_allocated:
+ assert last_allocated == initially_allocated + DEFAULT_DELTA
+ break
+ else:
+ assert False, "Limit was not increased"
+
+ client.abort_query(query_id)
+ client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ @pytest.mark.parametrize("kikimr", [(1 * M, 24 * M, 1 * M, None)], indirect=["kikimr"])
+ def test_node_limit(self, kikimr):
+
+ kikimr.control_plane.wait_bootstrap(1)
+ assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit"
+ assert kikimr.control_plane.get_mkql_allocated() == 0, "Incorrect Alloc"
+
+ self.init_topics("select_node_limit", create_output=False)
+
+ sql = R'''
+ SELECT COUNT(*)
+ FROM myyds.`{input_topic}`
+ GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data
+ LIMIT 1
+ '''\
+ .format(
+ input_topic=self.input_topic,
+ )
+
+ client = FederatedQueryClient("my_folder@my_cloud", streaming_over_kikimr=kikimr)
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+
+ queries = []
+ task_count = 0
+ memory_per_graph = None
+
+ while True:
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ assert wait_until((lambda : kikimr.control_plane.get_task_count(1, query_id) > 0)), "TaskController not started"
+ task_count += kikimr.control_plane.get_task_count(1, query_id)
+ allocated = task_count * kikimr.mkql_initial_memory_limit
+ assert wait_until((lambda : kikimr.control_plane.get_mkql_allocated(1) == allocated)), "Task memory was not allocated"
+ queries.append(query_id)
+ if memory_per_graph is None:
+ memory_per_graph = allocated
+ if kikimr.mkql_total_memory_limit < allocated + memory_per_graph:
+ break
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+
+ def not_enough_memory():
+ issues = client.describe_query(query_id).result.query.transient_issue
+ if len(issues) == 0:
+ return False
+ # it is possible to get several similar issues
+ for issue in issues:
+ if issue.message == "Not enough free memory in the cluster" and issue.issue_code == 6001:
+ return True
+ assert False, "Incorrect issues " + str(issues)
+
+ assert wait_until(not_enough_memory), "Allocation was not failed"
+ assert kikimr.control_plane.get_mkql_allocated(1) == task_count * kikimr.mkql_initial_memory_limit, "Incorrect allocation size"
+ client.abort_query(query_id)
+ # query is respawned every 30s, so wait with increased timeout
+ # we may be lucky to stop the query, or it is stopped automatically due to high failure rate
+ client.wait_query(query_id, 60, [fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ABORTED_BY_SYSTEM])
+
+ task_count_0 = kikimr.control_plane.get_task_count(1, queries[0])
+ assert task_count_0 > 0, "Strange query stat " + queries[0]
+ client.abort_query(queries[0])
+ client.wait_query_status(queries[0], fq.QueryMeta.ABORTED_BY_USER)
+
+ assert wait_until((lambda : kikimr.control_plane.get_mkql_allocated(1) == (task_count - task_count_0) * kikimr.mkql_initial_memory_limit)), "Task memory was not freed"
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+
+ assert wait_until((lambda : kikimr.control_plane.get_task_count(1, query_id) > 0)), "TaskController not started " + query_id
+ task_count += kikimr.control_plane.get_task_count(1, query_id) - task_count_0
+ assert wait_until((lambda : kikimr.control_plane.get_mkql_allocated(1) == task_count * kikimr.mkql_initial_memory_limit)), "Task memory was not allocated"
+ queries[0] = query_id
+
+ for q in queries:
+ client.abort_query(q)
+ client.wait_query_status(q, fq.QueryMeta.ABORTED_BY_USER)
+
+ assert wait_until((lambda : kikimr.control_plane.get_mkql_allocated() == 0)), "Incorrect final free"
+
+ @pytest.mark.parametrize("kikimr", [(1 * G, 8 * G, None, None)], indirect=["kikimr"])
+ def test_alloc_and_free(self, kikimr):
+
+ kikimr.control_plane.wait_bootstrap(1)
+ assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit"
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ self.init_topics("select_alloc_and_free", create_output=False)
+
+ sql = R'''
+ SELECT COUNT(*)
+ FROM myyds.`{input_topic}`
+ GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data
+ LIMIT 1
+ '''\
+ .format(
+ input_topic=self.input_topic,
+ )
+
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+
+ task_count = kikimr.control_plane.get_task_count(1, query_id)
+ assert kikimr.control_plane.get_mkql_allocated(1) == task_count * kikimr.mkql_initial_memory_limit, "Incorrect Alloc"
+
+ client.abort_query(query_id)
+ client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ @pytest.mark.parametrize("kikimr", [(1 * M, 1 * G, 16 * K, None)], indirect=["kikimr"])
+ def test_up_down(self, kikimr):
+
+ kikimr.control_plane.wait_bootstrap(1)
+ assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit"
+ assert kikimr.control_plane.get_mkql_allocated() == 0, "Incorrect Alloc"
+
+ self.init_topics("select_up_down")
+
+ sql = R'''
+ INSERT INTO myyds.`{output_topic}`
+ SELECT Data
+ FROM myyds.`{input_topic}`
+ GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data
+ '''\
+ .format(
+ input_topic=self.input_topic,
+ output_topic=self.output_topic,
+ )
+
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.control_plane.wait_zero_checkpoint(query_id)
+ assert wait_until((lambda : kikimr.control_plane.get_task_count(1, query_id) > 0)), "TaskController not started"
+ allocated_at_start = kikimr.control_plane.get_mkql_allocated(1)
+
+ for i in range(10000):
+ self.write_stream([format(i, "09x")])
+ allocated = kikimr.control_plane.get_mkql_allocated(1)
+ if allocated > allocated_at_start + 1 * M:
+ break
+ else:
+ assert False, "Memory limit was not increased"
+
+ # de-allocation doesn't work as expected
+ # assert wait_until((lambda : kikimr.get_mkql_allocated(1) < allocated), timeout=60), "Memory limit was not decreased"
+
+ client.abort_query(query_id)
+ client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ @pytest.mark.parametrize("kikimr", [(1 * M, 10 * M, 1 * G, None)], indirect=["kikimr"])
+ def test_mkql_not_increased(self, kikimr):
+
+ kikimr.control_plane.wait_bootstrap(1)
+ assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit"
+ assert kikimr.control_plane.get_mkql_allocated() == 0, "Incorrect Alloc"
+
+ self.init_topics("test_mkql_not_increased")
+
+ sql = R'''
+ INSERT INTO myyds.`{output_topic}`
+ SELECT Data
+ FROM myyds.`{input_topic}`
+ GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data
+ '''\
+ .format(
+ input_topic=self.input_topic,
+ output_topic=self.output_topic,
+ )
+
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.control_plane.wait_zero_checkpoint(query_id)
+ assert wait_until((lambda : kikimr.control_plane.get_task_count(1, query_id) > 0)), "TaskController not started"
+
+ for i in range(10000):
+ self.write_stream([format(i, "09x")])
+ query = client.describe_query(query_id).result.query
+ issues = query.transient_issue
+ if len(issues) >= 1:
+ assert issues[0].message.startswith("Mkql memory limit exceeded, limit: 1048576"), "Incorrect message text"
+ assert issues[0].message.endswith("canAllocateExtraMemory: 1"), "Incorrect settings"
+ assert issues[0].issue_code == 2029, "Incorrect issue code" + issues[0].message
+ break
+ else:
+ assert False, "Memory limit was not reached"
+
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ @pytest.mark.parametrize("kikimr", [(350 * K, 100 * M, 1 * K, 400 * K)], indirect=["kikimr"])
+ def test_hard_limit(self, kikimr):
+
+ kikimr.control_plane.wait_bootstrap(1)
+ assert kikimr.control_plane.get_mkql_limit(1) == kikimr.mkql_total_memory_limit, "Incorrect Limit"
+ assert kikimr.control_plane.get_mkql_allocated() == 0, "Incorrect Alloc"
+
+ client = FederatedQueryClient("my_folder@my_cloud", streaming_over_kikimr=kikimr)
+ sql = R'''
+ SELECT ListLast(ListCollect(ListFromRange(0, {n} + 1)))
+ '''
+ n = 1
+ for i in range(0, 10):
+ query_id = client.create_query("simple", sql.format(n=n), type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ status = client.wait_query_status(query_id, [fq.QueryMeta.COMPLETED, fq.QueryMeta.FAILED])
+ if status == fq.QueryMeta.FAILED:
+ assert i > 1, "First queries must be successfull"
+ query = client.describe_query(query_id).result.query
+ describe_str = str(query)
+ assert "LIMIT_EXCEEDED" in describe_str
+ break
+ n = n * 10
+ else:
+ assert False, "Limit was NOT exceeded"
diff --git a/ydb/tests/fq/mem_alloc/test_dc_local.py b/ydb/tests/fq/mem_alloc/test_dc_local.py
new file mode 100644
index 0000000000..2d5092604a
--- /dev/null
+++ b/ydb/tests/fq/mem_alloc/test_dc_local.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+
+import pytest
+
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+K = 1024
+M = 1024*1024
+G = 1024*1024*1024
+DEFAULT_LIMIT = 8*G
+DEFAULT_DELTA = 30*M
+
+
+@pytest.fixture
+def kikimr(request):
+ kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True, node_count=4, dc_mapping={1: "DC1", 2: "DC2", 3: "DC1", 4: "DC2"})
+ kikimr = StreamingOverKikimr(kikimr_conf)
+ kikimr.start_mvp_mock_server()
+ kikimr.start()
+ yield kikimr
+ kikimr.stop_mvp_mock_server()
+ kikimr.stop()
+
+
+class TestAlloc(TestYdsBase):
+ @pytest.mark.parametrize("kikimr", [(None, None, None)], indirect=["kikimr"])
+ def test_dc_locality(self, kikimr):
+
+ self.init_topics("select_dc_locality", create_output=False)
+
+ sql = R'''
+ SELECT * FROM myyds.`{input_topic}`
+ '''\
+ .format(
+ input_topic=self.input_topic,
+ )
+
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+
+ kikimr.control_plane.wait_discovery()
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.control_plane.wait_zero_checkpoint(query_id)
+
+ w1 = kikimr.control_plane.get_worker_count(1)
+ w2 = kikimr.control_plane.get_worker_count(2)
+ w3 = kikimr.control_plane.get_worker_count(3)
+ w4 = kikimr.control_plane.get_worker_count(4)
+
+ assert ((w1 * w3) != 0) ^ ((w2 * w4) != 0), "Incorrect placement " + str([w1, w2, w3, w4])
+
+ client.abort_query(query_id)
+ client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
diff --git a/ydb/tests/fq/mem_alloc/test_result_limits.py b/ydb/tests/fq/mem_alloc/test_result_limits.py
new file mode 100755
index 0000000000..a2c9879e59
--- /dev/null
+++ b/ydb/tests/fq/mem_alloc/test_result_limits.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+import time
+
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+# Quota per cloud
+QUOTA_ANALYTICS_COUNT_LIMIT = "yq.analyticsQuery.count"
+QUOTA_STREAMING_COUNT_LIMIT = "yq.streamingQuery.count"
+QUOTA_CPU_PERCENT_LIMIT = "yq.cpuPercent.count"
+QUOTA_MEMORY_LIMIT = "yq.memory.size"
+QUOTA_RESULT_LIMIT = "yq.result.size"
+
+# Quota per query
+QUOTA_ANALYTICS_DURATION_LIMIT = "yq.analyticsQueryDurationMinutes.count"
+QUOTA_STREAMING_DURATION_LIMIT = "yq.streamingQueryDurationMinutes.count" # internal, for preview purposes
+QUOTA_QUERY_RESULT_LIMIT = "yq.queryResult.size"
+
+
+@pytest.fixture
+def kikimr(request):
+ kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True)
+ kikimr = StreamingOverKikimr(kikimr_conf)
+ if hasattr(request, "param"):
+ kikimr.control_plane.fq_config['quotas_manager'] = {}
+ kikimr.control_plane.fq_config['quotas_manager']['enabled'] = True
+ kikimr.control_plane.fq_config['quotas_manager']['quotas'] = []
+ for cloud, limit in request.param.items():
+ kikimr.control_plane.fq_config['quotas_manager']['quotas'].append({"subject_type": "cloud", "subject_id": cloud, "limit": [{"name": QUOTA_QUERY_RESULT_LIMIT, "limit": limit}]})
+ kikimr.start_mvp_mock_server()
+ kikimr.start()
+ yield kikimr
+ kikimr.stop_mvp_mock_server()
+ kikimr.stop()
+
+
+def wait_until(predicate, wait_time=10, wait_step=0.5):
+ deadline = time.time() + wait_time
+ while time.time() < deadline:
+ if predicate():
+ return True
+ time.sleep(wait_step)
+ else:
+ return False
+
+
+class TestResultLimits(object):
+ def test_many_rows(self, kikimr):
+
+ kikimr.control_plane.wait_bootstrap(1)
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ sql = R'''
+SELECT * FROM AS_TABLE(()->(Yql::ToStream(ListReplicate(<|x:
+"0123456789ABCDEF"
+|>, 4000000000))));
+'''
+ client = FederatedQueryClient("my_folder@cloud", streaming_over_kikimr=kikimr)
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+
+ client.wait_query_status(query_id, fq.QueryMeta.FAILED, timeout=600)
+ issue = client.describe_query(query_id).result.query.issue[0]
+ assert "LIMIT_EXCEEDED" in issue.message, "Incorrect issue " + issue.message
+ assert "Can not write results with size > 20971520 byte(s)" in issue.issues[0].message, "Incorrect issue " + issue.issues[0].message
+
+ def test_large_row(self, kikimr):
+
+ kikimr.control_plane.wait_bootstrap(1)
+ assert kikimr.control_plane.get_mkql_allocated(1) == 0, "Incorrect Alloc"
+
+ sql = R'''
+SELECT ListReplicate("A", 10000000);
+'''
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+
+ client.wait_query_status(query_id, fq.QueryMeta.FAILED, timeout=600)
+ issue = client.describe_query(query_id).result.query.issue[0]
+ assert "LIMIT_EXCEEDED" in issue.message, "Incorrect issue " + issue.message
+ assert "Can not write Row[0] with size" in issue.issues[0].message and "(> 10_MB)" in issue.issues[0].message, "Incorrect issue " + issue.issues[0].message
+
+ @pytest.mark.parametrize("kikimr", [{"tiny": 2, "huge": 1000000}], indirect=["kikimr"])
+ def test_quotas(self, kikimr):
+ huge_client = FederatedQueryClient("my_folder@huge", streaming_over_kikimr=kikimr)
+ huge_query_id = huge_client.create_query("simple", "select 1000", type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ huge_client.wait_query_status(huge_query_id, fq.QueryMeta.COMPLETED)
+
+ tiny_client = FederatedQueryClient("my_folder@tiny", streaming_over_kikimr=kikimr)
+ tiny_query_id = tiny_client.create_query("simple", "select 1000", type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ tiny_client.wait_query_status(tiny_query_id, fq.QueryMeta.FAILED)
+ issue = tiny_client.describe_query(tiny_query_id).result.query.issue[0]
+ assert "LIMIT_EXCEEDED" in issue.message, "Incorrect issue " + issue.message
+ assert "Can not write results with size > 2 byte(s)" in issue.issues[0].message, "Incorrect issue " + issue.issues[0].message
diff --git a/ydb/tests/fq/mem_alloc/test_scheduling.py b/ydb/tests/fq/mem_alloc/test_scheduling.py
new file mode 100644
index 0000000000..b2a6e53c2a
--- /dev/null
+++ b/ydb/tests/fq/mem_alloc/test_scheduling.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+
+import pytest
+import time
+
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+K = 1024
+M = 1024*1024
+G = 1024*1024*1024
+DEFAULT_LIMIT = 8*G
+DEFAULT_DELTA = 30*M
+DEFAULT_WAIT_TIME = yatest_common.plain_or_under_sanitizer(10, 50)
+LONG_WAIT_TIME = yatest_common.plain_or_under_sanitizer(60, 300)
+
+
+@pytest.fixture
+def kikimr(request):
+ (initial, total, step) = request.param
+ kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True)
+ kikimr = StreamingOverKikimr(kikimr_conf)
+ kikimr.compute_plane.fq_config['resource_manager']['mkql_initial_memory_limit'] = initial
+ kikimr.compute_plane.fq_config['resource_manager']['mkql_total_memory_limit'] = total
+ kikimr.compute_plane.fq_config['resource_manager']['mkql_alloc_size'] = step
+ kikimr.start_mvp_mock_server()
+ kikimr.start()
+ yield kikimr
+ kikimr.stop_mvp_mock_server()
+ kikimr.stop()
+
+
+def wait_until(predicate, wait_time=DEFAULT_WAIT_TIME, wait_step=yatest_common.plain_or_under_sanitizer(0.5, 2)):
+ deadline = time.time() + wait_time
+ while time.time() < deadline:
+ if predicate():
+ return True
+ time.sleep(wait_step)
+ else:
+ return False
+
+
+def feq(a, b):
+ if abs(a) <= 1*G:
+ return a == b
+ else:
+ return abs((a - b) / a) < 0.0000001
+
+
+class TestSchedule(object):
+ @pytest.mark.parametrize("kikimr", [(1 * M, 6 * M, 1 * M)], indirect=["kikimr"])
+ @pytest.mark.skip(reason="Should be refactored")
+ def test_skip_busy(self, kikimr):
+
+ kikimr.wait_bootstrap()
+ kikimr.kikimr_cluster.nodes[2].stop()
+
+ self.init_topics("select_skip_busy", create_output=False)
+
+ sql = R'''
+ SELECT COUNT(*)
+ FROM myyds.`{input_topic}`
+ GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data
+ LIMIT 1
+ '''\
+ .format(
+ input_topic=self.input_topic,
+ )
+
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+
+ nodes = [1, 3, 4, 5, 6, 7, 8]
+
+ for node_index in nodes:
+ n = kikimr.get_sensors(node_index, "yq").find_sensor({"subsystem": "node_manager", "sensor": "NodesHealthCheckOk"})
+ wait_until(lambda : kikimr.get_sensors(node_index, "yq").find_sensor({"subsystem": "node_manager", "sensor": "NodesHealthCheckOk"}) > n, wait_time=LONG_WAIT_TIME)
+ wait_until(lambda : kikimr.get_peer_count(node_index) == len(nodes) - 1, wait_time=LONG_WAIT_TIME)
+
+ assert kikimr.get_mkql_limit(node_index) == kikimr.mkql_total_memory_limit, "Incorrect Limit"
+ assert kikimr.get_mkql_allocated(node_index) == 0, "Incorrect Alloc"
+
+ queries = []
+ task_count = 0
+ memory_per_graph = None
+ tasks_per_graph = 0
+
+ while True:
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ assert wait_until((lambda : sum(kikimr.get_task_count(n, query_id) for n in nodes) > 0)), "TaskController not started"
+ task_count += sum(kikimr.get_task_count(n, query_id) for n in nodes)
+ allocated = task_count * kikimr.mkql_initial_memory_limit
+ assert wait_until((lambda : sum(kikimr.get_mkql_allocated(n) for n in nodes) == allocated)), "Task memory was not allocated"
+ queries.append(query_id)
+ if memory_per_graph is None:
+ memory_per_graph = allocated
+ tasks_per_graph = task_count
+ if kikimr.mkql_total_memory_limit < allocated + memory_per_graph:
+ break
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+
+ def not_enough_memory():
+ issues = client.describe_query(query_id).result.query.transient_issue
+ if len(issues) == 0:
+ return False
+ assert len(issues) == 1, "Too many issues " + str(issues)
+ assert issues[0].message == "Not enough memory to allocate tasks" and issues[0].issue_code == 6001, "Incorrect issue " + issues[0].message
+ return True
+
+ assert wait_until(not_enough_memory), "Allocation was not failed"
+ assert sum(kikimr.get_mkql_allocated(n) for n in nodes) == task_count * kikimr.mkql_initial_memory_limit, "Incorrect allocation size"
+ client.abort_query(query_id)
+ # query is respawned every 30s, so wait with increased timeout
+ # we may be lucky to stop the query, or it is stopped automatically due to high failure rate
+ client.wait_query(query_id, 60, [fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ABORTED_BY_SYSTEM])
+
+ kikimr.kikimr_cluster.nodes[2].start()
+ for node_index in kikimr.kikimr_cluster.nodes:
+ wait_until(lambda : kikimr.get_peer_count(1) == 8 - 1, wait_time=LONG_WAIT_TIME)
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+
+ wait_until(lambda : kikimr.get_task_count(None, query_id) == tasks_per_graph)
+ assert kikimr.get_mkql_allocated() == (task_count + tasks_per_graph) * kikimr.mkql_initial_memory_limit, "Incorrect allocation size"
+
+ for q in queries:
+ client.abort_query(q)
+ client.abort_query(query_id)
diff --git a/ydb/tests/fq/mem_alloc/ya.make b/ydb/tests/fq/mem_alloc/ya.make
new file mode 100644
index 0000000000..e8c55b968e
--- /dev/null
+++ b/ydb/tests/fq/mem_alloc/ya.make
@@ -0,0 +1,30 @@
+PY3TEST()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc)
+
+PEERDIR(
+ ydb/tests/tools/datastreams_helpers
+ ydb/tests/tools/fq_runner
+)
+
+DEPENDS(ydb/tests/tools/pq_read)
+
+TEST_SRCS(
+ test_alloc_default.py
+ test_dc_local.py
+ test_result_limits.py
+ test_scheduling.py
+)
+
+IF (SANITIZER_TYPE == "thread")
+ TIMEOUT(2400)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+REQUIREMENTS(ram:9)
+
+END()
diff --git a/ydb/tests/fq/multi_plane/test_cp_ic.py b/ydb/tests/fq/multi_plane/test_cp_ic.py
new file mode 100644
index 0000000000..dde77bd74d
--- /dev/null
+++ b/ydb/tests/fq/multi_plane/test_cp_ic.py
@@ -0,0 +1,28 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+
+
+@pytest.fixture
+def kikimr():
+ kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True)
+ kikimr = StreamingOverKikimr(kikimr_conf)
+ kikimr.control_plane.fq_config['control_plane_storage']['mapping'] = {
+ "common_tenant_name": ["/alpha"]
+ }
+ kikimr.control_plane.fq_config['private_api']['loopback'] = True
+ kikimr.control_plane.fq_config['nodes_manager']['enabled'] = True
+ kikimr.start_mvp_mock_server()
+ kikimr.start()
+ yield kikimr
+ kikimr.stop()
+ kikimr.stop_mvp_mock_server()
+
+
+class TestCpIc(object):
+ def test_discovery(self, kikimr):
+ kikimr.control_plane.wait_discovery()
diff --git a/ydb/tests/fq/multi_plane/test_dispatch.py b/ydb/tests/fq/multi_plane/test_dispatch.py
new file mode 100644
index 0000000000..1f6fdc9a9a
--- /dev/null
+++ b/ydb/tests/fq/multi_plane/test_dispatch.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+from ydb.tests.tools.fq_runner.kikimr_runner import TenantConfig
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+@pytest.fixture
+def kikimr():
+ kikimr_conf = StreamingOverKikimrConfig(
+ cloud_mode=True,
+ node_count={"/cp": TenantConfig(1),
+ "/alpha": TenantConfig(1),
+ "/beta": TenantConfig(1)},
+ tenant_mapping={"alpha": "/alpha", "beta": "/beta"},
+ cloud_mapping={"a_cloud": "alpha", "b_cloud": "beta"})
+ kikimr = StreamingOverKikimr(kikimr_conf)
+ kikimr.start_mvp_mock_server()
+ kikimr.start()
+ yield kikimr
+ kikimr.stop()
+ kikimr.stop_mvp_mock_server()
+
+
+class TestMapping(object):
+ def test_mapping(self, kikimr):
+ sql = "select 101"
+ a_client = FederatedQueryClient("a_folder@a_cloud", streaming_over_kikimr=kikimr)
+ b_client = FederatedQueryClient("b_folder@b_cloud", streaming_over_kikimr=kikimr)
+ a_query_id = a_client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ a_client.wait_query_status(a_query_id, fq.QueryMeta.COMPLETED)
+ kikimr.tenants["/alpha"].stop()
+ a_query_id = a_client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ for _ in range(10):
+ b_query_id = b_client.create_query("b", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ b_client.wait_query_status(b_query_id, fq.QueryMeta.COMPLETED)
+ assert a_client.describe_query(a_query_id).result.query.meta.status == fq.QueryMeta.STARTING
+ kikimr.tenants["/alpha"].start()
+ a_client.wait_query_status(a_query_id, fq.QueryMeta.COMPLETED)
+
+ def test_idle(self, kikimr):
+ sql = "select 107"
+ a_client = FederatedQueryClient("a_folder@a_cloud", streaming_over_kikimr=kikimr)
+ b_client = FederatedQueryClient("b_folder@b_cloud", streaming_over_kikimr=kikimr)
+ a_query_id = a_client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ a_client.wait_query_status(a_query_id, fq.QueryMeta.COMPLETED)
+ kikimr.tenants["/alpha"].stop()
+ a_query_id = a_client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ for _ in range(10):
+ b_query_id = b_client.create_query("b", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ b_client.wait_query_status(b_query_id, fq.QueryMeta.COMPLETED)
+ assert a_client.describe_query(a_query_id).result.query.meta.status == fq.QueryMeta.STARTING
+ kikimr.exec_db_statement(
+ """--!syntax_v1
+ PRAGMA TablePathPrefix("{}");
+ UPDATE mappings SET vtenant = "beta" WHERE subject_id = "a_cloud";
+ UPDATE tenants SET state = 2, state_time = CurrentUtcTimestamp() WHERE tenant = "/alpha";
+ """
+ )
+ a_client.wait_query_status(a_query_id, fq.QueryMeta.COMPLETED)
diff --git a/ydb/tests/fq/multi_plane/test_retry.py b/ydb/tests/fq/multi_plane/test_retry.py
new file mode 100644
index 0000000000..789dd703a0
--- /dev/null
+++ b/ydb/tests/fq/multi_plane/test_retry.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import pytest
+import time
+
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class Param(object):
+ def __init__(
+ self,
+ retry_limit=2,
+ retry_period=20,
+ task_lease_ttl=4,
+ ping_period=2,
+ ):
+ self.retry_limit = retry_limit
+ self.retry_period = retry_period
+ self.task_lease_ttl = task_lease_ttl
+ self.ping_period = ping_period
+
+
+@pytest.fixture
+def kikimr(request):
+ kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True)
+ kikimr = StreamingOverKikimr(kikimr_conf)
+ # control
+ kikimr.control_plane.fq_config['control_plane_storage']['mapping'] = {"common_tenant_name": ["/compute"]}
+ kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy'] = {}
+ kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_count'] = request.param.retry_limit
+ kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_period'] = "{}s".format(request.param.retry_period)
+ kikimr.control_plane.fq_config['control_plane_storage']['task_lease_ttl'] = "{}s".format(request.param.task_lease_ttl)
+ # compute
+ kikimr.compute_plane.fq_config['pinger']['ping_period'] = "{}s".format(request.param.ping_period)
+ kikimr.start_mvp_mock_server()
+ kikimr.start()
+ yield kikimr
+ kikimr.stop()
+ kikimr.stop_mvp_mock_server()
+
+
+class TestRetry(TestYdsBase):
+ @pytest.mark.parametrize("kikimr", [Param(retry_limit=0, retry_period=1000)], indirect=["kikimr"])
+ def test_fail_first(self, kikimr):
+ topic_name = "fail_first"
+ connection = "fail_first"
+ self.init_topics(topic_name)
+ sql = R'''SELECT * FROM {connection}.`{input_topic}`;'''\
+ .format(
+ input_topic=self.input_topic,
+ connection=connection
+ )
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+ client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.stop()
+ kikimr.compute_plane.start()
+ client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_SYSTEM)
+ retry_count = kikimr.compute_plane.get_sensors(1, "yq").find_sensor({"query_id": query_id, "sensor": "RetryCount"})
+ assert retry_count == 0, "Incorrect RetryCount"
+
+ @pytest.mark.parametrize("kikimr", [Param(retry_limit=1, retry_period=2, task_lease_ttl=1, ping_period=0.5)], indirect=["kikimr"])
+ def test_low_rate(self, kikimr):
+ topic_name = "low_rate"
+ connection = "low_rate"
+ self.init_topics(topic_name)
+ sql = R'''SELECT * FROM {connection}.`{input_topic}`;'''\
+ .format(
+ input_topic=self.input_topic,
+ connection=connection
+ )
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+ client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ deadline = time.time() + 2 * 2
+ for _ in range(5):
+ delta = deadline - time.time()
+ if delta > 0:
+ time.sleep(delta)
+ deadline = time.time() + 2 * 2
+ kikimr.compute_plane.stop()
+ kikimr.compute_plane.start()
+ kikimr.compute_plane.wait_bootstrap()
+ # client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ time.sleep(4)
+ client.abort_query(query_id)
+ client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
+ retry_count = kikimr.compute_plane.get_sensors(1, "yq").find_sensor({"query_id": query_id, "sensor": "RetryCount"})
+ assert retry_count >= 1, "Incorrect RetryCount"
+
+ @pytest.mark.parametrize("kikimr", [Param(retry_limit=3, task_lease_ttl=1, ping_period=0.5)], indirect=["kikimr"])
+ def test_high_rate(self, kikimr):
+ topic_name = "high_rate"
+ connection = "high_rate"
+ self.init_topics(topic_name)
+ sql = R'''SELECT * FROM {connection}.`{input_topic}`;'''\
+ .format(
+ input_topic=self.input_topic,
+ connection=connection
+ )
+ client = FederatedQueryClient("my_folder", streaming_over_kikimr=kikimr)
+ client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("a", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ for _ in range(10):
+ deadline = time.time() + 1
+ kikimr.compute_plane.stop()
+ kikimr.compute_plane.start()
+ kikimr.compute_plane.wait_bootstrap()
+ if client.describe_query(query_id).result.query.meta.status == fq.QueryMeta.ABORTED_BY_SYSTEM:
+ break
+ delta = deadline - time.time()
+ if delta > 0:
+ time.sleep(delta)
+ else:
+ assert False, "Query was NOT aborted"
diff --git a/ydb/tests/fq/multi_plane/ya.make b/ydb/tests/fq/multi_plane/ya.make
new file mode 100644
index 0000000000..42b3c4d553
--- /dev/null
+++ b/ydb/tests/fq/multi_plane/ya.make
@@ -0,0 +1,21 @@
+PY3TEST()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc)
+
+PEERDIR(
+ ydb/tests/tools/datastreams_helpers
+ ydb/tests/tools/fq_runner
+)
+
+DEPENDS(ydb/tests/tools/pq_read)
+
+TEST_SRCS(
+ test_cp_ic.py
+ test_dispatch.py
+ test_retry.py
+)
+
+TIMEOUT(600)
+SIZE(MEDIUM)
+
+END()
diff --git a/ydb/tests/fq/ya.make b/ydb/tests/fq/ya.make
index 77586532cc..d18f304d22 100644
--- a/ydb/tests/fq/ya.make
+++ b/ydb/tests/fq/ya.make
@@ -1,4 +1,7 @@
RECURSE_FOR_TESTS(
+ mem_alloc
+ multi_plane
plans
s3
+ yds
)
diff --git a/ydb/tests/fq/yds/canondata/result.json b/ydb/tests/fq/yds/canondata/result.json
new file mode 100644
index 0000000000..545124f3df
--- /dev/null
+++ b/ydb/tests/fq/yds/canondata/result.json
@@ -0,0 +1,60 @@
+{
+ "test_monitoring.TestSolomon.test_pq_read_solomon_write[v1]": [
+ {
+ "labels": [
+ [
+ "label1",
+ "key1"
+ ],
+ [
+ "name",
+ "sensor1"
+ ]
+ ],
+ "ts": "1970-01-01T00:00:01.000000Z",
+ "value": 1
+ },
+ {
+ "labels": [
+ [
+ "label1",
+ "key2"
+ ],
+ [
+ "name",
+ "sensor1"
+ ]
+ ],
+ "ts": "1970-01-01T00:00:03.000000Z",
+ "value": 2
+ },
+ {
+ "labels": [
+ [
+ "label1",
+ "key2"
+ ],
+ [
+ "name",
+ "sensor1"
+ ]
+ ],
+ "ts": "1970-01-01T00:00:04.000000Z",
+ "value": 3
+ },
+ {
+ "labels": [
+ [
+ "label1",
+ "key1"
+ ],
+ [
+ "name",
+ "sensor1"
+ ]
+ ],
+ "ts": "1970-01-01T00:00:07.000000Z",
+ "value": 4
+ }
+ ]
+}
diff --git a/ydb/tests/fq/yds/conftest.py b/ydb/tests/fq/yds/conftest.py
new file mode 100644
index 0000000000..687e11205a
--- /dev/null
+++ b/ydb/tests/fq/yds/conftest.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support
+from ydb.tests.tools.fq_runner.kikimr_utils import ExtensionPoint
+from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
+from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension
+from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension
+from ydb.tests.tools.fq_runner.kikimr_utils import StatsModeExtension
+from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr
+
+
+@pytest.fixture
+def stats_mode():
+ return ''
+
+
+@pytest.fixture
+def kikimr(request: pytest.FixtureRequest, yq_version: str, stats_mode: str):
+ kikimr_extensions = [DefaultConfigExtension(""),
+ YQv2Extension(yq_version),
+ ComputeExtension(),
+ StatsModeExtension(stats_mode)]
+ with start_kikimr(request, kikimr_extensions) as kikimr:
+ yield kikimr
+
+
+class ManyRetriesConfigExtension(ExtensionPoint):
+ def __init__(self):
+ super().__init__()
+
+ def is_applicable(self, request):
+ return True
+
+ def apply_to_kikimr(self, request, kikimr):
+ kikimr.compute_plane.fq_config['control_plane_storage']['retry_policy_mapping'] = [
+ {
+ 'status_code': [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
+ 'policy': {
+ 'retry_count': 10000
+ }
+ }
+ ]
+
+
+@pytest.fixture
+def kikimr_many_retries(request: pytest.FixtureRequest, yq_version: str):
+ kikimr_extensions = [DefaultConfigExtension(""),
+ ManyRetriesConfigExtension(),
+ YQv2Extension(yq_version),
+ ComputeExtension()]
+ with start_kikimr(request, kikimr_extensions) as kikimr:
+ yield kikimr
+
+
+def create_client(kikimr, request):
+ return FederatedQueryClient(request.param["folder_id"] if request is not None else "my_folder",
+ streaming_over_kikimr=kikimr)
+
+
+@pytest.fixture
+def client(kikimr, request=None):
+ return create_client(kikimr, request)
+
+
+@pytest.fixture
+def client_many_retries(kikimr_many_retries, request=None):
+ return create_client(kikimr_many_retries, request)
diff --git a/ydb/tests/fq/yds/test_2_selects_limit.py b/ydb/tests/fq/yds/test_2_selects_limit.py
new file mode 100644
index 0000000000..abaf1d9301
--- /dev/null
+++ b/ydb/tests/fq/yds/test_2_selects_limit.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import os
+import pytest
+import time
+
+from ydb.tests.tools.datastreams_helpers.control_plane import create_read_rule
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+
+
+class TestSelectLimit(object):
+ @yq_v1
+ def test_select_same(self, kikimr, client):
+ pytest.skip("Skip until streaming disposition is implemented YQ-589")
+
+ self.init_topics("select_same", create_output=False)
+
+ sql = R'''
+ SELECT * FROM yds1.`{input_topic}` LIMIT 2;
+ SELECT * FROM yds1.`{input_topic}` LIMIT 2;
+ ''' \
+ .format(
+ input_topic=self.input_topic,
+ )
+
+ client.create_yds_connection("yds1", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.wait_zero_checkpoint(query_id)
+
+        time.sleep(2)  # Workaround for the race between write and read "from now". Remove when YQ-589 is done.
+ messages = ["A", "B", "C", "D", "E"]
+ self.write_stream(messages)
+
+ client.wait_query(query_id, 30)
+
+ # by default selects use different consumers, so they will read the same
+ # messages - first 2 of them
+
+ rs = client.get_result_data(query_id, result_set_index=0).result.result_set
+ logging.debug(str(rs))
+ assert len(rs.rows) == 2
+ assert len(rs.columns) == 1
+ assert rs.rows[0].items[0].bytes_value == messages[0]
+ assert rs.rows[1].items[0].bytes_value == messages[1]
+
+ rs = client.get_result_data(query_id, result_set_index=1).result.result_set
+ logging.debug(str(rs))
+ assert len(rs.rows) == 2
+ assert len(rs.columns) == 1
+ assert rs.rows[0].items[0].bytes_value == messages[0]
+ assert rs.rows[1].items[0].bytes_value == messages[1]
+
+ @yq_v1
+ def test_select_sequence(self, kikimr, client):
+ pytest.skip("does not work as expected, need attention")
+ self.init_topics("select_sequence", create_output=False)
+
+ create_read_rule(self.input_topic, self.consumer_name)
+ sql = R'''
+ PRAGMA pq.Consumer="{consumer_name}";
+ SELECT * FROM yds2.`{input_topic}` LIMIT 2;
+ SELECT * FROM yds2.`{input_topic}` LIMIT 2;
+ ''' \
+ .format(
+ consumer_name=self.consumer_name,
+ input_topic=self.input_topic,
+ )
+
+ client.create_yds_connection("yds2", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.wait_zero_checkpoint(query_id)
+
+        time.sleep(2)  # Workaround for the race between write and read "from now". Remove when YQ-589 is done.
+ messages = ["A", "B", "C", "D", "E"]
+ self.write_stream(messages)
+
+ client.wait_query(query_id, 30)
+
+        # An explicit consumer is used for each query, so the second select
+        # fetches a different set of subsequent messages
+
+ rs = client.get_result_data(query_id, result_set_index=0).result.result_set
+ logging.debug(str(rs))
+ assert len(rs.rows) == 2
+ assert len(rs.columns) == 1
+ assert rs.rows[0].items[0].bytes_value == messages[0]
+ assert rs.rows[1].items[0].bytes_value == messages[1]
+
+ rs = client.get_result_data(query_id, result_set_index=1).result.result_set
+ logging.debug(str(rs))
+ assert len(rs.rows) == 2
+ assert len(rs.columns) == 1
+ assert rs.rows[0].items[0].bytes_value == messages[2]
+ assert rs.rows[1].items[0].bytes_value == messages[3]
diff --git a/ydb/tests/fq/yds/test_3_selects.py b/ydb/tests/fq/yds/test_3_selects.py
new file mode 100644
index 0000000000..37ab65058f
--- /dev/null
+++ b/ydb/tests/fq/yds/test_3_selects.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+import ydb.public.api.protos.ydb_value_pb2 as ydb
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+
+
+class TestSelects(object):
+ @yq_v1
+ def test_3_selects(self, client):
+ sql = R'''
+ SELECT 1 AS SingleColumn;
+ SELECT "A" AS TextColumn;
+ SELECT 11 AS Column1, 22 AS Column2;
+ '''
+
+ client.create_yds_connection(name="myyds", database_id="FakeDatabaseId")
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+
+ client.wait_query(query_id, 30)
+
+ rs = client.get_result_data(query_id, result_set_index=0).result.result_set
+ logging.debug(str(rs))
+ assert len(rs.rows) == 1
+ assert len(rs.columns) == 1
+ assert rs.columns[0].name == "SingleColumn"
+ assert rs.columns[0].type.type_id == ydb.Type.INT32
+ assert rs.rows[0].items[0].int32_value == 1
+
+ rs = client.get_result_data(query_id, result_set_index=1).result.result_set
+ logging.debug(str(rs))
+ assert len(rs.rows) == 1
+ assert len(rs.columns) == 1
+ assert rs.columns[0].name == "TextColumn"
+ assert rs.columns[0].type.type_id == ydb.Type.STRING
+ assert rs.rows[0].items[0].bytes_value == b"A"
+
+ rs = client.get_result_data(query_id, result_set_index=2).result.result_set
+ logging.debug(str(rs))
+ assert len(rs.rows) == 1
+ assert len(rs.columns) == 2
+ assert rs.columns[0].name == "Column1"
+ assert rs.columns[0].type.type_id == ydb.Type.INT32
+ assert rs.rows[0].items[0].int32_value == 11
+ assert rs.columns[1].name == "Column2"
+ assert rs.columns[1].type.type_id == ydb.Type.INT32
+ assert rs.rows[0].items[1].int32_value == 22
diff --git a/ydb/tests/fq/yds/test_bad_syntax.py b/ydb/tests/fq/yds/test_bad_syntax.py
new file mode 100644
index 0000000000..51ac943c84
--- /dev/null
+++ b/ydb/tests/fq/yds/test_bad_syntax.py
@@ -0,0 +1,99 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import pytest
+import time
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+
+
+class TestBadSyntax(TestYdsBase):
+ @yq_v1
+ @pytest.mark.parametrize(
+ "query_type",
+ [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING],
+ ids=["analytics", "streaming"]
+ )
+ @pytest.mark.parametrize(
+ "after_modify",
+ [True, False],
+ ids=["modify", "create"]
+ )
+ @pytest.mark.parametrize(
+ "with_read_rules",
+ [True, False],
+ ids=["with_created_read_rules", "without_created_read_rules"]
+ )
+ def test_bad_syntax(self, kikimr, client, query_type, after_modify, with_read_rules):
+ if with_read_rules and not after_modify:
+ return
+
+ correct_sql = "SELECT 42;"
+ incorrect_sql = "blablabla"
+ if with_read_rules:
+ self.init_topics("bad_syntax_{}".format(query_type))
+ connection_name = "yds_{}".format(query_type)
+ sql_with_rr = "INSERT INTO {connection_name}.`{output_topic}` SELECT * FROM {connection_name}.`{input_topic}` LIMIT 1;" \
+ .format(connection_name=connection_name,
+ output_topic=self.output_topic,
+ input_topic=self.input_topic)
+
+ if after_modify:
+ if with_read_rules:
+ client.create_yds_connection(name=connection_name, database_id="FakeDatabaseId")
+ create_response = client.create_query("q", sql_with_rr, type=query_type)
+ else:
+ create_response = client.create_query("q", correct_sql, type=query_type)
+ else:
+ create_response = client.create_query("q", incorrect_sql, type=query_type)
+
+ query_id = create_response.result.query_id
+ assert not create_response.issues, str(create_response.issues)
+ logging.debug(str(create_response.result))
+
+ if after_modify:
+ if with_read_rules:
+ client.wait_query(query_id, statuses=[fq.QueryMeta.RUNNING])
+ if query_type == fq.QueryContent.QueryType.STREAMING:
+ kikimr.compute_plane.wait_zero_checkpoint(query_id)
+ else:
+                    time.sleep(3)  # TODO: remove it after streaming disposition is supported
+ self.write_stream(["A"])
+ client.wait_query(query_id, statuses=[fq.QueryMeta.COMPLETED])
+ client.modify_query(query_id, "q", incorrect_sql, type=query_type)
+ client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED])
+ else:
+ client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED])
+
+ describe_result = client.describe_query(query_id).result
+ logging.debug("Describe result: {}".format(describe_result))
+ describe_string = "{}".format(describe_result)
+ assert "Internal Error" not in describe_string
+ assert "Failed to parse query" in describe_string
+
+ @yq_v1
+ def test_require_as(self, client):
+ bad_sql = "SELECT 42 a"
+ query_id = client.create_query("bad", bad_sql).result.query_id
+
+ client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED])
+
+ describe_result = client.describe_query(query_id).result
+ logging.debug("Describe result: {}".format(describe_result))
+ describe_string = "{}".format(describe_result)
+ assert "Expecting mandatory AS here" in describe_string
+
+ @yq_v1
+ def test_type_as_column(self, client):
+ bad_sql = "select max(DateTime) from AS_TABLE([<|x:1|>]);"
+ query_id = client.create_query("bad", bad_sql).result.query_id
+
+ client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED])
+
+ describe_result = client.describe_query(query_id).result
+ logging.debug("Describe result: {}".format(describe_result))
+ describe_string = "{}".format(describe_result)
+ assert "Member not found: DateTime" in describe_string
diff --git a/ydb/tests/fq/yds/test_base.py b/ydb/tests/fq/yds/test_base.py
new file mode 100644
index 0000000000..6b09f7a702
--- /dev/null
+++ b/ydb/tests/fq/yds/test_base.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+
+class TestBaseWithAbortingConfigParams(TestYdsBase):
+
+ @classmethod
+ def setup_class(cls):
+ kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True)
+ cls.streaming_over_kikimr = StreamingOverKikimr(kikimr_conf)
+ cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_ttl'] = "2s"
+ cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy'] = {}
+ cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_count'] = 1
+ cls.streaming_over_kikimr.compute_plane.fq_config['pinger']['ping_period'] = "1s"
+ cls.streaming_over_kikimr.start_mvp_mock_server()
+ cls.streaming_over_kikimr.start()
+
+ @classmethod
+ def teardown_class(cls):
+ if hasattr(cls, "streaming_over_kikimr"):
+ cls.streaming_over_kikimr.stop_mvp_mock_server()
+ cls.streaming_over_kikimr.stop()
diff --git a/ydb/tests/fq/yds/test_big_state.py b/ydb/tests/fq/yds/test_big_state.py
new file mode 100644
index 0000000000..7b86f55e3f
--- /dev/null
+++ b/ydb/tests/fq/yds/test_big_state.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import time
+
+import ydb.tests.library.common.yatest_common as yatest_common
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+
+
+class TestBigState(TestYdsBase):
+ @yq_v1
+ def test_gt_8mb(self, kikimr, client):
+
+ self.init_topics("select_hop_8mb", create_output=False)
+
+ sql = R'''
+ $s = SELECT ListConcat(ListReplicate(Data, 9000000)) as Data2
+ FROM myyds.`{input_topic}`
+ WITH SCHEMA (Data String NOT NULL);
+
+ SELECT * from $s
+ GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT1S", "PT1S", "PT1S"), Data2
+ LIMIT 1
+ ''' \
+ .format(
+ input_topic=self.input_topic
+ )
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+ for _ in range(5):
+ messages = ["A", "B", "C"]
+ self.write_stream(messages)
+ time.sleep(1)
+
+ deadline = time.time() + yatest_common.plain_or_under_sanitizer(60, 300)
+ while time.time() < deadline:
+ aborted_checkpoints = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id, "AbortedCheckpoints")
+ if not aborted_checkpoints:
+ time.sleep(0.5)
+
+ assert aborted_checkpoints != 0
diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.br b/ydb/tests/fq/yds/test_compression_data/test.json.br
new file mode 100644
index 0000000000..ad9f8da80b
--- /dev/null
+++ b/ydb/tests/fq/yds/test_compression_data/test.json.br
Binary files differ
diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.bz2 b/ydb/tests/fq/yds/test_compression_data/test.json.bz2
new file mode 100644
index 0000000000..f5b217efb4
--- /dev/null
+++ b/ydb/tests/fq/yds/test_compression_data/test.json.bz2
Binary files differ
diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.gz b/ydb/tests/fq/yds/test_compression_data/test.json.gz
new file mode 100644
index 0000000000..ebd73dc216
--- /dev/null
+++ b/ydb/tests/fq/yds/test_compression_data/test.json.gz
Binary files differ
diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.lz4 b/ydb/tests/fq/yds/test_compression_data/test.json.lz4
new file mode 100644
index 0000000000..f78d6a71ee
--- /dev/null
+++ b/ydb/tests/fq/yds/test_compression_data/test.json.lz4
Binary files differ
diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.xz b/ydb/tests/fq/yds/test_compression_data/test.json.xz
new file mode 100644
index 0000000000..588fb1dd01
--- /dev/null
+++ b/ydb/tests/fq/yds/test_compression_data/test.json.xz
Binary files differ
diff --git a/ydb/tests/fq/yds/test_compression_data/test.json.zst b/ydb/tests/fq/yds/test_compression_data/test.json.zst
new file mode 100644
index 0000000000..3f0bf6862f
--- /dev/null
+++ b/ydb/tests/fq/yds/test_compression_data/test.json.zst
Binary files differ
diff --git a/ydb/tests/fq/yds/test_continue_mode.py b/ydb/tests/fq/yds/test_continue_mode.py
new file mode 100644
index 0000000000..cb7768a947
--- /dev/null
+++ b/ydb/tests/fq/yds/test_continue_mode.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import time
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.fq_client import StreamingDisposition
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.datastreams_helpers.data_plane import write_stream, read_stream
+
+
+def assert_issues_contain(text, issues):
+ for issue in issues:
+ if text in issue.message:
+ return
+ assert False, f"Text [{text}] is expected to be among issues:\n{issues}"
+
+
+class TestContinueMode(TestYdsBase):
+ @yq_v1
+ def test_continue_from_offsets(self, kikimr, client):
+ client.create_yds_connection(name="yds", database_id="FakeDatabaseId")
+
+ self.init_topics("continue_1", partitions_count=2)
+ input_topic_1 = self.input_topic
+ output_topic_1 = self.output_topic
+ consumer_1 = self.consumer_name
+ self.init_topics("continue_2", partitions_count=2)
+ input_topic_2 = self.input_topic
+ output_topic_2 = self.output_topic
+ consumer_2 = self.consumer_name
+
+ partition_key = "trololo"
+
+ sql = Rf'''
+ PRAGMA dq.MaxTasksPerStage="2";
+
+ INSERT INTO yds.`{output_topic_1}`
+ SELECT Data FROM yds.`{input_topic_1}`;'''
+
+ query_id = client.create_query("continue-from-offsets-query", sql,
+ type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+ data = ["1", "2"]
+ write_stream(input_topic_1, data, partition_key=partition_key)
+
+ assert read_stream(output_topic_1, len(data), consumer_name=consumer_1) == data
+
+ kikimr.compute_plane.wait_completed_checkpoints(query_id,
+ kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)
+
+ client.abort_query(query_id)
+ client.wait_query(query_id)
+
+        # Write to the first topic.
+        # These offsets wouldn't be fresh, but we should still get this data in the second
+        # output topic because the offsets should be restored from the checkpoint
+ data_2 = ["3", "4"]
+ write_stream(input_topic_1, data_2, partition_key=partition_key)
+
+ sql2 = Rf'''
+ PRAGMA dq.MaxTasksPerStage="1";
+
+ INSERT INTO yds.`{output_topic_2}`
+ SELECT Data FROM yds.`{input_topic_1}`;
+
+ INSERT INTO yds.`{output_topic_2}`
+ SELECT Data FROM yds.`{input_topic_2}`;'''
+
+ def assert_has_saved_checkpoints():
+ describe_result = client.describe_query(query_id).result
+ logging.debug("Describe result: {}".format(describe_result))
+ assert describe_result.query.meta.has_saved_checkpoints, "Expected has_saved_checkpoints flag: {}".format(
+ describe_result.query.meta)
+
+ def find_text(issues, text):
+ for issue in issues:
+ if text in issue.message:
+ return True
+ if len(issue.issues) > 0 and find_text(issue.issues, text):
+ return True
+ return False
+
+ def assert_has_issues(text, transient=False, deadline=0):
+ describe_result = client.describe_query(query_id).result
+ logging.debug("Describe result: {}".format(describe_result))
+ if find_text(describe_result.query.transient_issue if transient else describe_result.query.issue, text):
+ return True
+ assert deadline and deadline > time.time(), "Text \"{}\" is expected to be found in{} issues, but was not found. Describe query result:\n{}" \
+ .format(text, " transient" if transient else "", describe_result)
+ return False
+
+ assert_has_saved_checkpoints()
+
+ # 1. Not forced mode. Expect to fail
+ client.modify_query(query_id, "continue-from-offsets-query", sql2,
+ type=fq.QueryContent.QueryType.STREAMING,
+ state_load_mode=fq.StateLoadMode.EMPTY,
+ streaming_disposition=StreamingDisposition.from_last_checkpoint())
+ client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+ assert_has_issues(
+ "Topic `continue_2_input` is not found in previous query. Use force mode to ignore this issue")
+
+ # 2. Forced mode. Expect to run.
+ client.modify_query(query_id, "continue-from-offsets-query", sql2,
+ type=fq.QueryContent.QueryType.STREAMING,
+ state_load_mode=fq.StateLoadMode.EMPTY,
+ streaming_disposition=StreamingDisposition.from_last_checkpoint(True))
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ transient_issues_deadline = time.time() + yatest_common.plain_or_under_sanitizer(60, 300)
+ msg = "Topic `continue_2_input` is not found in previous query. Query will use fresh offsets for its partitions"
+ while True:
+ if assert_has_issues(msg, True, transient_issues_deadline):
+ break
+ else:
+ time.sleep(0.3)
+ kikimr.compute_plane.wait_completed_checkpoints(query_id,
+ kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)
+
+ restored_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id,
+ "RestoredStreamingOffsetsFromCheckpoint")
+ assert restored_metric == 1
+
+ data_3 = ["5", "6"]
+ write_stream(input_topic_2, data_3, partition_key=partition_key)
+
+ assert sorted(read_stream(output_topic_2, 4, consumer_name=consumer_2)) == ["3", "4", "5", "6"]
+
+ kikimr.compute_plane.wait_completed_checkpoints(query_id,
+ kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)
+
+ assert_has_saved_checkpoints()
+
+ # Check that after node failure the query will restore from its usual checkpoint
+ kikimr.compute_plane.kikimr_cluster.nodes[1].stop()
+ kikimr.compute_plane.kikimr_cluster.nodes[1].start()
+ kikimr.compute_plane.wait_bootstrap(1)
+
+ # sleep task lease ttl / 2
+ time.sleep(2.5)
+
+ # Wait while graph is restored from checkpoint
+ n = 100
+ while n:
+ n -= 1
+ restored_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id,
+ "RestoredFromSavedCheckpoint",
+ expect_counters_exist=False)
+ if restored_metric >= 1:
+ break
+ time.sleep(0.3)
+ assert restored_metric >= 1
+
+ assert_has_saved_checkpoints()
+
+ @yq_v1
+ def test_deny_disposition_from_checkpoint_in_create_query(self, client):
+ client.create_yds_connection(name="yds_create", database_id="FakeDatabaseId")
+
+ self.init_topics("deny_disposition_from_checkpoint_in_create_query")
+
+ sql = Rf'''
+ PRAGMA dq.MaxTasksPerStage="2";
+
+ INSERT INTO yds_create.`{self.output_topic}`
+ SELECT Data FROM yds_create.`{self.input_topic}`;'''
+
+ response = client.create_query(
+ "deny_disposition_from_checkpoint_in_create_query",
+ sql,
+ type=fq.QueryContent.QueryType.STREAMING,
+ streaming_disposition=StreamingDisposition.from_last_checkpoint(),
+ check_issues=False)
+ assert response.issues
+ assert_issues_contain("Streaming disposition \"from_last_checkpoint\" is not allowed in CreateQuery request",
+ response.issues)
+
+ @yq_v1
+ def test_deny_state_load_mode_from_checkpoint_in_modify_query(self, kikimr, client):
+ client.create_yds_connection(name="yds_modify", database_id="FakeDatabaseId")
+
+ self.init_topics("deny_state_load_mode_from_checkpoint_in_modify_query")
+
+ sql = Rf'''
+ PRAGMA dq.MaxTasksPerStage="2";
+
+ INSERT INTO yds_modify.`{self.output_topic}`
+ SELECT Data FROM yds_modify.`{self.input_topic}`;'''
+
+ response = client.create_query(
+ "deny_state_load_mode_from_checkpoint_in_modify_query",
+ sql,
+ type=fq.QueryContent.QueryType.STREAMING)
+
+ query_id = response.result.query_id
+
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+ client.abort_query(query_id)
+ client.wait_query(query_id)
+
+ response = client.modify_query(query_id, "deny_state_load_mode_from_checkpoint_in_modify_query", sql,
+ type=fq.QueryContent.QueryType.STREAMING,
+ state_load_mode=fq.StateLoadMode.FROM_LAST_CHECKPOINT,
+ streaming_disposition=StreamingDisposition.from_last_checkpoint(),
+ check_issues=False)
+
+ assert response.issues
+ assert_issues_contain("State load mode \"FROM_LAST_CHECKPOINT\" is not supported", response.issues)
diff --git a/ydb/tests/fq/yds/test_cpu_quota.py b/ydb/tests/fq/yds/test_cpu_quota.py
new file mode 100644
index 0000000000..60a133cebf
--- /dev/null
+++ b/ydb/tests/fq/yds/test_cpu_quota.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import time
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
class TestCpuQuota(TestYdsBase):
    """Checks that the per-task CPU-quota sensors are populated when a query
    runs under a vCPU time limit."""

    @yq_v1
    def test_cpu_quota(self, kikimr, client):
        """Run a CPU-heavy streaming query with vcpu_time_limit=1 and assert
        that the CpuTimeGetQuotaLatencyMs / CpuTimeQuotaWaitDelayMs histograms
        recorded at least one observation for this query."""
        self.init_topics("cpu_quota")

        # ListReplicate(Data, 100000) makes each message expensive to process,
        # so the query is pushed against the CPU quota.
        sql = R'''
            PRAGMA dq.MaxTasksPerStage="2";

            INSERT INTO yds.`{output_topic}`
            SELECT Unwrap(ListConcat(ListReplicate(Data, 100000))) AS Data
            FROM yds.`{input_topic}`;''' \
            .format(
                input_topic=self.input_topic,
                output_topic=self.output_topic,
            )

        client.create_yds_connection(name="yds", database_id="FakeDatabaseId")
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING,
                                       vcpu_time_limit=1).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        data = [
            'ABC',
            'DEF',
            'GHI',
        ]
        self.write_stream(data)
        self.read_stream(len(data))

        # One more time after pause
        time.sleep(0.5)
        self.write_stream(data)
        self.read_stream(len(data))

        time.sleep(0.5)
        sensors = kikimr.compute_plane.get_sensors(1, "dq_tasks")

        def calc_histogram_counter(sensor):
            # Total number of observations: all finite buckets plus the +Inf bucket.
            hist = sensor["hist"]
            return sum(hist["buckets"]) + hist["inf"]

        get_quota_latency = 0
        quota_wait_delay = 0

        for sensor in sensors.data:
            labels = sensor["labels"]
            # Only account for sensors belonging to our query.
            if labels.get("operation") != query_id:
                continue

            if labels.get("sensor") == "CpuTimeGetQuotaLatencyMs":
                get_quota_latency += calc_histogram_counter(sensor)
            if labels.get("sensor") == "CpuTimeQuotaWaitDelayMs":
                quota_wait_delay += calc_histogram_counter(sensor)

        assert get_quota_latency > 0
        assert quota_wait_delay > 0

        client.abort_query(query_id)
        client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)

        logging.debug("Sensors: {}".format(sensors))
diff --git a/ydb/tests/fq/yds/test_delete_read_rules_after_abort_by_system.py b/ydb/tests/fq/yds/test_delete_read_rules_after_abort_by_system.py
new file mode 100644
index 0000000000..e37551216d
--- /dev/null
+++ b/ydb/tests/fq/yds/test_delete_read_rules_after_abort_by_system.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+
+from test_base import TestBaseWithAbortingConfigParams
+
+from ydb.tests.tools.datastreams_helpers.control_plane import list_read_rules
+from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
class TestDeleteReadRulesAfterAbortBySystem(TestBaseWithAbortingConfigParams):
    """Checks that topic read rules created by a streaming query are removed
    once the query is aborted by the system."""

    @yq_v1
    def test_delete_read_rules_after_abort_by_system(self):
        """Restart the compute plane repeatedly until the control plane aborts
        the query, then verify the input topic has no leftover read rules."""
        import time  # local import: used only for the polling pause below

        topic_name = "read_rules_leaking"
        conn = "yds_0"
        self.init_topics(topic_name)

        sql = R'''
            PRAGMA dq.MaxTasksPerStage="5";

            INSERT INTO {conn}.`{output_topic}`
            SELECT * FROM {conn}.`{input_topic}`;'''\
            .format(
                input_topic=self.input_topic,
                output_topic=self.output_topic,
                conn=conn
            )

        client = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)

        client.create_yds_connection(conn, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id

        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        # A running query holds exactly one read rule on its input topic.
        read_rules = list_read_rules(self.input_topic)
        assert len(read_rules) == 1, read_rules

        for _ in range(5):
            self.streaming_over_kikimr.compute_plane.stop()
            self.streaming_over_kikimr.compute_plane.start()
            # Wait until the control plane serves at least one new GetTask
            # request; sleep briefly instead of spinning a CPU core.
            get_task_count = self.streaming_over_kikimr.control_plane.get_request_count(1, "GetTask")
            while get_task_count == self.streaming_over_kikimr.control_plane.get_request_count(1, "GetTask"):
                time.sleep(0.05)
            if client.describe_query(query_id).result.query.meta.status == fq.QueryMeta.ABORTED_BY_SYSTEM:
                break
        else:
            assert False, "Query was NOT aborted"

        # After the system abort all read rules must be gone.
        read_rules = list_read_rules(self.input_topic)
        assert len(read_rules) == 0, read_rules
diff --git a/ydb/tests/fq/yds/test_eval.py b/ydb/tests/fq/yds/test_eval.py
new file mode 100644
index 0000000000..56e6de416c
--- /dev/null
+++ b/ydb/tests/fq/yds/test_eval.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+import ydb.public.api.protos.ydb_value_pb2 as ydb
+
+
class TestEval(object):
    """Checks compile-time evaluation of constant expressions (EvaluateExpr)."""

    @yq_v1
    def test_eval_2_2(self, client):
        """EvaluateExpr(2+2) must yield one INT32 column C1 holding 4."""
        query_text = "SELECT EvaluateExpr(2+2) AS C1;"
        create_response = client.create_query("simple1", query_text, type=fq.QueryContent.QueryType.STREAMING)
        query_id = create_response.result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

        rs = client.get_result_data(query_id).result.result_set
        logging.debug(str(rs))

        assert len(rs.columns) == 1
        first_column = rs.columns[0]
        assert first_column.name == "C1"
        assert first_column.type.type_id == ydb.Type.INT32
        assert len(rs.rows) == 1
        assert rs.rows[0].items[0].int32_value == 4
diff --git a/ydb/tests/fq/yds/test_mem_alloc.py b/ydb/tests/fq/yds/test_mem_alloc.py
new file mode 100644
index 0000000000..8a7deeea43
--- /dev/null
+++ b/ydb/tests/fq/yds/test_mem_alloc.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import json
+import logging
+import os
+import pytest
+import time
+
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+from ydb.tests.tools.datastreams_helpers.control_plane import create_stream
+from ydb.tests.tools.datastreams_helpers.data_plane import write_stream
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
class TestMemAlloc(TestYdsBase):
    """Memory-allocation statistics checks; both tests are currently disabled
    via pytest.skip (not ready yet)."""

    @yq_v1
    def test_join_alloc(self, kikimr, client):
        """Intended: inspect MkqlMaxMemoryUsage statistics of a streaming JOIN."""
        pytest.skip("This test is not ready yet")

        self.init_topics("select_join_alloc", create_output=False)
        create_stream("joined_topic")

        # ListReplicate(..., 1000000) inflates each joined row to stress memory.
        sql = R'''
            SELECT S1.Data as Data1, S2.Data as Data2, ListReplicate(S2.Data, 1000000) as Data20
            FROM myyds.`{input_topic}` AS S1
            INNER JOIN (SELECT * FROM myyds.`{joined_topic}` LIMIT 2) AS S2
            ON S1.Data = S2.Data
            LIMIT 2
            ''' \
            .format(
                input_topic=self.input_topic,
                joined_topic="joined_topic"
            )

        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id

        time.sleep(2)  # Workaround race between write and read "from now". Remove when YQ-589 will be done.
        messages = ["A", "B", "C"]
        self.write_stream(messages)
        write_stream("joined_topic", messages)

        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

        logging.debug(client.get_result_data(query_id))
        # Statistics JSON layout: per-graph / TaskRunner / per-stage counters.
        st = json.loads(client.describe_query(query_id).result.query.statistics.json)
        logging.debug(st["Graph=1"]["TaskRunner"]["Stage=Total"]["MkqlMaxMemoryUsage"]["count"])

    @yq_v1
    def test_hop_alloc(self, kikimr, client):
        """Intended: inspect MkqlMaxMemoryUsage statistics of a hopping-window
        aggregation (the HOP query itself is still commented out)."""
        pytest.skip("This test is not ready yet")

        self.init_topics("select_hop_alloc", create_output=False)

        sql = R'''
            --SELECT COUNT(*)
            --FROM myyds.`{input_topic}`
            --GROUP BY HOP(Just(CurrentUtcTimestamp()), "PT10S", "PT10S", "PT10S"), Data
            --LIMIT 1
            SELECT 1
            ''' \
            .format(
                input_topic=self.input_topic,
            )

        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id

        time.sleep(2)  # Workaround race between write and read "from now". Remove when YQ-589 will be done.
        for i in range(1):
            self.write_stream([format(i, "0x")])
        time.sleep(yatest_common.plain_or_under_sanitizer(15, 60))

        client.abort_query(query_id)
        client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)

        logging.debug(client.get_result_data(query_id))
        st = json.loads(client.describe_query(query_id).result.query.statistics.json)
        logging.debug(st["Graph=1"]["TaskRunner"]["Stage=Total"]["MkqlMaxMemoryUsage"]["count"])
diff --git a/ydb/tests/fq/yds/test_metrics_cleanup.py b/ydb/tests/fq/yds/test_metrics_cleanup.py
new file mode 100644
index 0000000000..2858dc834c
--- /dev/null
+++ b/ydb/tests/fq/yds/test_metrics_cleanup.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import time
+
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
class TestCleanup(TestYdsBase):
    """Checks lifetime of per-query task_controller metrics: they must be
    cleaned up for finished queries but kept for running ones."""

    @yq_v1
    def test_cleanup(self, kikimr, client):
        """TaskCount sensor of a completed query must eventually disappear."""
        sql = "SELECT 1;"
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

        assert kikimr.compute_plane.get_task_count(1, query_id) == 0
        # Metric removal is asynchronous; poll until the sensor is gone
        # or the (sanitizer-aware) deadline expires.
        deadline = time.time() + yatest_common.plain_or_under_sanitizer(120, 500)
        while True:
            value = kikimr.compute_plane.get_sensors(1, "yq").find_sensor(
                {"query_id": query_id, "subsystem": "task_controller", "Stage": "Total", "sensor": "TaskCount"})
            if value is None:
                break
            assert time.time() < deadline, "TaskCount was not cleaned"
            time.sleep(0.5)

    @yq_v1
    def test_keep(self, kikimr, client):
        """TaskCount sensor must survive while the query is restarted (modified)
        and keeps running."""
        self.init_topics("cleanup_keep", create_output=False)
        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

        query_id = client.create_query("simple", "SELECT 1;", type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

        # Modify the finished query into a never-ending topic read so that
        # its metrics stay alive.
        sql = R'''
            SELECT * FROM myyds.`{input_topic}`;
            ''' \
            .format(
                input_topic=self.input_topic,
            )
        client.modify_query(query_id, "simple", sql, type=fq.QueryContent.QueryType.STREAMING)

        deadline = time.time() + 90  # x1.5 of 60 sec
        while True:
            value = kikimr.compute_plane.get_sensors(1, "yq").find_sensor(
                {"query_id": query_id, "subsystem": "task_controller", "Stage": "Total", "sensor": "TaskCount"})
            # The sensor must be present for the whole observation window.
            assert value is not None, "TaskCount was cleaned"
            if time.time() > deadline:
                break
            else:
                time.sleep(0.5)
diff --git a/ydb/tests/fq/yds/test_pq_read_write.py b/ydb/tests/fq/yds/test_pq_read_write.py
new file mode 100644
index 0000000000..1ee91ca138
--- /dev/null
+++ b/ydb/tests/fq/yds/test_pq_read_write.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+from ydb.tests.tools.datastreams_helpers.control_plane import list_read_rules
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+YDS_CONNECTION = "yds"
+
+
def start_yds_query(kikimr, client, sql) -> str:
    """Create a streaming query, wait until it is RUNNING and has produced its
    zero checkpoint, then return the query id."""
    create_response = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING)
    query_id = create_response.result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
    kikimr.compute_plane.wait_zero_checkpoint(query_id)
    return query_id
+
+
def stop_yds_query(client, query_id):
    """Abort the query and block until it reaches a terminal state."""
    client.abort_query(query_id)
    client.wait_query(query_id)
+
+
class TestPqReadWrite(TestYdsBase):
    """End-to-end topic read/write tests: hopping-window aggregation and
    json_each_row schema with SystemMetadata columns."""

    @yq_v1
    def test_pq_read_write(self, kikimr, client):
        """Aggregate Yson messages with a 10ms hopping window (5ms hop) and
        verify per-key sums per bucket, then check read-rule cleanup."""
        client.create_yds_connection(name=YDS_CONNECTION, database_id="FakeDatabaseId")
        self.init_topics("pq_test_pq_read_write")
        sql = Rf'''
            PRAGMA dq.MaxTasksPerStage="2";

            INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
            SELECT STREAM
                Yson::SerializeText(Yson::From(TableRow()))
            FROM (
                SELECT STREAM
                    k,
                    Sum(v) as sum
                FROM (
                    SELECT STREAM
                        Yson::LookupUint64(ys, "time") as t,
                        Yson::LookupInt64(ys, "key") as k,
                        Yson::LookupInt64(ys, "val") as v
                    FROM (
                        SELECT STREAM
                            Yson::Parse(Data) AS ys
                        FROM {YDS_CONNECTION}.`{self.input_topic}`))
                GROUP BY
                    k,
                    HOP(DateTime::FromMilliseconds(CAST(Unwrap(t) as Uint32)), "PT0.005S", "PT0.01S", "PT0.01S"));'''

        query_id = start_yds_query(kikimr, client, sql)

        # Bucket layout along the event-time axis (overlapping 10ms windows,
        # 5ms hop); each message lands in two adjacent buckets:
        # 100 105 110 115 120 125 130 135 140 (ms)
        # [ Bucket1 )<--Delay1-->
        #      [ Bucket2 )<--Delay2-->
        #           [ Bucket3 )<--Delay3-->
        #                [ Bucket4 )<--Delay4-->
        #                     [ Bucket5 )<--Delay5-->
        data = [
            '{"time" = 105; "key" = 1; "val" = 1;}',   # Group 1 Bucket 1, 2
            '{"time" = 107; "key" = 1; "val" = 4;}',   # Group 1 Bucket 1, 2
            '{"time" = 106; "key" = 2; "val" = 3;}',   # Group 2 Bucket 1, 2
            '{"time" = 111; "key" = 1; "val" = 7;}',   # Group 1 Bucket 2, 3
            '{"time" = 117; "key" = 1; "val" = 3;}',   # Group 1 Bucket 3, 4
            '{"time" = 110; "key" = 2; "val" = 2;}',   # Group 2 Bucket 2, 3
            '{"time" = 108; "key" = 1; "val" = 9;}',   # Group 1 Bucket 1, 2 (delayed)
            '{"time" = 121; "key" = 1; "val" = 4;}',   # Group 1 Bucket 4, 5 (close bucket 1)
            '{"time" = 107; "key" = 2; "val" = 2;}',   # Group 2 Bucket 1, 2 (delayed)
            '{"time" = 141; "key" = 2; "val" = 5;}',   # Group 2 Close all buckets
            '{"time" = 141; "key" = 1; "val" = 10;}',  # Group 1 Close all buckets
        ]

        self.write_stream(data)

        # Expected per-key, per-bucket sums in emission order.
        expected = [
            '{"k" = 1; "sum" = 14}',  # Group 1 Bucket 1
            '{"k" = 2; "sum" = 3}',   # Group 2 Bucket 1
            '{"k" = 2; "sum" = 7}',   # Group 2 Bucket 2
            '{"k" = 2; "sum" = 2}',   # Group 2 Bucket 3
            '{"k" = 1; "sum" = 21}',  # Group 1 Bucket 2
            '{"k" = 1; "sum" = 10}',  # Group 1 Bucket 3
            '{"k" = 1; "sum" = 7}',   # Group 1 Bucket 4
            '{"k" = 1; "sum" = 4}',   # Group 1 Bucket 5
        ]

        assert self.read_stream(len(expected)) == expected

        # A running query holds exactly one read rule on its input topic.
        read_rules = list_read_rules(self.input_topic)
        assert len(read_rules) == 1, read_rules

        stop_yds_query(client, query_id)

        # Assert that all read rules were removed after query stops
        read_rules = list_read_rules(self.input_topic)
        assert len(read_rules) == 0, read_rules

    @yq_v1
    def test_pq_read_schema_metadata(self, kikimr, client):
        """Read json_each_row records through an explicit SCHEMA and expose the
        message offset via SystemMetadata("offset")."""
        client.create_yds_connection(name=YDS_CONNECTION, database_id="FakeDatabaseId")
        self.init_topics("pq_test_pq_read_schema_metadata")
        sql = Rf'''
            PRAGMA dq.MaxTasksPerStage="2";

            INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
            SELECT UNWRAP(Yson::SerializeJson(Yson::From(TableRow())))
            FROM (
                SELECT field1, field2, SystemMetadata("offset") as field3
                FROM {YDS_CONNECTION}.`{self.input_topic}`
                WITH (
                    format=json_each_row,
                    SCHEMA (
                        field1 String,
                        field2 Int32
                    )
                )
            )'''

        query_id = start_yds_query(kikimr, client, sql)

        data1 = [
            '{"field1": "value1", "field2": 105}',
            '{"field1": "value2", "field2": 106}',
        ]
        self.write_stream(data1)

        # field3 carries the topic offset of each source message (0, 1, ...).
        expected = [
            '{"field1":"value1","field2":105,"field3":0}',
            '{"field1":"value2","field2":106,"field3":1}'
        ]

        assert self.read_stream(len(expected)) == expected

        stop_yds_query(client, query_id)
diff --git a/ydb/tests/fq/yds/test_public_metrics.py b/ydb/tests/fq/yds/test_public_metrics.py
new file mode 100644
index 0000000000..b4edbda818
--- /dev/null
+++ b/ydb/tests/fq/yds/test_public_metrics.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import pytest
+import time
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
class TestPublicMetrics(TestYdsBase):
    """Checks the public (cloud-facing) per-query metrics in the yq_public
    sensor group for finished and aborted streaming queries."""

    @yq_v1
    def test_select_limit(self, kikimr, client):
        """A self-completing LIMIT query must report cpu/memory/input metrics;
        output_bytes is absent since nothing is written to a sink."""
        self.init_topics("public_select_limit", create_output=False)

        sql = R'''
            SELECT * FROM myyds1.`{input_topic}` LIMIT 2;
            '''.format(
            input_topic=self.input_topic,
        )

        # Values provided by the test harness environment — TODO confirm
        # against the mock folder/cloud resolver used by the fixtures.
        cloud_id = "mock_cloud"
        folder_id = "my_folder"

        client.create_yds_connection("myyds1", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        self.write_stream(["A", "B"])
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

        metrics = kikimr.compute_plane.get_sensors(1, "yq_public")
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}) == 0
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}) > 0
        assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id,
                                    "name": "query.memory_usage_bytes"}) > 0
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}) > 0
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}) is None
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}) >= 0

    @yq_v1
    # NOTE(review): stats_mode is not a test argument; presumably it is consumed
    # indirectly by the kikimr/client fixtures via conftest — confirm.
    @pytest.mark.parametrize("stats_mode", ["STATS_MODE_FULL"])
    def test_select_unlimited(self, kikimr, client):
        """A topic-to-topic INSERT aborted by the user must report both
        input_bytes and output_bytes."""
        self.init_topics("public_select_unlimited")

        sql = R'''
            INSERT INTO myyds2.`{output_topic}`
            SELECT * FROM myyds2.`{input_topic}`;
            '''.format(
            input_topic=self.input_topic,
            output_topic=self.output_topic,
        )

        cloud_id = "mock_cloud"
        folder_id = "my_folder"

        client.create_yds_connection("myyds2", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        data = ["A", "B", "C", "D"]
        self.write_stream(data)
        assert self.read_stream(len(data)) == data

        time.sleep(5)  # TODO: fix and remove
        client.abort_query(query_id)
        client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)

        metrics = kikimr.compute_plane.get_sensors(1, "yq_public")
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.running_tasks"}) == 0
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.cpu_usage_us"}) > 0
        assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id,
                                    "name": "query.memory_usage_bytes"}) > 0
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.input_bytes"}) > 0
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.output_bytes"}) > 0
        assert metrics.find_sensor(
            {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.uptime_seconds"}) >= 0
        # TODO: rows must be fixed in YQ-2592
        # assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id,
        #                             "name": "query.source_input_records"}) > 0
        # assert metrics.find_sensor({"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id,
        #                             "name": "query.sink_output_records"}) > 0
        # assert metrics.find_sensor(
        #     {"cloud_id": cloud_id, "folder_id": folder_id, "query_id": query_id, "name": "query.late_events"}) == 0
diff --git a/ydb/tests/fq/yds/test_read_rules_deletion.py b/ydb/tests/fq/yds/test_read_rules_deletion.py
new file mode 100644
index 0000000000..da4678887c
--- /dev/null
+++ b/ydb/tests/fq/yds/test_read_rules_deletion.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import pytest
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+from ydb.tests.tools.datastreams_helpers.control_plane import list_read_rules
+import ydb.tests.library.common.yatest_common as yatest_common
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
class TestReadRulesDeletion(TestYdsBase):
    """Checks that a query's topic read rules are removed on abort, both with
    and without a compute-node restart in between."""

    @yq_v1
    @pytest.mark.parametrize(
        "with_recovery",
        [False, True],
        ids=["simple", "with_recovery"]
    )
    def test_delete_read_rules(self, kikimr, client, with_recovery):
        # Distinct topic/connection names per parametrization to avoid clashes.
        topic_name = "delete_read_rules_{}".format(1 if with_recovery else 0)
        conn = "yds_{}".format(1 if with_recovery else 0)
        self.init_topics(topic_name)

        sql = R'''
            PRAGMA dq.MaxTasksPerStage="5";

            INSERT INTO {conn}.`{output_topic}`
            SELECT * FROM {conn}.`{input_topic}`;'''.format(
            input_topic=self.input_topic,
            output_topic=self.output_topic,
            conn=conn
        )

        client.create_yds_connection(conn, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        data = [
            '42',
            '42',
            '42',
            '42',
            '42',
            '42',
            '42',
        ]

        self.write_stream(data)

        self.read_stream(len(data))

        # A running query holds exactly one read rule on its input topic.
        read_rules = list_read_rules(self.input_topic)
        assert len(read_rules) == 1, read_rules

        if with_recovery:
            # Bounce the compute node so the query goes through recovery
            # before being aborted.
            kikimr.compute_plane.kikimr_cluster.nodes[1].stop()
            kikimr.compute_plane.kikimr_cluster.nodes[1].start()
            kikimr.compute_plane.wait_bootstrap(1)

        client.abort_query(query_id)
        client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER,
                                 timeout=yatest_common.plain_or_under_sanitizer(60, 300))

        # Assert that all read rules were removed after query stops
        read_rules = list_read_rules(self.input_topic)
        assert len(read_rules) == 0, read_rules
diff --git a/ydb/tests/fq/yds/test_restart_query.py b/ydb/tests/fq/yds/test_restart_query.py
new file mode 100644
index 0000000000..c1cdc4edf6
--- /dev/null
+++ b/ydb/tests/fq/yds/test_restart_query.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import pytest
+import time
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.control_plane import delete_stream
+
+
class TestRestartQuery(TestYdsBase):
    """Checks that a query survives a runtime error (deleted output topic):
    it keeps running under retries and reports the error as a transient issue."""

    @yq_v1
    @pytest.mark.parametrize(
        "query_type",
        [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING],
        ids=["analytics", "streaming"]
    )
    def test_restart_runtime_errors(self, kikimr_many_retries, client_many_retries, query_type):
        streaming_query = query_type == fq.QueryContent.QueryType.STREAMING
        # Separate topic/connection names per parametrization to avoid clashes.
        unique_suffix = "_1" if streaming_query else "_2"

        connection_name = "yds" + unique_suffix
        client_many_retries.create_yds_connection(name=connection_name, database_id="FakeDatabaseId")

        self.init_topics("restart" + unique_suffix)

        sql = Rf'''
            INSERT INTO {connection_name}.`{self.output_topic}`
            SELECT Data FROM {connection_name}.`{self.input_topic}`;'''

        query_id = client_many_retries.create_query("restart-query", sql, type=query_type).result.query_id
        client_many_retries.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        if streaming_query:
            kikimr_many_retries.compute_plane.wait_zero_checkpoint(query_id)
        else:
            # this ugly delay is needed to wait for ca source init until we pass correct read disposition param
            time.sleep(3)

        # Sanity check: data flows end to end before we break the sink.
        # (Fix: pass `data` itself instead of duplicating the literal.)
        data = ["data"]
        self.write_stream(data)
        assert self.read_stream(len(data)) == data

        # Break the query at runtime by removing its output topic.
        delete_stream(self.output_topic)

        def has_transient_issue(text, deadline):
            # True once `text` appears in the query's transient issues;
            # fails the test if the deadline has passed without a match.
            describe_result = client_many_retries.describe_query(query_id).result
            logging.debug("Describe result: {}".format(describe_result))
            for issue in describe_result.query.transient_issue:
                if text in issue.message:
                    return True
            assert deadline and deadline > time.time(), f'Text "{text}" is expected to be found in transient issues, but was not found. Describe query result:\n{describe_result}'
            return False

        deadline = time.time() + yatest_common.plain_or_under_sanitizer(60, 300)
        while not has_transient_issue("SCHEME_ERROR", deadline):
            time.sleep(0.3)

        # The query must still be running despite the runtime error.
        describe_result = client_many_retries.describe_query(query_id).result
        assert describe_result.query.meta.status == fq.QueryMeta.RUNNING
diff --git a/ydb/tests/fq/yds/test_select_1.py b/ydb/tests/fq/yds/test_select_1.py
new file mode 100644
index 0000000000..6c43b72ffa
--- /dev/null
+++ b/ydb/tests/fq/yds/test_select_1.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_all
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+import ydb.public.api.protos.ydb_value_pb2 as ydb
+
+
class TestSelect1(object):
    """Basic constant-select tests: result shape, plan/ast presence, PG syntax,
    metering and error reporting."""

    @yq_all
    def test_select_1(self, kikimr, client):
        """SELECT 1 must complete with a single INT32 column and produce
        a plan, an ast and a fixed metering value."""
        sql = 'SELECT 1 AS SingleColumn;'
        query_id = client.create_query("simple1", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
        describe_result = client.describe_query(query_id).result

        assert len(describe_result.query.plan.json) > 0, "plan must not be empty"
        assert len(describe_result.query.ast.data) > 0, "ast must not be empty"

        data = client.get_result_data(query_id)
        result_set = data.result.result_set
        logging.debug(str(result_set))
        assert len(result_set.columns) == 1
        assert result_set.columns[0].name == "SingleColumn"
        assert result_set.columns[0].type.type_id == ydb.Type.INT32
        assert len(result_set.rows) == 1
        assert result_set.rows[0].items[0].int32_value == 1
        # Expected total metering units for this trivial query — TODO confirm
        # the constant against the metering configuration used by the fixtures.
        assert sum(kikimr.control_plane.get_metering()) == 10

    @yq_all
    def test_select_z_x_y(self, client):
        """Column order in the result must follow the SELECT list (z, x, y)."""
        sql = "select 1 as z, 2 as x, 3 as y;"
        query_id = client.create_query("simple2", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
        data = client.get_result_data(query_id)

        result_set = data.result.result_set
        logging.debug(str(result_set))
        assert len(result_set.columns) == 3
        assert result_set.columns[0].name == "z"
        assert result_set.columns[1].name == "x"
        assert result_set.columns[2].name == "y"
        assert len(result_set.rows) == 1
        assert result_set.rows[0].items[0].int32_value == 1
        assert result_set.rows[0].items[1].int32_value == 2
        assert result_set.rows[0].items[2].int32_value == 3

    @yq_v1
    def test_unwrap_null(self, client):
        """unwrap() over an empty optional must fail the query with a clear issue."""
        sql = "select unwrap(1/0);"
        query_id = client.create_query("simple3", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
        describe_result = client.describe_query(query_id).result
        assert "Failed to unwrap empty optional" in describe_result.query.issue[0].issues[0].message

    @yq_all
    def test_select_pg(self, client):
        """PG-syntax constants must come back with correct PG type descriptors
        and text-encoded values."""
        sql = R'''select ARRAY[ARRAY[1,2,3]], null, 'null', 1, true, null::int4'''

        query_id = client.create_query("simple4", sql, type=fq.QueryContent.QueryType.ANALYTICS,
                                       pg_syntax=True).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
        data = client.get_result_data(query_id)

        result_set = data.result.result_set
        logging.debug(str(result_set))
        assert len(result_set.columns) == 6
        assert result_set.columns[0].name == "column0"
        # OID 1007 is the PostgreSQL _int4 (int4 array) type.
        assert result_set.columns[0].type.pg_type.oid == 1007
        assert result_set.columns[0].type.pg_type.typlen == -1
        assert result_set.columns[0].type.pg_type.typmod == -1
        assert result_set.columns[1].name == "column1"
        assert result_set.columns[1].type.null_type is not None
        assert result_set.columns[2].name == "column2"
        # OID 25 is the PostgreSQL text type.
        assert result_set.columns[2].type.pg_type.oid == 25
        assert result_set.columns[2].type.pg_type.typlen == -1
        assert result_set.columns[2].type.pg_type.typmod == -1
        assert result_set.columns[3].name == "column3"
        # OID 23 is the PostgreSQL int4 type.
        assert result_set.columns[3].type.pg_type.oid == 23
        assert result_set.columns[3].type.pg_type.typlen == 4
        assert result_set.columns[3].type.pg_type.typmod == -1
        assert result_set.columns[4].name == "column4"
        # OID 16 is the PostgreSQL bool type.
        assert result_set.columns[4].type.pg_type.oid == 16
        assert result_set.columns[4].type.pg_type.typlen == 1
        assert result_set.columns[4].type.pg_type.typmod == -1
        assert result_set.columns[5].name == "column5"
        assert result_set.columns[5].type.pg_type.oid == 23
        assert result_set.columns[5].type.pg_type.typlen == 4
        assert result_set.columns[5].type.pg_type.typmod == -1

        assert len(result_set.rows) == 1
        assert result_set.rows[0].items[0].text_value == "{{1,2,3}}"
        assert result_set.rows[0].items[1].WhichOneof("value") is None
        assert result_set.rows[0].items[2].text_value == "null"
        assert result_set.rows[0].items[3].text_value == "1"
        assert result_set.rows[0].items[4].text_value == "t"
        assert result_set.rows[0].items[5].WhichOneof("value") is None

    @yq_all
    def test_select_10_p_19_plus_1(self, client):
        """Uint64+Int32 arithmetic must surface an implicit-bitcast warning."""
        sql = "SELECT 10000000000000000000+1 AS LargeColumn;"
        query_id = client.create_query("simple1", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
        describe_string = str(client.describe_query(query_id).result)
        assert "Integral type implicit bitcast: Uint64 and Int32" in describe_string, describe_string

    @yq_all
    def test_compile_error(self, client, yq_version):
        """A syntactically invalid query must fail with a version-specific code."""
        sql = "SUPERSELECT;"
        query_id = client.create_query("simple1", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
        describe_string = str(client.describe_query(query_id).result)
        assert "Query failed with code " + ("ABORTED" if yq_version == "v1" else "GENERIC_ERROR") in describe_string, describe_string
        assert "Unexpected token" in describe_string, describe_string
        # Failed to parse query is added in YQv1 only
        if yq_version == "v1":
            assert "Failed to parse query" in describe_string, describe_string

    @yq_all
    def test_ast_in_failed_query(self, client):
        """The ast must be available even when the query fails at runtime."""
        sql = "SELECT unwrap(1 / 0)"

        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.FAILED)

        ast = str(client.describe_query(query_id).result.query.ast)
        assert ast != "", "Query ast not found"
diff --git a/ydb/tests/fq/yds/test_select_limit.py b/ydb/tests/fq/yds/test_select_limit.py
new file mode 100644
index 0000000000..ce5dd8ffe6
--- /dev/null
+++ b/ydb/tests/fq/yds/test_select_limit.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import os
+
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.control_plane import list_read_rules
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
class TestSelectLimit(TestYdsBase):
    """SELECT ... LIMIT over a topic: the query must complete on its own after
    the limit is reached and clean up its read rules."""

    @yq_v1
    def test_select_limit(self, kikimr, client):
        """LIMIT 2 over three written messages must return the first two."""
        self.init_topics("select_limit", create_output=False)

        sql = R'''
            SELECT * FROM myyds.`{input_topic}` LIMIT 2;
            '''.format(input_topic=self.input_topic)

        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        messages = [b'A', b'B', b'C']
        self.write_stream(messages)

        # The LIMIT makes the streaming query finish by itself.
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

        result_set = client.get_result_data(query_id).result.result_set
        logging.debug(str(result_set))
        assert len(result_set.rows) == 2
        assert len(result_set.columns) == 1
        for row, expected_payload in zip(result_set.rows, messages[:2]):
            assert row.items[0].bytes_value == expected_payload

        # Assert that all read rules were removed after query stops
        read_rules = list_read_rules(self.input_topic)
        assert len(read_rules) == 0, read_rules
diff --git a/ydb/tests/fq/yds/test_select_limit_db_id.py b/ydb/tests/fq/yds/test_select_limit_db_id.py
new file mode 100644
index 0000000000..d8b75b7577
--- /dev/null
+++ b/ydb/tests/fq/yds/test_select_limit_db_id.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import pytest
+
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class TestSelectLimitWithDbId(TestYdsBase):
+ @yq_v1
+ def test_select_same_with_id(self, kikimr, client):
+ pytest.skip("Skip until streaming disposition is implemented YQ-589")
+
+ self.init_topics("select_same_with_id", create_output=False)
+
+ sql = R'''
+ SELECT * FROM yds1.`{input_topic}` LIMIT 2;
+ SELECT * FROM yds1.`{input_topic}` LIMIT 2;
+ '''.format(
+ input_topic=self.input_topic,
+ )
+
+ client.create_yds_connection(name="yds1", database_id="FakeDatabaseId")
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.wait_zero_checkpoint(query_id)
+
+ messages = ["A", "B", "C", "D", "E"]
+ self.write_stream(messages)
+
+ client.wait_query(query_id, 30)
+
+ # by default selects use different consumers, so they will read the same
+ # messages - first 2 of them
+
+ rs = client.get_result_data(query_id, result_set_index=0).result.result_set
+ logging.debug(str(rs))
+ assert len(rs.rows) == 2
+ assert len(rs.columns) == 1
+ assert rs.rows[0].items[0].bytes_value == messages[0]
+ assert rs.rows[1].items[0].bytes_value == messages[1]
+
+ rs = client.get_result_data(query_id, result_set_index=1).result.result_set
+ logging.debug(str(rs))
+ assert len(rs.rows) == 2
+ assert len(rs.columns) == 1
+ assert rs.rows[0].items[0].bytes_value == messages[0]
+ assert rs.rows[1].items[0].bytes_value == messages[1]
diff --git a/ydb/tests/fq/yds/test_select_timings.py b/ydb/tests/fq/yds/test_select_timings.py
new file mode 100644
index 0000000000..2f62fdb639
--- /dev/null
+++ b/ydb/tests/fq/yds/test_select_timings.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+import os
+import time
+
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+import ydb.tests.library.common.yatest_common as yatest_common
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class TestSelectTimings(TestYdsBase):
+ @yq_v1
+ @pytest.mark.parametrize(
+ "success",
+ [True, False],
+ ids=["finished", "aborted"]
+ )
+ @pytest.mark.parametrize(
+ "query_type",
+ [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING],
+ ids=["analytics", "streaming"]
+ )
+ def test_select_timings(self, kikimr, client, success, query_type):
+ suffix = (str(query_type)[0] + str(success)[0]).lower()
+ self.init_topics("select_timings_" + suffix, create_output=False)
+ connection = "myyds_" + suffix
+
+ # TBD auto-create consumer and read_rule in analytics query
+ sql = "SELECT * FROM {connection}.`{input_topic}` LIMIT 1;".format(
+ connection=connection,
+ input_topic=self.input_topic,
+ )
+
+ client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("simple", sql, type=query_type).result.query_id
+
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+
+ meta = client.describe_query(query_id).result.query.meta
+ assert meta.started_at.seconds != 0, "Must set started_at"
+ assert meta.finished_at.seconds == 0, "Must not set finished_at"
+ seconds = meta.started_at.seconds
+
+ # streaming query should continue after cluster restart
+ if query_type == fq.QueryContent.QueryType.STREAMING:
+ kikimr.compute_plane.kikimr_cluster.nodes[1].stop()
+ kikimr.compute_plane.kikimr_cluster.nodes[1].start()
+ kikimr.compute_plane.wait_bootstrap(1)
+
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_zero_checkpoint(query_id, expect_counters_exist=False)
+
+ meta = client.describe_query(query_id).result.query.meta
+ assert meta.started_at.seconds == seconds, "Must not change started_at"
+ assert meta.finished_at.seconds == 0, "Must not set finished_at"
+
+        # will wait 30 sec for lease expiration, reduce timeout when it is configurable
+ wait_query_timeout = yatest_common.plain_or_under_sanitizer(120, 300)
+ if success:
+ time.sleep(2) # Workaround race between write and read "from now". Remove when YQ-589 will be done.
+ self.write_stream(["Message"])
+ client.wait_query_status(query_id, fq.QueryMeta.COMPLETED, timeout=wait_query_timeout)
+ else:
+ client.abort_query(query_id)
+ client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER, timeout=wait_query_timeout)
+
+ meta = client.describe_query(query_id).result.query.meta
+ assert meta.started_at.seconds == seconds, "Must not change started_at (meta={})".format(meta)
+ assert meta.finished_at.seconds >= seconds, "Must set finished_at (meta={})".format(meta)
diff --git a/ydb/tests/fq/yds/test_stop.py b/ydb/tests/fq/yds/test_stop.py
new file mode 100644
index 0000000000..b9074f6dd1
--- /dev/null
+++ b/ydb/tests/fq/yds/test_stop.py
@@ -0,0 +1,93 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import os
+import pytest
+import time
+
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.datastreams_helpers.control_plane import create_read_rule, list_read_rules
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class TestStop(TestYdsBase):
+ @yq_v1
+ @pytest.mark.parametrize(
+ "query_type",
+ [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING],
+ ids=["analytics", "streaming"]
+ )
+ def test_stop_query(self, kikimr, client, query_type):
+ self.init_topics("select_stop_" + str(query_type), create_output=False)
+
+ connection = "myyds_" + str(query_type)
+ # TBD auto-create consumer and read_rule in analytics query
+        if query_type == fq.QueryContent.QueryType.STREAMING:
+ sql = R'''
+ SELECT * FROM {connection}.`{input_topic}` LIMIT 3;
+ '''.format(
+ connection=connection,
+ input_topic=self.input_topic,
+ )
+ else:
+ create_read_rule(self.input_topic, self.consumer_name)
+ sql = R'''
+ PRAGMA pq.Consumer="{consumer_name}";
+ SELECT * FROM {connection}.`{input_topic}` LIMIT 3;
+ '''.format(
+ consumer_name=self.consumer_name,
+ connection=connection,
+ input_topic=self.input_topic,
+ )
+
+ assert self.wait_until((lambda: kikimr.compute_plane.get_actor_count(1, "YQ_PINGER") == 0))
+
+ client.create_yds_connection(connection, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("simple", sql, type=query_type).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+
+ assert self.wait_until((lambda: kikimr.compute_plane.get_actor_count(1, "YQ_PINGER") == 1))
+
+ time.sleep(2) # Workaround race between write and read "from now". Remove when YQ-589 will be done.
+ messages = ["A", "B"]
+ self.write_stream(messages)
+
+ deadline = time.time() + yatest_common.plain_or_under_sanitizer(30, 150)
+ while True:
+ if time.time() > deadline:
+ break
+ describe_response = client.describe_query(query_id)
+ status = describe_response.result.query.meta.status
+ assert status in [fq.QueryMeta.STARTING, fq.QueryMeta.RUNNING], fq.QueryMeta.ComputeStatus.Name(status)
+
+ client.abort_query(query_id)
+
+ client.wait_query(query_id)
+
+ # TODO: remove this if
+        if query_type == fq.QueryContent.QueryType.STREAMING:
+ # Assert that all read rules were removed after query stops
+ read_rules = list_read_rules(self.input_topic)
+ assert len(read_rules) == 0, read_rules
+
+ describe_response = client.describe_query(query_id)
+ status = describe_response.result.query.meta.status
+ assert status == fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ComputeStatus.Name(status)
+
+ assert self.wait_until((lambda: kikimr.compute_plane.get_actor_count(1, "YQ_PINGER") == 0))
+
+ # TBD implement and check analytics partial results as well
+        if query_type == fq.QueryContent.QueryType.STREAMING:
+ data = client.get_result_data(query_id)
+
+ result_set = data.result.result_set
+ logging.debug(str(result_set))
+ assert len(result_set.rows) == 2
+ assert len(result_set.columns) == 1
+ assert result_set.rows[0].items[0].bytes_value == messages[0]
+ assert result_set.rows[1].items[0].bytes_value == messages[1]
diff --git a/ydb/tests/fq/yds/test_watermarks.py b/ydb/tests/fq/yds/test_watermarks.py
new file mode 100644
index 0000000000..e9488c2075
--- /dev/null
+++ b/ydb/tests/fq/yds/test_watermarks.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import time
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.fq_runner.fq_client import StreamingDisposition
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+def start_yds_query(kikimr, client, sql, streaming_disposition) -> str:
+ query_id = client.create_query("simple", sql, streaming_disposition=streaming_disposition,
+ type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_zero_checkpoint(query_id)
+ return query_id
+
+
+def stop_yds_query(client, query_id):
+ client.abort_query(query_id)
+ client.wait_query(query_id)
+
+
+class TestWatermarks(TestYdsBase):
+ @yq_v1
+ def test_pq_watermarks(self, kikimr, client):
+ client.create_yds_connection(name="yds", database_id="FakeDatabaseId")
+ self.init_topics("pq_test_pq_watermarks")
+
+ sql = Rf'''
+ PRAGMA dq.WatermarksMode="default";
+ PRAGMA dq.WatermarksGranularityMs="500";
+ PRAGMA dq.WatermarksLateArrivalDelayMs="100";
+
+ INSERT INTO yds.`{self.output_topic}`
+ SELECT STREAM
+ Yson::SerializeText(Yson::From(TableRow()))
+ FROM (
+ SELECT AGGREGATE_LIST(field1) as data
+ FROM (SELECT field1
+ FROM yds.`{self.input_topic}`
+ WITH (
+ format=json_each_row,
+ schema=(
+ field1 String,
+ field2 String
+ )
+ )
+ WHERE field2 == "abc1"
+ )
+ GROUP BY HoppingWindow("PT0.5S", "PT1S")
+ );'''
+
+ query_id = start_yds_query(kikimr, client, sql, streaming_disposition=StreamingDisposition.fresh())
+
+ data1 = [
+ '{"field1": "row1", "field2": "abc1"}',
+ '{"field1": "row2", "field2": "abc2"}',
+ ]
+ self.write_stream(data1)
+
+ time.sleep(2) # wait for the write time in lb is moved forward
+
+ data2 = [
+ '{"field1": "row3", "field2": "abc3"}',
+ ]
+ self.write_stream(data2)
+
+ expected = [
+ '{"data" = ["row1"]}',
+ '{"data" = ["row1"]}'
+ ]
+
+ assert self.read_stream(len(expected)) == expected
+
+ stop_yds_query(client, query_id)
+
+ @yq_v1
+ def test_idle_watermarks(self, kikimr, client):
+ client.create_yds_connection(name="yds", database_id="FakeDatabaseId")
+ self.init_topics("pq_test_idle_watermarks")
+
+ sql = Rf'''
+ PRAGMA dq.WatermarksMode="default";
+ PRAGMA dq.WatermarksGranularityMs="500";
+ PRAGMA dq.WatermarksLateArrivalDelayMs="2000";
+ PRAGMA dq.WatermarksEnableIdlePartitions="true";
+
+ INSERT INTO yds.`{self.output_topic}`
+ SELECT STREAM
+ Yson::SerializeText(Yson::From(TableRow()))
+ FROM (
+ SELECT AGGREGATE_LIST(field1) as data
+ FROM (SELECT field1
+ FROM yds.`{self.input_topic}`
+ WITH (
+ format=json_each_row,
+ schema=(
+ field1 String,
+ field2 String
+ )
+ )
+ WHERE field2 == "abc1"
+ )
+ GROUP BY HoppingWindow("PT0.5S", "PT1S")
+ );'''
+
+ query_id = start_yds_query(kikimr, client, sql, streaming_disposition=StreamingDisposition.fresh())
+
+ data1 = [
+ '{"field1": "row1", "field2": "abc1"}',
+ ]
+ self.write_stream(data1)
+
+ expected = [
+ '{"data" = ["row1"]}',
+ '{"data" = ["row1"]}'
+ ]
+
+ assert self.read_stream(len(expected)) == expected
+
+ stop_yds_query(client, query_id)
diff --git a/ydb/tests/fq/yds/test_yds_bindings.py b/ydb/tests/fq/yds/test_yds_bindings.py
new file mode 100644
index 0000000000..4b64cd6482
--- /dev/null
+++ b/ydb/tests/fq/yds/test_yds_bindings.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import pytest
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+import ydb.public.api.protos.ydb_value_pb2 as ydb
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+class TestBindings(TestYdsBase):
+ @yq_v1
+ @pytest.mark.skip(reason="Is not implemented in YDS yet")
+ def test_yds_insert(self, client):
+ self.init_topics("yds_insert")
+
+ connection_id = client.create_yds_connection("ydsconnection", os.getenv("YDB_DATABASE"),
+ os.getenv("YDB_ENDPOINT")).result.connection_id
+
+ foo_type = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
+ bar_type = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))
+ client.create_yds_binding(name="ydsbinding",
+ stream=self.input_topic,
+ format="json_each_row",
+ connection_id=connection_id,
+ columns=[foo_type, bar_type])
+
+ sql = R'''
+ insert into bindings.`ydsbinding`
+ select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
+ '''
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+ sql = R'''
+ select foo, bar from bindings.`ydsbinding` limit 2;
+ '''
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+ data = client.get_result_data(query_id)
+ result_set = data.result.result_set
+ assert len(result_set.columns) == 2
+ assert result_set.columns[0].name == "foo"
+ assert result_set.columns[0].type.type_id == ydb.Type.INT32
+ assert result_set.columns[1].name == "bar"
+ assert result_set.columns[1].type.type_id == ydb.Type.UTF8
+ assert len(result_set.rows) == 2
+ assert result_set.rows[0].items[0].int32_value == 123
+ assert result_set.rows[0].items[1].text_value == 'xxx'
+ assert result_set.rows[1].items[0].int32_value == 456
+ assert result_set.rows[1].items[1].text_value == 'yyy'
diff --git a/ydb/tests/fq/yds/test_yq_streaming.py b/ydb/tests/fq/yds/test_yq_streaming.py
new file mode 100644
index 0000000000..3c9a7d5067
--- /dev/null
+++ b/ydb/tests/fq/yds/test_yq_streaming.py
@@ -0,0 +1,329 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import os
+import time
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+import ydb.public.api.protos.ydb_value_pb2 as ydb_value
+
+
+class TestYqStreaming(TestYdsBase):
+ @yq_v1
+ def test_yq_streaming(self, kikimr, client, yq_version):
+ self.init_topics(f"pq_yq_streaming_{yq_version}")
+
+ sql = R'''
+ PRAGMA dq.MaxTasksPerStage="2";
+ INSERT INTO myyds.`{output_topic}`
+ SELECT STREAM (Data || Data) ?? "" As Data FROM myyds.`{input_topic}`;
+ ''' \
+ .format(
+ input_topic=self.input_topic,
+ output_topic=self.output_topic,
+ )
+
+ client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+ data = [
+ '{"Data" = "hello";}',
+ ]
+ self.write_stream(data)
+
+ expected = [
+ '{"Data" = "hello";}{"Data" = "hello";}'
+ ]
+ assert self.read_stream(len(expected)) == expected
+
+ client.describe_query(query_id)
+
+ client.abort_query(query_id)
+
+ client.wait_query(query_id)
+
+ describe_response = client.describe_query(query_id)
+ status = describe_response.result.query.meta.status
+ assert not describe_response.issues, str(describe_response.issues)
+ assert status == fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ComputeStatus.Name(status)
+
+ @yq_v1
+ def test_yq_streaming_read_from_binding(self, kikimr, client, yq_version):
+ self.init_topics(f"pq_yq_read_from_binding_{yq_version}")
+
+ # Consumer and topics to create are written in ya.make file.
+ sql = R'''
+ PRAGMA dq.MaxTasksPerStage="2";
+
+ INSERT INTO myyds2.`{output_topic}`
+ SELECT STREAM key ?? "" FROM bindings.my_binding;
+ ''' \
+ .format(
+ output_topic=self.output_topic
+ )
+
+ connection_response = client.create_yds_connection("myyds2", os.getenv("YDB_DATABASE"),
+ os.getenv("YDB_ENDPOINT"))
+ logging.debug(str(connection_response))
+ assert not connection_response.issues, str(connection_response.issues)
+
+ keyColumn = ydb_value.Column(name="key", type=ydb_value.Type(
+ optional_type=ydb_value.OptionalType(item=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.STRING))))
+
+ binding_response = client.create_yds_binding(name="my_binding",
+ stream=self.input_topic,
+ format="json_each_row",
+ connection_id=connection_response.result.connection_id,
+ columns=[keyColumn])
+ logging.debug(str(binding_response))
+ assert not binding_response.issues, str(binding_response.issues)
+
+ response = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING)
+ assert not response.issues, str(response.issues)
+ logging.debug(str(response.result))
+ client.wait_query_status(response.result.query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_zero_checkpoint(response.result.query_id)
+
+ # Write to input.
+ data = [
+ '{"key": "abc"}',
+ '{"key": "xxx"}'
+ ]
+
+ self.write_stream(data)
+ logging.info("Data was written: {}".format(data))
+
+ # Read from output.
+ expected = [
+ 'abc',
+ 'xxx'
+ ]
+
+ read_data = self.read_stream(len(expected))
+ logging.info("Data was read: {}".format(read_data))
+
+ assert read_data == expected
+
+ describe_response = client.describe_query(response.result.query_id)
+ assert not describe_response.issues, str(describe_response.issues)
+ logging.debug(str(describe_response.result))
+
+ client.abort_query(response.result.query_id)
+
+ client.wait_query(response.result.query_id)
+
+ @yq_v1
+ def test_yq_streaming_read_from_binding_date_time(self, kikimr, client, yq_version):
+ self.init_topics(f"pq_yq_read_from_binding_date_time_{yq_version}")
+
+ # Consumer and topics to create are written in ya.make file.
+ sql = R'''
+ PRAGMA dq.MaxTasksPerStage="2";
+
+ INSERT INTO myyds4.`{output_topic}`
+ SELECT STREAM Unwrap(key || CAST(value as String)) as data FROM bindings.my_binding4;
+ ''' \
+ .format(
+ output_topic=self.output_topic
+ )
+
+ connection_response = client.create_yds_connection("myyds4", os.getenv("YDB_DATABASE"),
+ os.getenv("YDB_ENDPOINT"))
+ logging.debug(str(connection_response))
+ assert not connection_response.issues, str(connection_response.issues)
+
+ keyColumn = ydb_value.Column(name="key", type=ydb_value.Type(
+ optional_type=ydb_value.OptionalType(item=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.STRING))))
+
+ valueColumn = ydb_value.Column(name="value",
+ type=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.DATETIME))
+
+ binding_response = client.create_yds_binding(name="my_binding4",
+ stream=self.input_topic,
+ format="json_each_row",
+ connection_id=connection_response.result.connection_id,
+ columns=[keyColumn, valueColumn])
+ logging.debug(str(binding_response))
+ assert not binding_response.issues, str(binding_response.issues)
+
+ response = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING)
+ assert not response.issues, str(response.issues)
+ logging.debug(str(response.result))
+ client.wait_query_status(response.result.query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_zero_checkpoint(response.result.query_id)
+
+ # Write to input.
+ data = [
+ '{"key": "abc", "value": "2022-10-19 16:40:47"}',
+ '{"key": "xxx", "value": "2022-10-19 16:40:48"}'
+ ]
+
+ self.write_stream(data)
+ logging.info("Data was written: {}".format(data))
+
+ # Read from output.
+ expected = [
+ 'abc2022-10-19T16:40:47Z',
+ 'xxx2022-10-19T16:40:48Z'
+ ]
+
+ read_data = self.read_stream(len(expected))
+ logging.info("Data was read: {}".format(read_data))
+
+ assert read_data == expected
+
+ describe_response = client.describe_query(response.result.query_id)
+ assert not describe_response.issues, str(describe_response.issues)
+ logging.debug(str(describe_response.result))
+
+ client.abort_query(response.result.query_id)
+
+ client.wait_query(response.result.query_id)
+
+ @yq_v1
+ def test_yq_streaming_read_date_time_format(self, kikimr, client, yq_version):
+ self.init_topics(f"pq_yq_read_from_binding_dt_format_settings_{yq_version}")
+
+ # Consumer and topics to create are written in ya.make file.
+ sql = R'''
+ PRAGMA dq.MaxTasksPerStage="2";
+
+ INSERT INTO myyds3.`{output_topic}`
+ SELECT STREAM Unwrap(key || CAST(value as String)) as data FROM bindings.my_binding3;
+ ''' \
+ .format(
+ output_topic=self.output_topic
+ )
+
+ connection_response = client.create_yds_connection("myyds3", os.getenv("YDB_DATABASE"),
+ os.getenv("YDB_ENDPOINT"))
+ logging.debug(str(connection_response))
+ assert not connection_response.issues, str(connection_response.issues)
+
+ keyColumn = ydb_value.Column(name="key", type=ydb_value.Type(
+ optional_type=ydb_value.OptionalType(item=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.STRING))))
+
+ valueColumn = ydb_value.Column(name="value",
+ type=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.DATETIME))
+
+ binding_response = client.create_yds_binding(name="my_binding3",
+ stream=self.input_topic,
+ format="json_each_row",
+ connection_id=connection_response.result.connection_id,
+ columns=[keyColumn, valueColumn],
+ format_setting={"data.datetime.format_name": "ISO"})
+ logging.debug(str(binding_response))
+ assert not binding_response.issues, str(binding_response.issues)
+
+ response = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING)
+ assert not response.issues, str(response.issues)
+ logging.debug(str(response.result))
+ client.wait_query_status(response.result.query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_zero_checkpoint(response.result.query_id)
+
+ # Write to input.
+ data = [
+ '{"key": "abc", "value": "2022-10-19T16:40:47Z"}',
+ '{"key": "xxx", "value": "2022-10-19T16:40:48Z"}'
+ ]
+
+ self.write_stream(data)
+ logging.info("Data was written: {}".format(data))
+
+ # Read from output.
+ expected = [
+ 'abc2022-10-19T16:40:47Z',
+ 'xxx2022-10-19T16:40:48Z'
+ ]
+
+ read_data = self.read_stream(len(expected))
+ logging.info("Data was read: {}".format(read_data))
+
+ assert read_data == expected
+
+ describe_response = client.describe_query(response.result.query_id)
+ assert not describe_response.issues, str(describe_response.issues)
+ logging.debug(str(describe_response.result))
+
+ client.abort_query(response.result.query_id)
+
+ client.wait_query(response.result.query_id)
+
+ @yq_v1
+ def test_state_load_mode(self, kikimr, client, yq_version):
+ self.init_topics(f"pq_test_state_load_mode_{yq_version}")
+
+ sql = R'''
+ INSERT INTO myyds1.`{output_topic}`
+ SELECT STREAM (Data || Data) ?? "" As Data FROM myyds1.`{input_topic}`;
+ ''' \
+ .format(
+ input_topic=self.input_topic,
+ output_topic=self.output_topic,
+ )
+
+ client.create_yds_connection("myyds1", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+ name = "simple"
+ create_query_result = client.create_query(name, sql, type=fq.QueryContent.QueryType.STREAMING).result
+ query_id = create_query_result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+ data = [
+ '{"Data" = "hello";}',
+ ]
+ self.write_stream(data)
+
+ expected = [
+ '{"Data" = "hello";}{"Data" = "hello";}'
+ ]
+ assert self.read_stream(len(expected)) == expected
+
+ client.describe_query(query_id)
+
+ client.abort_query(query_id)
+
+ client.wait_query(query_id)
+
+ describe_response = client.describe_query(query_id)
+ status = describe_response.result.query.meta.status
+ assert not describe_response.issues, str(describe_response.issues)
+ assert status == fq.QueryMeta.ABORTED_BY_USER, fq.QueryMeta.ComputeStatus.Name(status)
+
+ self.init_topics("test_state_load_mode2")
+ new_sql = R'''
+ INSERT INTO myyds1.`{output_topic}`
+ SELECT STREAM (Data || CAST(COUNT(*) as string)) ?? "" as cnt
+ FROM myyds1.`{input_topic}`
+ GROUP BY Data, HOP(Just(CurrentUtcTimestamp(TableRow())), "PT1S", "PT1S", "PT1S")
+ ;
+ ''' \
+ .format(
+ input_topic=self.input_topic,
+ output_topic=self.output_topic,
+ )
+ client.modify_query(query_id, name, new_sql, type=fq.QueryContent.QueryType.STREAMING,
+ state_load_mode=fq.StateLoadMode.EMPTY)
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.compute_plane.wait_completed_checkpoints(query_id,
+ kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)
+
+ modified_data = [
+ "hello new query"
+ ]
+ modified_expected = [
+ "hello new query1"
+ ]
+ self.write_stream(modified_data)
+ time.sleep(5)
+ self.write_stream(modified_data)
+ assert self.read_stream(len(modified_expected)) == modified_expected
+
+ client.abort_query(query_id)
+ client.wait_query(query_id)
diff --git a/ydb/tests/fq/yds/ya.make b/ydb/tests/fq/yds/ya.make
new file mode 100644
index 0000000000..da9713bcea
--- /dev/null
+++ b/ydb/tests/fq/yds/ya.make
@@ -0,0 +1,58 @@
+PY3TEST()
+
+FORK_SUBTESTS()
+SPLIT_FACTOR(50)
+
+INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc)
+
+PEERDIR(
+ ydb/public/api/protos
+ ydb/public/api/grpc
+ ydb/tests/tools/datastreams_helpers
+ ydb/tests/tools/fq_runner
+)
+
+DEPENDS(ydb/tests/tools/pq_read)
+
+PY_SRCS(
+ conftest.py
+ test_base.py
+)
+
+TEST_SRCS(
+ test_2_selects_limit.py
+ test_3_selects.py
+ test_bad_syntax.py
+ test_big_state.py
+ test_continue_mode.py
+ test_cpu_quota.py
+ test_delete_read_rules_after_abort_by_system.py
+ test_eval.py
+ test_mem_alloc.py
+ test_metrics_cleanup.py
+ test_pq_read_write.py
+ test_public_metrics.py
+ test_read_rules_deletion.py
+ test_restart_query.py
+ test_select_1.py
+ test_select_limit_db_id.py
+ test_select_limit.py
+ test_select_timings.py
+ test_stop.py
+ test_watermarks.py
+ test_yds_bindings.py
+ test_yq_streaming.py
+)
+
+IF (SANITIZER_TYPE == "thread")
+ TIMEOUT(2400)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+REQUIREMENTS(ram:16)
+
+END()