aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author hor911 <hor911@ydb.tech> 2023-11-11 23:20:21 +0300
committer hor911 <hor911@ydb.tech> 2023-11-11 23:36:59 +0300
commit f0c60cc7e121ab9f7b4fabdffecd1b0af0f48ae3 (patch)
tree bc25bb865306c57bc63bb87a6af7bf60587b4250
parent 175ec788d14db88a2fafb920c8b5117209c12d5b (diff)
download ydb-f0c60cc7e121ab9f7b4fabdffecd1b0af0f48ae3.tar.gz
Move pq_read tool to ydb
-rw-r--r-- .mapping.json 5
-rw-r--r-- ydb/tests/fq/s3/test_s3.py 139
-rw-r--r-- ydb/tests/fq/s3/ya.make 1
-rw-r--r-- ydb/tests/tools/CMakeLists.txt 1
-rw-r--r-- ydb/tests/tools/datastreams_helpers/data_plane.py 2
-rw-r--r-- ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt 33
-rw-r--r-- ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt 36
-rw-r--r-- ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt 38
-rw-r--r-- ydb/tests/tools/pq_read/CMakeLists.txt 17
-rw-r--r-- ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt 26
-rw-r--r-- ydb/tests/tools/pq_read/main.cpp 172
-rw-r--r-- ydb/tests/tools/pq_read/test/test_commit.py 22
-rw-r--r-- ydb/tests/tools/pq_read/test/test_timeout.py 17
-rw-r--r-- ydb/tests/tools/pq_read/test/ya.make 16
-rw-r--r-- ydb/tests/tools/pq_read/ya.make 18
-rw-r--r-- ydb/tests/tools/ya.make 1
16 files changed, 543 insertions, 1 deletions
diff --git a/.mapping.json b/.mapping.json
index 244ebef6e1..b31a969bf0 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -9818,6 +9818,11 @@
"ydb/tests/tools/kqprun/src/CMakeLists.linux-x86_64.txt":"",
"ydb/tests/tools/kqprun/src/CMakeLists.txt":"",
"ydb/tests/tools/kqprun/src/CMakeLists.windows-x86_64.txt":"",
+ "ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt":"",
+ "ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt":"",
+ "ydb/tests/tools/pq_read/CMakeLists.txt":"",
+ "ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt":"",
"yt/CMakeLists.txt":"",
"yt/cpp/CMakeLists.txt":"",
"yt/cpp/mapreduce/CMakeLists.txt":"",
diff --git a/ydb/tests/fq/s3/test_s3.py b/ydb/tests/fq/s3/test_s3.py
index 300671eecb..d0ed8814bc 100644
--- a/ydb/tests/fq/s3/test_s3.py
+++ b/ydb/tests/fq/s3/test_s3.py
@@ -4,8 +4,10 @@
import boto3
import logging
import pytest
+import time
import ydb.public.api.protos.draft.fq_pb2 as fq
import ydb.public.api.protos.ydb_value_pb2 as ydb
+import ydb.tests.library.common.yatest_common as yatest_common
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_all
@@ -215,6 +217,143 @@ Pear,15,33'''
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+ @yq_v1
+ @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+ def test_checkpoints_on_join_s3_with_yds(self, kikimr, s3, client):
+ """Check checkpointing of a streaming query that joins S3 data with a YDS topic.
+
+ The test uploads three JSON objects to an S3 bucket, starts a streaming
+ query that inner-joins them by key with messages read from the input
+ topic, and compares the rows written to the output topic with the
+ expected list. It then waits until at least 3 completed checkpoints are
+ reported, restarts node 1 of the cluster, waits for checkpoints again,
+ and finally aborts the query.
+ """
+ # Prepare S3
+ resource = boto3.resource(
+ "s3",
+ endpoint_url=s3.s3_url,
+ aws_access_key_id="key",
+ aws_secret_access_key="secret_key"
+ )
+
+ s3_client = boto3.client(
+ "s3",
+ endpoint_url=s3.s3_url,
+ aws_access_key_id="key",
+ aws_secret_access_key="secret_key"
+ )
+
+ bucket_name = "join_s3_with_yds"
+ bucket = resource.Bucket(bucket_name)
+ bucket.create(ACL='public-read')
+ bucket.objects.all().delete()
+
+ # Upload one small JSON object per key under a/b/c/<key>.json; the query reads
+ # these objects as the dictionary side of the join.
+ def put_kv(k, v):
+ json = '{}"key": {}, "value": "{}"{}'.format("{", k, v, "}")
+ s3_client.put_object(Body=json, Bucket=bucket_name, Key='a/b/c/{}.json'.format(k), ContentType='text/json')
+
+ put_kv(1, "one")
+ put_kv(2, "two")
+ put_kv(3, "three")
+
+ kikimr.control_plane.wait_bootstrap(1)
+ client.create_storage_connection("s3_dict", bucket_name)
+
+ # Prepare YDS
+ self.init_topics("yds_dict")
+ client.create_yds_connection(name="yds", database_id="FakeDatabaseId")
+
+ # Run query
+ sql = R'''
+ PRAGMA dq.MaxTasksPerStage="2";
+
+ $s3_dict_raw =
+ SELECT cast(Data AS json) AS data
+ FROM s3_dict.`*`
+ WITH (format=raw, SCHEMA (
+ Data String NOT NULL
+ ));
+
+ $s3_dict =
+ SELECT
+ cast(JSON_VALUE(data, '$.key') AS int64) AS key,
+ cast(JSON_VALUE(data, '$.value') AS String) AS value
+ FROM $s3_dict_raw;
+
+ $parsed_yson_topic =
+ SELECT
+ Yson::LookupInt64(yson_data, "key") AS key,
+ Yson::LookupString(yson_data, "val") AS val
+ FROM (
+ SELECT
+ Yson::Parse(Data) AS yson_data
+ FROM yds.`{input_topic}` WITH SCHEMA (Data String NOT NULL));
+
+ $joined_seq =
+ SELECT
+ s3_dict.value AS num,
+ yds_seq.val AS word
+ FROM $parsed_yson_topic AS yds_seq
+ INNER JOIN $s3_dict AS s3_dict
+ ON yds_seq.key = s3_dict.key;
+
+ INSERT INTO yds.`{output_topic}`
+ SELECT
+ Yson::SerializeText(Yson::From(TableRow()))
+ FROM $joined_seq;
+ '''\
+ .format(
+ input_topic=self.input_topic,
+ output_topic=self.output_topic,
+ )
+
+ # Data is written to the topic only after the query is RUNNING and
+ # wait_zero_checkpoint has returned.
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+ kikimr.control_plane.wait_zero_checkpoint(query_id)
+
+ yds_data = [
+ '{"key" = 1; "val" = "January";}',
+ '{"key" = 2; "val" = "February";}',
+ '{"key" = 3; "val" = "March";}',
+ '{"key" = 1; "val" = "Monday";}',
+ '{"key" = 2; "val" = "Tuesday";}',
+ '{"key" = 3; "val" = "Wednesday";}',
+ '{"key" = 1; "val" = "Gold";}',
+ '{"key" = 2; "val" = "Silver";}',
+ '{"key" = 3; "val" = "Bronze";}',
+ ]
+ self.write_stream(yds_data)
+
+ # One output row per input message, compared in the same order as the input.
+ expected = [
+ '{"num" = "one"; "word" = "January"}',
+ '{"num" = "two"; "word" = "February"}',
+ '{"num" = "three"; "word" = "March"}',
+ '{"num" = "one"; "word" = "Monday"}',
+ '{"num" = "two"; "word" = "Tuesday"}',
+ '{"num" = "three"; "word" = "Wednesday"}',
+ '{"num" = "one"; "word" = "Gold"}',
+ '{"num" = "two"; "word" = "Silver"}',
+ '{"num" = "three"; "word" = "Bronze"}',
+ ]
+ assert self.read_stream(len(expected)) == expected
+
+ # Check that checkpointing is finished
+ # Polls get_completed_checkpoints until it reports at least 3, asserting that the
+ # deadline has not passed on every unsuccessful poll. Both the deadline and the poll
+ # interval come from plain_or_under_sanitizer - presumably the second (larger) value
+ # is used for sanitizer builds; confirm in yatest_common.
+ def wait_checkpoints(require_query_is_on=False):
+ deadline = time.time() + yatest_common.plain_or_under_sanitizer(300, 900)
+ while True:
+ completed = kikimr.control_plane.get_completed_checkpoints(query_id, require_query_is_on)
+ if completed >= 3:
+ break
+ assert time.time() < deadline, "Completed: {}".format(completed)
+ time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2))
+
+ logging.debug("Wait checkpoints")
+ wait_checkpoints(True)
+ logging.debug("Wait checkpoints success")
+
+ # Restart node 1 and wait for the control plane to bootstrap again.
+ kikimr.control_plane.kikimr_cluster.nodes[1].stop()
+ kikimr.control_plane.kikimr_cluster.nodes[1].start()
+ kikimr.control_plane.wait_bootstrap(1)
+
+ # NOTE(review): this reuses the same `completed >= 3` threshold as before the restart.
+ # If the reported count survives the restart, this wait passes without any new
+ # checkpoint being made - confirm what get_completed_checkpoints returns here.
+ logging.debug("Wait checkpoints after restore")
+ wait_checkpoints(False)
+ logging.debug("Wait checkpoints after restore success")
+
+ client.abort_query(query_id)
+ client.wait_query(query_id)
+
@yq_v1 # v2 compute with multiple nodes is not supported yet
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("kikimr", [{"compute": 3}], indirect=True)
diff --git a/ydb/tests/fq/s3/ya.make b/ydb/tests/fq/s3/ya.make
index cc0ca2d546..8bf632c446 100644
--- a/ydb/tests/fq/s3/ya.make
+++ b/ydb/tests/fq/s3/ya.make
@@ -16,6 +16,7 @@ PEERDIR(
DEPENDS(
contrib/python/moto/bin
+ ydb/tests/tools/pq_read
)
TEST_SRCS(
diff --git a/ydb/tests/tools/CMakeLists.txt b/ydb/tests/tools/CMakeLists.txt
index 9ed8c2439a..2416e812ff 100644
--- a/ydb/tests/tools/CMakeLists.txt
+++ b/ydb/tests/tools/CMakeLists.txt
@@ -8,3 +8,4 @@
add_subdirectory(idx_test)
add_subdirectory(kqprun)
+add_subdirectory(pq_read)
diff --git a/ydb/tests/tools/datastreams_helpers/data_plane.py b/ydb/tests/tools/datastreams_helpers/data_plane.py
index 8ddc16a222..e3aef0d6ce 100644
--- a/ydb/tests/tools/datastreams_helpers/data_plane.py
+++ b/ydb/tests/tools/datastreams_helpers/data_plane.py
@@ -49,7 +49,7 @@ def read_stream(path, messages_count, commit_after_processing=True, consumer_nam
)
result_file = yatest.common.output_path(result_file_name)
cmd = [
- yatest.common.binary_path("kikimr/yq/tools/pq_read/pq_read"),
+ yatest.common.binary_path("ydb/tests/tools/pq_read/pq_read"),
"--endpoint", os.getenv("YDB_ENDPOINT"),
"--database", os.getenv("YDB_DATABASE"),
"--topic-path", path,
diff --git a/ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt b/ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..a1834fd6f0
--- /dev/null
+++ b/ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,33 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(pq_read)
+target_link_libraries(pq_read PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ library-cpp-colorizer
+ library-cpp-getopt
+ cpp-threading-future
+ cpp-client-ydb_persqueue_public
+)
+target_link_options(pq_read PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(pq_read PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/tests/tools/pq_read/main.cpp
+)
+target_allocator(pq_read
+ system_allocator
+)
+vcs_info(pq_read)
diff --git a/ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt b/ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..7427ce0f1c
--- /dev/null
+++ b/ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,36 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(pq_read)
+target_link_libraries(pq_read PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-colorizer
+ library-cpp-getopt
+ cpp-threading-future
+ cpp-client-ydb_persqueue_public
+)
+target_link_options(pq_read PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(pq_read PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/tests/tools/pq_read/main.cpp
+)
+target_allocator(pq_read
+ cpp-malloc-jemalloc
+)
+vcs_info(pq_read)
diff --git a/ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt b/ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..a4436cdee8
--- /dev/null
+++ b/ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,38 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(pq_read)
+target_link_libraries(pq_read PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ library-cpp-colorizer
+ library-cpp-getopt
+ cpp-threading-future
+ cpp-client-ydb_persqueue_public
+)
+target_link_options(pq_read PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(pq_read PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/tests/tools/pq_read/main.cpp
+)
+target_allocator(pq_read
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(pq_read)
diff --git a/ydb/tests/tools/pq_read/CMakeLists.txt b/ydb/tests/tools/pq_read/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/tests/tools/pq_read/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt b/ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..890d1fdd5b
--- /dev/null
+++ b/ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(pq_read)
+target_link_libraries(pq_read PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ library-cpp-colorizer
+ library-cpp-getopt
+ cpp-threading-future
+ cpp-client-ydb_persqueue_public
+)
+target_sources(pq_read PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/tests/tools/pq_read/main.cpp
+)
+target_allocator(pq_read
+ system_allocator
+)
+vcs_info(pq_read)
diff --git a/ydb/tests/tools/pq_read/main.cpp b/ydb/tests/tools/pq_read/main.cpp
new file mode 100644
index 0000000000..cfcf4f5e3b
--- /dev/null
+++ b/ydb/tests/tools/pq_read/main.cpp
@@ -0,0 +1,172 @@
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+
+#include <library/cpp/getopt/last_getopt.h>
+#include <library/cpp/threading/future/future.h>
+
+#include <util/stream/output.h>
+#include <util/system/env.h>
+
+#include <atomic>
+#include <thread>
+
+struct TOptions {
+    // Command-line settings of the tool; every field is filled by the constructor from argv.
+    TString Endpoint;                       // -e / --endpoint, required
+    TString Database;                       // -d / --database, defaults to "/Root"
+    TString TopicPath;                      // -t / --topic-path, required
+    TString ConsumerName;                   // -c / --consumer-name, required
+    bool CommitAfterProcessing = false;     // --commit-after-processing
+    bool DisableClusterDiscovery = false;   // --disable-cluster-discovery
+    size_t MessagesCount = std::numeric_limits<size_t>::max();  // --messages-count; maximum size_t unless given
+    TDuration Timeout = TDuration::Seconds(30);                 // --timeout
+
+    // Registers the options (in the order they appear in the help output) and parses argv.
+    // Free (positional) arguments are not accepted.
+    TOptions(int argc, const char* argv[]) {
+        auto parser = NLastGetopt::TOpts::Default();
+        parser.AddHelpOption('h');
+
+        auto& endpointOpt = parser.AddLongOption('e', "endpoint", "YDB endpoint");
+        endpointOpt.Required().RequiredArgument("HOST:PORT").StoreResult(&Endpoint);
+
+        auto& databaseOpt = parser.AddLongOption('d', "database", "YDB database name");
+        databaseOpt.DefaultValue("/Root").RequiredArgument("PATH").StoreResult(&Database);
+
+        auto& topicOpt = parser.AddLongOption('t', "topic-path", "Topic path for reading");
+        topicOpt.Required().RequiredArgument("PATH").StoreResult(&TopicPath);
+
+        auto& consumerOpt = parser.AddLongOption('c', "consumer-name", "Consumer name");
+        consumerOpt.Required().RequiredArgument("CONSUMER").StoreResult(&ConsumerName);
+
+        auto& commitOpt = parser.AddLongOption("commit-after-processing", "Commit data after processing");
+        commitOpt.SetFlag(&CommitAfterProcessing).NoArgument();
+
+        auto& discoveryOpt = parser.AddLongOption("disable-cluster-discovery", "Disable cluster discovery");
+        discoveryOpt.SetFlag(&DisableClusterDiscovery).NoArgument();
+
+        auto& countOpt = parser.AddLongOption("messages-count", "Wait for specified messages count");
+        countOpt.RequiredArgument("COUNT").StoreResult(&MessagesCount);
+
+        auto& timeoutOpt = parser.AddLongOption("timeout", "When no data arrives during this timeout session will be closed");
+        timeoutOpt.RequiredArgument("DURATION").StoreResult(&Timeout).DefaultValue(TDuration::Seconds(30));
+
+        parser.SetFreeArgsNum(0);
+
+        // Constructing the parse result performs the actual parsing and stores the values above.
+        NLastGetopt::TOptsParseResult parsed(&parser, argc, argv);
+    }
+};
+
+// Reads messages from a topic through a read session and prints every message body to stdout;
+// diagnostics go to stderr. The session is closed either from the commit acknowledgement handler
+// (once enough messages were received) or by the timeout tracking thread (no data for too long).
+//
+// Exit codes:
+//   0 - the final event is a session-closed event that is successful, or has status ABORTED
+//       after this tool itself started closing the session;
+//   1 - no final event was returned;
+//   2 - the final event is not a TSessionClosedEvent;
+//   3 - the session was closed with any other status.
+int main(int argc, const char* argv[]) {
+ TOptions opts(argc, argv);
+
+ // Create driver instance.
+ auto driverConfig = NYdb::TDriverConfig()
+ .SetNetworkThreadsNum(2)
+ .SetEndpoint(opts.Endpoint)
+ .SetDatabase(opts.Database)
+ .SetLog(CreateLogBackend("cerr"));
+ NYdb::TDriver driver(driverConfig);
+
+ NYdb::NPersQueue::TPersQueueClient persqueueClient(driver);
+
+ NYdb::NPersQueue::TReadSessionSettings settings;
+ settings
+ .DisableClusterDiscovery(opts.DisableClusterDiscovery)
+ .ConsumerName(opts.ConsumerName)
+ .AppendTopics(opts.TopicPath);
+
+ // State shared between the handlers below and the timeout thread. Only `closing` and
+ // `lastDataReceiveTime` are atomic; the counters and maps are plain objects captured by reference.
+ std::shared_ptr<NYdb::NPersQueue::IReadSession> readSession;
+ size_t messagesReceived = 0;
+ THashMap<ui64, ui64> maxReceivedOffsets; // partition id -> max received offset
+ THashMap<ui64, ui64> committedOffsets; // partition id -> confirmed offset
+ std::atomic<bool> closing = false;
+ std::atomic<TInstant::TValue> lastDataReceiveTime = TInstant::Now().GetValue();
+
+ // Data handler: ignored once closing has started; otherwise refreshes the last-data timestamp,
+ // tracks the maximum offset seen per partition, counts messages and prints each one to stdout.
+ settings
+ .EventHandlers_.SimpleDataHandlers(
+ [&messagesReceived, &maxReceivedOffsets, &closing, &lastDataReceiveTime](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable
+ {
+ if (closing) {
+ return;
+ }
+
+ lastDataReceiveTime = TInstant::Now().GetValue();
+ ui64& maxReceivedOffset = maxReceivedOffsets[event.GetPartitionStream()->GetPartitionId()];
+ for (const auto& msg : event.GetMessages()) {
+ ++messagesReceived;
+ maxReceivedOffset = Max(maxReceivedOffset, msg.GetOffset());
+ Cout << msg.GetData() << Endl;
+ }
+ },
+ opts.CommitAfterProcessing);
+
+ // Remember whatever commit acknowledgement handler is installed at this point (presumably set by
+ // SimpleDataHandlers when committing is enabled - confirm in the SDK) so that the wrapper below
+ // can forward events to it.
+ auto commitAckHandler = settings.EventHandlers_.CommitAcknowledgementHandler_;
+ // NOTE(review): the messages-count check lives only in this handler, so when no commit
+ // acknowledgements arrive the session is closed only by the timeout tracking thread.
+ settings.EventHandlers_.CommitAcknowledgementHandler(
+ [&readSession, &messagesReceived, &maxReceivedOffsets, &committedOffsets, maxMessagesCount = opts.MessagesCount, &commitAckHandler, &closing, &commitAfterProcessing = opts.CommitAfterProcessing](NYdb::NPersQueue::TReadSessionEvent::TCommitAcknowledgementEvent& event) mutable
+ {
+ committedOffsets[event.GetPartitionStream()->GetPartitionId()] = event.GetCommittedOffset();
+ if (messagesReceived >= maxMessagesCount) {
+ bool allCommitted = true;
+ if (commitAfterProcessing) {
+ // A partition counts as committed only when its committed offset is strictly greater
+ // than the maximum received offset. operator[] default-inserts 0 for a partition
+ // without acknowledgements yet, which reads as "not committed".
+ for (const auto [partitionId, maxReceivedOffset] : maxReceivedOffsets) {
+ if (maxReceivedOffset >= committedOffsets[partitionId]) {
+ allCommitted = false;
+ break;
+ }
+ }
+ }
+ // The compare-exchange lets exactly one party (this handler or the timeout thread) start
+ // closing. NOTE(review): the `closing = true` store below repeats what the successful
+ // compare-exchange has already done.
+ bool prev = false;
+ if (allCommitted && !closing && closing.compare_exchange_strong(prev, true)) {
+ closing = true;
+ Cerr << "Closing session. Got " << messagesReceived << " messages" << Endl;
+ readSession->Close(TDuration::Seconds(5));
+ Cerr << "Session closed" << Endl;
+ }
+ }
+ if (commitAckHandler) {
+ commitAckHandler(event);
+ }
+ });
+
+ readSession = persqueueClient.CreateReadSession(settings);
+
+ // Timeout tracking thread
+ // Closes the session when no data has been received for `timeout`. A zero timeout disables the
+ // tracking entirely. The barrier future is signalled by the main thread after GetEvent returns,
+ // which ends the loop (Wait presumably returns true once the future is ready - confirm).
+ NThreading::TPromise<void> barrier = NThreading::NewPromise<void>();
+ std::thread timeoutTrackingThread(
+ [barrierFuture = barrier.GetFuture(), &closing, &lastDataReceiveTime, readSession, timeout = opts.Timeout]() mutable {
+ if (timeout == TDuration::Zero()) {
+ return;
+ }
+
+ bool futureIsSignalled = false;
+ while (!closing && !futureIsSignalled) {
+ // The deadline is recomputed on every iteration from the latest data timestamp.
+ const TInstant lastDataTime = TInstant::FromValue(lastDataReceiveTime);
+ const TInstant now = TInstant::Now();
+ const TInstant deadline = lastDataTime + timeout;
+ bool prev = false;
+ if (now > deadline && closing.compare_exchange_strong(prev, true)) {
+ Cerr << "Closing session. No data during " << timeout << Endl;
+ readSession->Close(TDuration::Seconds(5));
+ Cerr << "Session closed" << Endl;
+ break;
+ }
+ futureIsSignalled = barrierFuture.Wait(deadline);
+ }
+ });
+
+ // Data and commit acknowledgement events are consumed by the handlers above; the event fetched
+ // here is expected to be the session-closed event (checked below). The `true` argument
+ // presumably makes the call blocking - confirm in the SDK.
+ auto maybeEvent = readSession->GetEvent(true);
+
+ // Release the timeout tracking thread.
+ barrier.SetValue();
+
+ Cerr << "Stopping driver..." << Endl;
+ driver.Stop();
+
+ Cerr << "Driver stopped. Exit" << Endl;
+
+ // The timeout thread is joined only after the driver has been stopped.
+ timeoutTrackingThread.join();
+
+ if (!maybeEvent) {
+ Cerr << "No finish event" << Endl;
+ return 1;
+ }
+ if (!std::holds_alternative<NYdb::NPersQueue::TSessionClosedEvent>(*maybeEvent)) {
+ Cerr << "TSessionClosedEvent expected, but got event: " << DebugString(*maybeEvent) << Endl;
+ return 2;
+ }
+
+ const NYdb::NPersQueue::TSessionClosedEvent& closedEvent = std::get<NYdb::NPersQueue::TSessionClosedEvent>(*maybeEvent);
+ Cerr << "Session closed event: " << closedEvent.DebugString() << Endl;
+ // An ABORTED status is treated as success when this tool itself started closing the session.
+ if (closedEvent.IsSuccess() || (closing && closedEvent.GetStatus() == NYdb::EStatus::ABORTED)) {
+ return 0;
+ } else {
+ return 3;
+ }
+}
diff --git a/ydb/tests/tools/pq_read/test/test_commit.py b/ydb/tests/tools/pq_read/test/test_commit.py
new file mode 100644
index 0000000000..4d86da6bdb
--- /dev/null
+++ b/ydb/tests/tools/pq_read/test/test_commit.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from ydb.tests.tools.datastreams_helpers.control_plane import create_stream, create_read_rule
+from ydb.tests.tools.datastreams_helpers.data_plane import write_stream, read_stream
+
+
+class TestCommit(object):
+ def test_commit(self):
+ consumer_name = "test_client"
+ topic = "topic"
+ create_stream(topic, partitions_count=1)
+ create_read_rule(topic, consumer_name)
+
+ data = ["1", "2"]
+ write_stream(topic, data)
+
+ assert read_stream(topic, len(data), commit_after_processing=True, consumer_name=consumer_name) == data
+
+ data_2 = ["3", "4"]
+ write_stream(topic, data_2)
+
+ assert read_stream(topic, len(data_2), commit_after_processing=True, consumer_name=consumer_name) == data_2
diff --git a/ydb/tests/tools/pq_read/test/test_timeout.py b/ydb/tests/tools/pq_read/test/test_timeout.py
new file mode 100644
index 0000000000..af54b068d0
--- /dev/null
+++ b/ydb/tests/tools/pq_read/test/test_timeout.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from ydb.tests.tools.datastreams_helpers.control_plane import create_stream, create_read_rule
+from ydb.tests.tools.datastreams_helpers.data_plane import write_stream, read_stream
+
+
+class TestTimeout(object):
+ def test_timeout(self):
+ consumer_name = "test_client"
+ topic = "timeout"
+ create_stream(topic, partitions_count=1)
+ create_read_rule(topic, consumer_name)
+
+ data = ["1", "2"]
+ write_stream(topic, data)
+
+ assert read_stream(topic, len(data) + 42, commit_after_processing=True, consumer_name=consumer_name, timeout=3) == data
diff --git a/ydb/tests/tools/pq_read/test/ya.make b/ydb/tests/tools/pq_read/test/ya.make
new file mode 100644
index 0000000000..9c4d7a6c7e
--- /dev/null
+++ b/ydb/tests/tools/pq_read/test/ya.make
@@ -0,0 +1,16 @@
+# Python 3 tests for the pq_read tool (test_commit.py, test_timeout.py).
+PY3TEST()
+
+# Shared test environment; judging by the path it sets up a YDB runner with datastreams -
+# see the included file for details.
+INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc)
+
+# Provides the create_stream / create_read_rule / write_stream / read_stream helpers the tests import.
+PEERDIR(
+ ydb/tests/tools/datastreams_helpers
+)
+
+# The read_stream helper launches the pq_read binary, so the tests depend on it.
+DEPENDS(ydb/tests/tools/pq_read)
+
+TEST_SRCS(
+ test_commit.py
+ test_timeout.py
+)
+
+END()
diff --git a/ydb/tests/tools/pq_read/ya.make b/ydb/tests/tools/pq_read/ya.make
new file mode 100644
index 0000000000..3fbeb140c4
--- /dev/null
+++ b/ydb/tests/tools/pq_read/ya.make
@@ -0,0 +1,18 @@
+# pq_read: command-line tool built from main.cpp that reads messages from a topic.
+PROGRAM()
+
+SRCS(
+ main.cpp
+)
+
+# Libraries the program is built against.
+PEERDIR(
+ library/cpp/colorizer
+ library/cpp/getopt
+ library/cpp/threading/future
+ ydb/public/sdk/cpp/client/ydb_persqueue_public
+)
+
+END()
+
+# Tests for the tool live in the test/ subdirectory.
+RECURSE_FOR_TESTS(
+ test
+)
diff --git a/ydb/tests/tools/ya.make b/ydb/tests/tools/ya.make
index f3d73bb51f..c0f08d466c 100644
--- a/ydb/tests/tools/ya.make
+++ b/ydb/tests/tools/ya.make
@@ -3,6 +3,7 @@ RECURSE(
fq_runner
idx_test
kqprun
+ pq_read
s3_recipe
ydb_serializable
ydb_serializable/replay