diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-16 17:26:49 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-16 17:26:49 +0300 |
commit | c3ac866fbfd9872ea4afc6b2c15fc27cf384d06c (patch) | |
tree | 2eeb723b88bd676256f25b2630d157cf228a3e71 | |
parent | 87b918ead2dcdebbc8ba3798592ac1626a4f8dd5 (diff) | |
download | ydb-c3ac866fbfd9872ea4afc6b2c15fc27cf384d06c.tar.gz |
Topic reader
13 files changed, 200 insertions, 11 deletions
diff --git a/ydb/core/tx/replication/ydb_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/CMakeLists.darwin-x86_64.txt index f5289141cd4..9fbdd52b18b 100644 --- a/ydb/core/tx/replication/ydb_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/replication/ydb_proxy/CMakeLists.darwin-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(tx-replication-ydb_proxy PUBLIC cpp-client-ydb_driver cpp-client-ydb_scheme cpp-client-ydb_table + cpp-client-ydb_topic client-ydb_types-credentials ydb_types-credentials-login ) diff --git a/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-aarch64.txt index dd04a0b7fea..34147cb6a4b 100644 --- a/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-aarch64.txt @@ -21,6 +21,7 @@ target_link_libraries(tx-replication-ydb_proxy PUBLIC cpp-client-ydb_driver cpp-client-ydb_scheme cpp-client-ydb_table + cpp-client-ydb_topic client-ydb_types-credentials ydb_types-credentials-login ) diff --git a/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-x86_64.txt index dd04a0b7fea..34147cb6a4b 100644 --- a/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(tx-replication-ydb_proxy PUBLIC cpp-client-ydb_driver cpp-client-ydb_scheme cpp-client-ydb_table + cpp-client-ydb_topic client-ydb_types-credentials ydb_types-credentials-login ) diff --git a/ydb/core/tx/replication/ydb_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/CMakeLists.windows-x86_64.txt index f5289141cd4..9fbdd52b18b 100644 --- a/ydb/core/tx/replication/ydb_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/replication/ydb_proxy/CMakeLists.windows-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(tx-replication-ydb_proxy PUBLIC cpp-client-ydb_driver cpp-client-ydb_scheme cpp-client-ydb_table + cpp-client-ydb_topic client-ydb_types-credentials ydb_types-credentials-login ) diff --git a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.darwin-x86_64.txt index c9503b232af..b361b4b1934 100644 --- a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.darwin-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-core-tx-replication-ydb_proxy-ut PUBLIC tx-replication-ydb_proxy cpp-testing-unittest core-testlib-default + cpp-client-ydb_topic ) target_link_options(ydb-core-tx-replication-ydb_proxy-ut PRIVATE -Wl,-no_deduplicate diff --git a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-aarch64.txt index b802327a08c..958ec4ce5a0 100644 --- a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-aarch64.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-core-tx-replication-ydb_proxy-ut PUBLIC tx-replication-ydb_proxy cpp-testing-unittest core-testlib-default + cpp-client-ydb_topic ) target_link_options(ydb-core-tx-replication-ydb_proxy-ut PRIVATE -ldl diff --git a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-x86_64.txt index 61f6cd7a7dc..3e3361ef1fb 100644 --- a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(ydb-core-tx-replication-ydb_proxy-ut PUBLIC tx-replication-ydb_proxy cpp-testing-unittest core-testlib-default + cpp-client-ydb_topic ) target_link_options(ydb-core-tx-replication-ydb_proxy-ut PRIVATE -ldl diff --git a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.windows-x86_64.txt index 6ce6ad90f3c..ae305f17382 100644 --- a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.windows-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-core-tx-replication-ydb_proxy-ut PUBLIC tx-replication-ydb_proxy cpp-testing-unittest core-testlib-default + cpp-client-ydb_topic ) target_sources(ydb-core-tx-replication-ydb_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp diff --git a/ydb/core/tx/replication/ydb_proxy/ut/ya.make b/ydb/core/tx/replication/ydb_proxy/ut/ya.make index 3c2b3ebf050..5ab140af115 100644 --- a/ydb/core/tx/replication/ydb_proxy/ut/ya.make +++ b/ydb/core/tx/replication/ydb_proxy/ut/ya.make @@ -9,6 +9,7 @@ TIMEOUT(600) PEERDIR( library/cpp/testing/unittest ydb/core/testlib/default + ydb/public/sdk/cpp/client/ydb_topic ) SRCS( diff --git a/ydb/core/tx/replication/ydb_proxy/ya.make b/ydb/core/tx/replication/ydb_proxy/ya.make index d9c9bc301bf..27ced7a7376 100644 --- a/ydb/core/tx/replication/ydb_proxy/ya.make +++ b/ydb/core/tx/replication/ydb_proxy/ya.make @@ -6,6 +6,7 @@ PEERDIR( ydb/public/sdk/cpp/client/ydb_driver ydb/public/sdk/cpp/client/ydb_scheme ydb/public/sdk/cpp/client/ydb_table + ydb/public/sdk/cpp/client/ydb_topic ydb/public/sdk/cpp/client/ydb_types/credentials ydb/public/sdk/cpp/client/ydb_types/credentials/login ) diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp index b0830cbc888..2f247114a2c 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp @@ -27,9 +27,11 @@ class TBaseProxyActor: public TActor<TDerived> { class TRequest; using TRequestPtr = std::shared_ptr<TRequest>; +protected: struct TEvPrivate { enum EEv { EvComplete = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvTopicEventReady, EvEnd, }; @@ -45,8 +47,20 @@ class TBaseProxyActor: public TActor<TDerived> { } }; + struct TEvTopicEventReady: public TEventLocal<TEvTopicEventReady, EvTopicEventReady> { + const TActorId Sender; + const ui64 Cookie; + + explicit TEvTopicEventReady(const TActorId& sender, ui64 cookie) + : Sender(sender) + , Cookie(cookie) + { + } + }; + }; // TEvPrivate +private: class TRequest: public std::enable_shared_from_this<TRequest> { friend class TBaseProxyActor<TDerived>; @@ -98,11 +112,6 @@ class TBaseProxyActor: public TActor<TDerived> { Requests.erase(ev->Get()->Request); } - void PassAway() override { - Requests.clear(); - IActor::PassAway(); - } - protected: using TActor<TDerived>::TActor; @@ -112,7 +121,12 @@ protected: } } - std::weak_ptr<TRequest> MakeRequest(const TActorId& sender, ui64 cookie) { + void PassAway() override { + Requests.clear(); + IActor::PassAway(); + } + + std::weak_ptr<TRequest> MakeRequest(const TActorId& sender, ui64 cookie = 0) { auto request = std::make_shared<TRequest>(TlsActivationContext->ActorSystem(), this->SelfId(), sender, cookie); Requests.emplace(request); return request; @@ -144,6 +158,51 @@ private: }; // TBaseProxyActor +class TTopicReader: public TBaseProxyActor<TTopicReader> { + void Handle(TEvYdbProxy::TEvReadTopicRequest::TPtr& ev) { + auto request = MakeRequest(SelfId()); + auto cb = [request, sender = ev->Sender, cookie = ev->Cookie](const NThreading::TFuture<void>&) { + if (auto r = request.lock()) { + r->Complete(new TEvPrivate::TEvTopicEventReady(sender, cookie)); + } + }; + + Session->WaitEvent().Subscribe(std::move(cb)); + } + + void Handle(TEvPrivate::TEvTopicEventReady::TPtr& ev) { + auto event = Session->GetEvent(true); + Y_VERIFY(event.Defined()); + Send(ev->Get()->Sender, new TEvYdbProxy::TEvReadTopicResponse(std::move(*event)), 0, ev->Get()->Cookie); + } + + void PassAway() override { + Session->Close(TDuration::MilliSeconds(100)); // non-blocking if there is no inflight commits + TBaseProxyActor<TTopicReader>::PassAway(); + } + +public: + explicit TTopicReader(const std::shared_ptr<IReadSession>& session) + : TBaseProxyActor(&TThis::StateWork) + , Session(session) + { + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvYdbProxy::TEvReadTopicRequest, Handle); + hFunc(TEvPrivate::TEvTopicEventReady, Handle); + + default: + return StateBase(ev, TlsActivationContext->AsActorContext()); + } + } + +private: + std::shared_ptr<IReadSession> Session; + +}; // TTopicReader + class TYdbProxy: public TBaseProxyActor<TYdbProxy> { template <typename TEvResponse, typename TClient, typename... Args> using TFunc = typename TEvResponse::TAsyncResult(TClient::*)(Args...); @@ -288,6 +347,13 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> { Call<TEvYdbProxy::TEvDescribeConsumerResponse>(ev, &TTopicClient::DescribeConsumer); } + void Handle(TEvYdbProxy::TEvCreateTopicReaderRequest::TPtr& ev) { + auto* client = EnsureClient<TTopicClient>(); + auto args = std::move(ev->Get()->GetArgs()); + auto session = std::apply(&TTopicClient::CreateReadSession, std::tuple_cat(std::tie(client), std::move(args))); + Send(ev->Sender, new TEvYdbProxy::TEvCreateTopicReaderResponse(RegisterWithSameMailbox(new TTopicReader(session)))); + } + static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database) { return TCommonClientSettings() .DiscoveryEndpoint(endpoint) @@ -338,6 +404,7 @@ public: hFunc(TEvYdbProxy::TEvDropTopicRequest, Handle); hFunc(TEvYdbProxy::TEvDescribeTopicRequest, Handle); hFunc(TEvYdbProxy::TEvDescribeConsumerRequest, Handle); + hFunc(TEvYdbProxy::TEvCreateTopicReaderRequest, Handle); default: return StateBase(ev, TlsActivationContext->AsActorContext()); diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index e87d6ccbb5d..0e2e1d178cd 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -47,6 +47,8 @@ struct TEvYdbProxy { EV_REQUEST_RESPONSE(DropTopic), EV_REQUEST_RESPONSE(DescribeTopic), EV_REQUEST_RESPONSE(DescribeConsumer), + EV_REQUEST_RESPONSE(CreateTopicReader), + EV_REQUEST_RESPONSE(ReadTopic), EvEnd, }; @@ -75,6 +77,11 @@ struct TEvYdbProxy { using TBase = TGenericRequest<TDerived, EventType, Args...>; }; + template <typename TDerived, ui32 EventType> + struct TGenericRequest<TDerived, EventType, void>: public TEventLocal<TDerived, EventType> { + using TBase = TGenericRequest<TDerived, EventType, void>; + }; + template <typename TDerived, ui32 EventType, typename T> struct TGenericResponse: public TEventLocal<TDerived, EventType> { using TResult = T; @@ -126,6 +133,8 @@ struct TEvYdbProxy { DEFINE_GENERIC_REQUEST_RESPONSE(DropTopic, NYdb::TStatus, TString, NYdb::NTopic::TDropTopicSettings); DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTopic, NYdb::NTopic::TDescribeTopicResult, TString, NYdb::NTopic::TDescribeTopicSettings); DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings); + DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, NYdb::NTopic::TReadSessionSettings); + DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, NYdb::NTopic::TReadSessionEvent::TEvent, void); #undef DEFINE_GENERIC_REQUEST_RESPONSE #undef DEFINE_GENERIC_RESPONSE diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp index df4a220a5a2..d6f0cb5be6e 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp @@ -7,6 +7,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <ydb/core/base/ticket_parser.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> namespace NKikimr::NReplication { @@ -29,11 +30,11 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { Server.SetupDefaultProfiles(); Client.InitRootScheme(DomainName); - const auto endpoint = "localhost:" + ToString(grpcPort); - const auto database = "/" + ToString(DomainName); + Endpoint = "localhost:" + ToString(grpcPort); + Database = "/" + ToString(DomainName); YdbProxy = Server.GetRuntime()->Register(CreateYdbProxy( - endpoint, UseDatabase ? database : "", std::forward<Args>(args)...)); + Endpoint, UseDatabase ? Database : "", std::forward<Args>(args)...)); Sender = Server.GetRuntime()->AllocateEdgeActor(); } @@ -48,6 +49,12 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { UNIT_ASSERT(!resp->Get()->Record.GetToken().empty()); } + template <typename TEvResponse> + auto SendImpl(const TActorId& recipient, IEventBase* ev) { + Server.GetRuntime()->Send(new IEventHandleFat(recipient, Sender, ev)); + return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender); + } + public: TEnv(bool init = true) : Settings(Tests::TServerSettings(PortManager.GetPort(), {}, MakePqConfig()) @@ -105,8 +112,24 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { template <typename TEvResponse> auto Send(IEventBase* ev) { - Server.GetRuntime()->Send(new IEventHandleFat(YdbProxy, Sender, ev)); - return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender); + return SendImpl<TEvResponse>(YdbProxy, ev); + } + + template <typename TEvResponse> + auto Send(const TActorId& recipient, IEventBase* ev) { + return SendImpl<TEvResponse>(recipient, ev); + } + + const NYdb::TDriver& GetDriver() const { + return Server.GetDriver(); + } + + const TString& GetEndpoint() const { + return Endpoint; + } + + const TString& GetDatabase() const { + return Database; } private: @@ -114,6 +137,8 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { Tests::TServerSettings Settings; Tests::TServer Server; Tests::TClient Client; + TString Endpoint; + TString Database; TActorId YdbProxy; TActorId Sender; }; @@ -676,6 +701,84 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { } } + template <typename Env> + TActorId CreateTopicReader(Env& env, const TString& topicPath) { + auto settings = NYdb::NTopic::TReadSessionSettings() + .ConsumerName("consumer") + .AppendTopics(NYdb::NTopic::TTopicReadSettings(topicPath)); + + auto ev = env.template Send<TEvYdbProxy::TEvCreateTopicReaderResponse>( + new TEvYdbProxy::TEvCreateTopicReaderRequest(settings)); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result); + + return ev->Get()->Result; + } + + template <typename Env> + bool WriteTopic(const Env& env, const TString& topicPath, const TString& data) { + NYdb::NTopic::TTopicClient client(env.GetDriver(), NYdb::NTopic::TTopicClientSettings() + .DiscoveryEndpoint(env.GetEndpoint()) + .Database(env.GetDatabase()) + ); + + auto session = client.CreateSimpleBlockingWriteSession(NYdb::NTopic::TWriteSessionSettings() + .Path(topicPath) + .ProducerId("producer") + .MessageGroupId("producer") + ); + + const auto result = session->Write(data); + session->Close(); + + return result; + } + + template <typename TEvent, typename Env> + TEvent ReadTopic(Env& env, const TActorId& reader) { + auto ev = env.template Send<TEvYdbProxy::TEvReadTopicResponse>(reader, + new TEvYdbProxy::TEvReadTopicRequest()); + UNIT_ASSERT(ev); + + const auto* event = std::get_if<TEvent>(&ev->Get()->Result); + UNIT_ASSERT(event); + + return *event; + } + + Y_UNIT_TEST(ReadTopic) { + using TReadSessionEvent = NYdb::NTopic::TReadSessionEvent; + TEnv env; + + // create topic + { + auto settings = NYdb::NTopic::TCreateTopicSettings() + .BeginAddConsumer() + .ConsumerName("consumer") + .EndAddConsumer(); + + auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>( + new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings)); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + + TActorId reader = CreateTopicReader(env, "/Root/topic"); + + UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1")); + { + ReadTopic<TReadSessionEvent::TStartPartitionSessionEvent>(env, reader).Confirm(); + + auto data = ReadTopic<TReadSessionEvent::TDataReceivedEvent>(env, reader); + UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().at(0).GetData(), "message-1"); + data.Commit(); + + auto ack = ReadTopic<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(env, reader); + UNIT_ASSERT_VALUES_EQUAL(ack.GetCommittedOffset(), 1); + } + } + } // YdbProxyTests } |