diff options
author | hor911 <hor911@ydb.tech> | 2023-11-11 23:20:21 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-11-11 23:36:59 +0300 |
commit | f0c60cc7e121ab9f7b4fabdffecd1b0af0f48ae3 (patch) | |
tree | bc25bb865306c57bc63bb87a6af7bf60587b4250 | |
parent | 175ec788d14db88a2fafb920c8b5117209c12d5b (diff) | |
download | ydb-f0c60cc7e121ab9f7b4fabdffecd1b0af0f48ae3.tar.gz |
Move pq_read tool to ydb
-rw-r--r-- | .mapping.json | 5 | ||||
-rw-r--r-- | ydb/tests/fq/s3/test_s3.py | 139 | ||||
-rw-r--r-- | ydb/tests/fq/s3/ya.make | 1 | ||||
-rw-r--r-- | ydb/tests/tools/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/tests/tools/datastreams_helpers/data_plane.py | 2 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt | 33 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt | 36 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt | 38 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/CMakeLists.txt | 17 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt | 26 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/main.cpp | 172 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/test/test_commit.py | 22 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/test/test_timeout.py | 17 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/test/ya.make | 16 | ||||
-rw-r--r-- | ydb/tests/tools/pq_read/ya.make | 18 | ||||
-rw-r--r-- | ydb/tests/tools/ya.make | 1 |
16 files changed, 543 insertions, 1 deletions
diff --git a/.mapping.json b/.mapping.json index 244ebef6e1..b31a969bf0 100644 --- a/.mapping.json +++ b/.mapping.json @@ -9818,6 +9818,11 @@ "ydb/tests/tools/kqprun/src/CMakeLists.linux-x86_64.txt":"", "ydb/tests/tools/kqprun/src/CMakeLists.txt":"", "ydb/tests/tools/kqprun/src/CMakeLists.windows-x86_64.txt":"", + "ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt":"", + "ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt":"", + "ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt":"", + "ydb/tests/tools/pq_read/CMakeLists.txt":"", + "ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt":"", "yt/CMakeLists.txt":"", "yt/cpp/CMakeLists.txt":"", "yt/cpp/mapreduce/CMakeLists.txt":"", diff --git a/ydb/tests/fq/s3/test_s3.py b/ydb/tests/fq/s3/test_s3.py index 300671eecb..d0ed8814bc 100644 --- a/ydb/tests/fq/s3/test_s3.py +++ b/ydb/tests/fq/s3/test_s3.py @@ -4,8 +4,10 @@ import boto3 import logging import pytest +import time import ydb.public.api.protos.draft.fq_pb2 as fq import ydb.public.api.protos.ydb_value_pb2 as ydb +import ydb.tests.library.common.yatest_common as yatest_common from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_all @@ -215,6 +217,143 @@ Pear,15,33''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.FAILED) + @yq_v1 + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_checkpoints_on_join_s3_with_yds(self, kikimr, s3, client): + # Prepare S3 + resource = boto3.resource( + "s3", + endpoint_url=s3.s3_url, + aws_access_key_id="key", + aws_secret_access_key="secret_key" + ) + + s3_client = boto3.client( + "s3", + endpoint_url=s3.s3_url, + aws_access_key_id="key", + aws_secret_access_key="secret_key" + ) + + bucket_name = "join_s3_with_yds" + bucket = resource.Bucket(bucket_name) + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + def put_kv(k, v): + json = '{}"key": {}, "value": "{}"{}'.format("{", k, v, "}") + s3_client.put_object(Body=json, Bucket=bucket_name, Key='a/b/c/{}.json'.format(k), ContentType='text/json') + + put_kv(1, "one") + put_kv(2, "two") + put_kv(3, "three") + + kikimr.control_plane.wait_bootstrap(1) + client.create_storage_connection("s3_dict", bucket_name) + + # Prepare YDS + self.init_topics("yds_dict") + client.create_yds_connection(name="yds", database_id="FakeDatabaseId") + + # Run query + sql = R''' + PRAGMA dq.MaxTasksPerStage="2"; + + $s3_dict_raw = + SELECT cast(Data AS json) AS data + FROM s3_dict.`*` + WITH (format=raw, SCHEMA ( + Data String NOT NULL + )); + + $s3_dict = + SELECT + cast(JSON_VALUE(data, '$.key') AS int64) AS key, + cast(JSON_VALUE(data, '$.value') AS String) AS value + FROM $s3_dict_raw; + + $parsed_yson_topic = + SELECT + Yson::LookupInt64(yson_data, "key") AS key, + Yson::LookupString(yson_data, "val") AS val + FROM ( + SELECT + Yson::Parse(Data) AS yson_data + FROM yds.`{input_topic}` WITH SCHEMA (Data String NOT NULL)); + + $joined_seq = + SELECT + s3_dict.value AS num, + yds_seq.val AS word + FROM $parsed_yson_topic AS yds_seq + INNER JOIN $s3_dict AS s3_dict + ON yds_seq.key = s3_dict.key; + + INSERT INTO yds.`{output_topic}` + SELECT + Yson::SerializeText(Yson::From(TableRow())) + FROM $joined_seq; + '''\ + .format( + input_topic=self.input_topic, + output_topic=self.output_topic, + ) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.control_plane.wait_zero_checkpoint(query_id) + + yds_data = [ + '{"key" = 1; "val" = "January";}', + '{"key" = 2; "val" = "February";}', + '{"key" = 3; "val" = "March";}', + '{"key" = 1; "val" = "Monday";}', + '{"key" = 2; "val" = "Tuesday";}', + '{"key" = 3; "val" = "Wednesday";}', + '{"key" = 1; "val" = "Gold";}', + '{"key" = 2; "val" = "Silver";}', + '{"key" = 3; "val" = "Bronze";}', + ] + self.write_stream(yds_data) + + expected = [ + '{"num" = "one"; "word" = "January"}', + '{"num" = "two"; "word" = "February"}', + '{"num" = "three"; "word" = "March"}', + '{"num" = "one"; "word" = "Monday"}', + '{"num" = "two"; "word" = "Tuesday"}', + '{"num" = "three"; "word" = "Wednesday"}', + '{"num" = "one"; "word" = "Gold"}', + '{"num" = "two"; "word" = "Silver"}', + '{"num" = "three"; "word" = "Bronze"}', + ] + assert self.read_stream(len(expected)) == expected + + # Check that checkpointing is finished + def wait_checkpoints(require_query_is_on=False): + deadline = time.time() + yatest_common.plain_or_under_sanitizer(300, 900) + while True: + completed = kikimr.control_plane.get_completed_checkpoints(query_id, require_query_is_on) + if completed >= 3: + break + assert time.time() < deadline, "Completed: {}".format(completed) + time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2)) + + logging.debug("Wait checkpoints") + wait_checkpoints(True) + logging.debug("Wait checkpoints success") + + kikimr.control_plane.kikimr_cluster.nodes[1].stop() + kikimr.control_plane.kikimr_cluster.nodes[1].start() + kikimr.control_plane.wait_bootstrap(1) + + logging.debug("Wait checkpoints after restore") + wait_checkpoints(False) + logging.debug("Wait checkpoints after restore success") + + client.abort_query(query_id) + client.wait_query(query_id) + @yq_v1 # v2 compute with multiple nodes is not supported yet @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) @pytest.mark.parametrize("kikimr", [{"compute": 3}], indirect=True) diff --git a/ydb/tests/fq/s3/ya.make b/ydb/tests/fq/s3/ya.make index cc0ca2d546..8bf632c446 100644 --- a/ydb/tests/fq/s3/ya.make +++ b/ydb/tests/fq/s3/ya.make @@ -16,6 +16,7 @@ PEERDIR( DEPENDS( contrib/python/moto/bin + ydb/tests/tools/pq_read ) TEST_SRCS( diff --git a/ydb/tests/tools/CMakeLists.txt b/ydb/tests/tools/CMakeLists.txt index 9ed8c2439a..2416e812ff 100644 --- a/ydb/tests/tools/CMakeLists.txt +++ b/ydb/tests/tools/CMakeLists.txt @@ -8,3 +8,4 @@ add_subdirectory(idx_test) add_subdirectory(kqprun) +add_subdirectory(pq_read) diff --git a/ydb/tests/tools/datastreams_helpers/data_plane.py b/ydb/tests/tools/datastreams_helpers/data_plane.py index 8ddc16a222..e3aef0d6ce 100644 --- a/ydb/tests/tools/datastreams_helpers/data_plane.py +++ b/ydb/tests/tools/datastreams_helpers/data_plane.py @@ -49,7 +49,7 @@ def read_stream(path, messages_count, commit_after_processing=True, consumer_nam ) result_file = yatest.common.output_path(result_file_name) cmd = [ - yatest.common.binary_path("kikimr/yq/tools/pq_read/pq_read"), + yatest.common.binary_path("ydb/tests/tools/pq_read/pq_read"), "--endpoint", os.getenv("YDB_ENDPOINT"), "--database", os.getenv("YDB_DATABASE"), "--topic-path", path, diff --git a/ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt b/ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..a1834fd6f0 --- /dev/null +++ b/ydb/tests/tools/pq_read/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(pq_read) +target_link_libraries(pq_read PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + library-cpp-colorizer + library-cpp-getopt + cpp-threading-future + cpp-client-ydb_persqueue_public +) +target_link_options(pq_read PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(pq_read PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/tests/tools/pq_read/main.cpp +) +target_allocator(pq_read + system_allocator +) +vcs_info(pq_read) diff --git a/ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt b/ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..7427ce0f1c --- /dev/null +++ b/ydb/tests/tools/pq_read/CMakeLists.linux-aarch64.txt @@ -0,0 +1,36 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(pq_read) +target_link_libraries(pq_read PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-colorizer + library-cpp-getopt + cpp-threading-future + cpp-client-ydb_persqueue_public +) +target_link_options(pq_read PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(pq_read PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/tests/tools/pq_read/main.cpp +) +target_allocator(pq_read + cpp-malloc-jemalloc +) +vcs_info(pq_read) diff --git a/ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt b/ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..a4436cdee8 --- /dev/null +++ b/ydb/tests/tools/pq_read/CMakeLists.linux-x86_64.txt @@ -0,0 +1,38 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(pq_read) +target_link_libraries(pq_read PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + library-cpp-colorizer + library-cpp-getopt + cpp-threading-future + cpp-client-ydb_persqueue_public +) +target_link_options(pq_read PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(pq_read PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/tests/tools/pq_read/main.cpp +) +target_allocator(pq_read + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(pq_read) diff --git a/ydb/tests/tools/pq_read/CMakeLists.txt b/ydb/tests/tools/pq_read/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/tests/tools/pq_read/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt b/ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..890d1fdd5b --- /dev/null +++ b/ydb/tests/tools/pq_read/CMakeLists.windows-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(pq_read) +target_link_libraries(pq_read PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + library-cpp-colorizer + library-cpp-getopt + cpp-threading-future + cpp-client-ydb_persqueue_public +) +target_sources(pq_read PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/tests/tools/pq_read/main.cpp +) +target_allocator(pq_read + system_allocator +) +vcs_info(pq_read) diff --git a/ydb/tests/tools/pq_read/main.cpp b/ydb/tests/tools/pq_read/main.cpp new file mode 100644 index 0000000000..cfcf4f5e3b --- /dev/null +++ b/ydb/tests/tools/pq_read/main.cpp @@ -0,0 +1,172 @@ +#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> + +#include <library/cpp/getopt/last_getopt.h> +#include <library/cpp/threading/future/future.h> + +#include <util/stream/output.h> +#include <util/system/env.h> + +#include <atomic> +#include <thread> + +struct TOptions { + TString Endpoint; + TString Database; + TString TopicPath; + TString ConsumerName; + bool CommitAfterProcessing = false; + bool DisableClusterDiscovery = false; + size_t MessagesCount = std::numeric_limits<size_t>::max(); + TDuration Timeout = TDuration::Seconds(30); + + TOptions(int argc, const char* argv[]) { + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + opts.AddHelpOption('h'); + opts.AddLongOption('e', "endpoint", "YDB endpoint").Required().RequiredArgument("HOST:PORT") + .StoreResult(&Endpoint); + opts.AddLongOption('d', "database", "YDB database name").DefaultValue("/Root").RequiredArgument("PATH") + .StoreResult(&Database); + opts.AddLongOption('t', "topic-path", "Topic path for reading").Required().RequiredArgument("PATH") + .StoreResult(&TopicPath); + opts.AddLongOption('c', "consumer-name", "Consumer name").Required().RequiredArgument("CONSUMER") + .StoreResult(&ConsumerName); + opts.AddLongOption("commit-after-processing", "Commit data after processing") + .SetFlag(&CommitAfterProcessing).NoArgument(); + opts.AddLongOption("disable-cluster-discovery", "Disable cluster discovery") + .SetFlag(&DisableClusterDiscovery).NoArgument(); + opts.AddLongOption("messages-count", "Wait for specified messages count").RequiredArgument("COUNT") + .StoreResult(&MessagesCount); + opts.AddLongOption("timeout", "When no data arrives during this timeout session will be closed").RequiredArgument("DURATION") + .StoreResult(&Timeout).DefaultValue(TDuration::Seconds(30)); + opts.SetFreeArgsNum(0); + + NLastGetopt::TOptsParseResult res(&opts, argc, argv); + } +}; + +int main(int argc, const char* argv[]) { + TOptions opts(argc, argv); + + // Create driver instance. + auto driverConfig = NYdb::TDriverConfig() + .SetNetworkThreadsNum(2) + .SetEndpoint(opts.Endpoint) + .SetDatabase(opts.Database) + .SetLog(CreateLogBackend("cerr")); + NYdb::TDriver driver(driverConfig); + + NYdb::NPersQueue::TPersQueueClient persqueueClient(driver); + + NYdb::NPersQueue::TReadSessionSettings settings; + settings + .DisableClusterDiscovery(opts.DisableClusterDiscovery) + .ConsumerName(opts.ConsumerName) + .AppendTopics(opts.TopicPath); + + std::shared_ptr<NYdb::NPersQueue::IReadSession> readSession; + size_t messagesReceived = 0; + THashMap<ui64, ui64> maxReceivedOffsets; // partition id -> max received offset + THashMap<ui64, ui64> committedOffsets; // partition id -> confirmed offset + std::atomic<bool> closing = false; + std::atomic<TInstant::TValue> lastDataReceiveTime = TInstant::Now().GetValue(); + + settings + .EventHandlers_.SimpleDataHandlers( + [&messagesReceived, &maxReceivedOffsets, &closing, &lastDataReceiveTime](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable + { + if (closing) { + return; + } + + lastDataReceiveTime = TInstant::Now().GetValue(); + ui64& maxReceivedOffset = maxReceivedOffsets[event.GetPartitionStream()->GetPartitionId()]; + for (const auto& msg : event.GetMessages()) { + ++messagesReceived; + maxReceivedOffset = Max(maxReceivedOffset, msg.GetOffset()); + Cout << msg.GetData() << Endl; + } + }, + opts.CommitAfterProcessing); + + auto commitAckHandler = settings.EventHandlers_.CommitAcknowledgementHandler_; + settings.EventHandlers_.CommitAcknowledgementHandler( + [&readSession, &messagesReceived, &maxReceivedOffsets, &committedOffsets, maxMessagesCount = opts.MessagesCount, &commitAckHandler, &closing, &commitAfterProcessing = opts.CommitAfterProcessing](NYdb::NPersQueue::TReadSessionEvent::TCommitAcknowledgementEvent& event) mutable + { + committedOffsets[event.GetPartitionStream()->GetPartitionId()] = event.GetCommittedOffset(); + if (messagesReceived >= maxMessagesCount) { + bool allCommitted = true; + if (commitAfterProcessing) { + for (const auto [partitionId, maxReceivedOffset] : maxReceivedOffsets) { + if (maxReceivedOffset >= committedOffsets[partitionId]) { + allCommitted = false; + break; + } + } + } + bool prev = false; + if (allCommitted && !closing && closing.compare_exchange_strong(prev, true)) { + closing = true; + Cerr << "Closing session. Got " << messagesReceived << " messages" << Endl; + readSession->Close(TDuration::Seconds(5)); + Cerr << "Session closed" << Endl; + } + } + if (commitAckHandler) { + commitAckHandler(event); + } + }); + + readSession = persqueueClient.CreateReadSession(settings); + + // Timeout tracking thread + NThreading::TPromise<void> barrier = NThreading::NewPromise<void>(); + std::thread timeoutTrackingThread( + [barrierFuture = barrier.GetFuture(), &closing, &lastDataReceiveTime, readSession, timeout = opts.Timeout]() mutable { + if (timeout == TDuration::Zero()) { + return; + } + + bool futureIsSignalled = false; + while (!closing && !futureIsSignalled) { + const TInstant lastDataTime = TInstant::FromValue(lastDataReceiveTime); + const TInstant now = TInstant::Now(); + const TInstant deadline = lastDataTime + timeout; + bool prev = false; + if (now > deadline && closing.compare_exchange_strong(prev, true)) { + Cerr << "Closing session. No data during " << timeout << Endl; + readSession->Close(TDuration::Seconds(5)); + Cerr << "Session closed" << Endl; + break; + } + futureIsSignalled = barrierFuture.Wait(deadline); + } + }); + + auto maybeEvent = readSession->GetEvent(true); + + barrier.SetValue(); + + Cerr << "Stopping driver..." << Endl; + driver.Stop(); + + Cerr << "Driver stopped. Exit" << Endl; + + timeoutTrackingThread.join(); + + if (!maybeEvent) { + Cerr << "No finish event" << Endl; + return 1; + } + if (!std::holds_alternative<NYdb::NPersQueue::TSessionClosedEvent>(*maybeEvent)) { + Cerr << "TSessionClosedEvent expected, but got event: " << DebugString(*maybeEvent) << Endl; + return 2; + } + + const NYdb::NPersQueue::TSessionClosedEvent& closedEvent = std::get<NYdb::NPersQueue::TSessionClosedEvent>(*maybeEvent); + Cerr << "Session closed event: " << closedEvent.DebugString() << Endl; + if (closedEvent.IsSuccess() || (closing && closedEvent.GetStatus() == NYdb::EStatus::ABORTED)) { + return 0; + } else { + return 3; + } +} diff --git a/ydb/tests/tools/pq_read/test/test_commit.py b/ydb/tests/tools/pq_read/test/test_commit.py new file mode 100644 index 0000000000..4d86da6bdb --- /dev/null +++ b/ydb/tests/tools/pq_read/test/test_commit.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from ydb.tests.tools.datastreams_helpers.control_plane import create_stream, create_read_rule +from ydb.tests.tools.datastreams_helpers.data_plane import write_stream, read_stream + + +class TestCommit(object): + def test_commit(self): + consumer_name = "test_client" + topic = "topic" + create_stream(topic, partitions_count=1) + create_read_rule(topic, consumer_name) + + data = ["1", "2"] + write_stream(topic, data) + + assert read_stream(topic, len(data), commit_after_processing=True, consumer_name=consumer_name) == data + + data_2 = ["3", "4"] + write_stream(topic, data_2) + + assert read_stream(topic, len(data_2), commit_after_processing=True, consumer_name=consumer_name) == data_2 diff --git a/ydb/tests/tools/pq_read/test/test_timeout.py b/ydb/tests/tools/pq_read/test/test_timeout.py new file mode 100644 index 0000000000..af54b068d0 --- /dev/null +++ b/ydb/tests/tools/pq_read/test/test_timeout.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from ydb.tests.tools.datastreams_helpers.control_plane import create_stream, create_read_rule +from ydb.tests.tools.datastreams_helpers.data_plane import write_stream, read_stream + + +class TestTimeout(object): + def test_timeout(self): + consumer_name = "test_client" + topic = "timeout" + create_stream(topic, partitions_count=1) + create_read_rule(topic, consumer_name) + + data = ["1", "2"] + write_stream(topic, data) + + assert read_stream(topic, len(data) + 42, commit_after_processing=True, consumer_name=consumer_name, timeout=3) == data diff --git a/ydb/tests/tools/pq_read/test/ya.make b/ydb/tests/tools/pq_read/test/ya.make new file mode 100644 index 0000000000..9c4d7a6c7e --- /dev/null +++ b/ydb/tests/tools/pq_read/test/ya.make @@ -0,0 +1,16 @@ +PY3TEST() + +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc) + +PEERDIR( + ydb/tests/tools/datastreams_helpers +) + +DEPENDS(ydb/tests/tools/pq_read) + +TEST_SRCS( + test_commit.py + test_timeout.py +) + +END() diff --git a/ydb/tests/tools/pq_read/ya.make b/ydb/tests/tools/pq_read/ya.make new file mode 100644 index 0000000000..3fbeb140c4 --- /dev/null +++ b/ydb/tests/tools/pq_read/ya.make @@ -0,0 +1,18 @@ +PROGRAM() + +SRCS( + main.cpp +) + +PEERDIR( + library/cpp/colorizer + library/cpp/getopt + library/cpp/threading/future + ydb/public/sdk/cpp/client/ydb_persqueue_public +) + +END() + +RECURSE_FOR_TESTS( + test +) diff --git a/ydb/tests/tools/ya.make b/ydb/tests/tools/ya.make index f3d73bb51f..c0f08d466c 100644 --- a/ydb/tests/tools/ya.make +++ b/ydb/tests/tools/ya.make @@ -3,6 +3,7 @@ RECURSE( fq_runner idx_test kqprun + pq_read s3_recipe ydb_serializable ydb_serializable/replay |