aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-16 17:26:49 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-16 17:26:49 +0300
commitc3ac866fbfd9872ea4afc6b2c15fc27cf384d06c (patch)
tree2eeb723b88bd676256f25b2630d157cf228a3e71
parent87b918ead2dcdebbc8ba3798592ac1626a4f8dd5 (diff)
downloadydb-c3ac866fbfd9872ea4afc6b2c15fc27cf384d06c.tar.gz
Topic reader
-rw-r--r--ydb/core/tx/replication/ydb_proxy/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ut/ya.make1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ya.make1
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp79
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.h9
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp113
13 files changed, 200 insertions, 11 deletions
diff --git a/ydb/core/tx/replication/ydb_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/CMakeLists.darwin-x86_64.txt
index f5289141cd4..9fbdd52b18b 100644
--- a/ydb/core/tx/replication/ydb_proxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/replication/ydb_proxy/CMakeLists.darwin-x86_64.txt
@@ -20,6 +20,7 @@ target_link_libraries(tx-replication-ydb_proxy PUBLIC
cpp-client-ydb_driver
cpp-client-ydb_scheme
cpp-client-ydb_table
+ cpp-client-ydb_topic
client-ydb_types-credentials
ydb_types-credentials-login
)
diff --git a/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-aarch64.txt
index dd04a0b7fea..34147cb6a4b 100644
--- a/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-aarch64.txt
@@ -21,6 +21,7 @@ target_link_libraries(tx-replication-ydb_proxy PUBLIC
cpp-client-ydb_driver
cpp-client-ydb_scheme
cpp-client-ydb_table
+ cpp-client-ydb_topic
client-ydb_types-credentials
ydb_types-credentials-login
)
diff --git a/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-x86_64.txt
index dd04a0b7fea..34147cb6a4b 100644
--- a/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/replication/ydb_proxy/CMakeLists.linux-x86_64.txt
@@ -21,6 +21,7 @@ target_link_libraries(tx-replication-ydb_proxy PUBLIC
cpp-client-ydb_driver
cpp-client-ydb_scheme
cpp-client-ydb_table
+ cpp-client-ydb_topic
client-ydb_types-credentials
ydb_types-credentials-login
)
diff --git a/ydb/core/tx/replication/ydb_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/CMakeLists.windows-x86_64.txt
index f5289141cd4..9fbdd52b18b 100644
--- a/ydb/core/tx/replication/ydb_proxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/replication/ydb_proxy/CMakeLists.windows-x86_64.txt
@@ -20,6 +20,7 @@ target_link_libraries(tx-replication-ydb_proxy PUBLIC
cpp-client-ydb_driver
cpp-client-ydb_scheme
cpp-client-ydb_table
+ cpp-client-ydb_topic
client-ydb_types-credentials
ydb_types-credentials-login
)
diff --git a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.darwin-x86_64.txt
index c9503b232af..b361b4b1934 100644
--- a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.darwin-x86_64.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-core-tx-replication-ydb_proxy-ut PUBLIC
tx-replication-ydb_proxy
cpp-testing-unittest
core-testlib-default
+ cpp-client-ydb_topic
)
target_link_options(ydb-core-tx-replication-ydb_proxy-ut PRIVATE
-Wl,-no_deduplicate
diff --git a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-aarch64.txt
index b802327a08c..958ec4ce5a0 100644
--- a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-aarch64.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-core-tx-replication-ydb_proxy-ut PUBLIC
tx-replication-ydb_proxy
cpp-testing-unittest
core-testlib-default
+ cpp-client-ydb_topic
)
target_link_options(ydb-core-tx-replication-ydb_proxy-ut PRIVATE
-ldl
diff --git a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-x86_64.txt
index 61f6cd7a7dc..3e3361ef1fb 100644
--- a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.linux-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(ydb-core-tx-replication-ydb_proxy-ut PUBLIC
tx-replication-ydb_proxy
cpp-testing-unittest
core-testlib-default
+ cpp-client-ydb_topic
)
target_link_options(ydb-core-tx-replication-ydb_proxy-ut PRIVATE
-ldl
diff --git a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.windows-x86_64.txt
index 6ce6ad90f3c..ae305f17382 100644
--- a/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/replication/ydb_proxy/ut/CMakeLists.windows-x86_64.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-core-tx-replication-ydb_proxy-ut PUBLIC
tx-replication-ydb_proxy
cpp-testing-unittest
core-testlib-default
+ cpp-client-ydb_topic
)
target_sources(ydb-core-tx-replication-ydb_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
diff --git a/ydb/core/tx/replication/ydb_proxy/ut/ya.make b/ydb/core/tx/replication/ydb_proxy/ut/ya.make
index 3c2b3ebf050..5ab140af115 100644
--- a/ydb/core/tx/replication/ydb_proxy/ut/ya.make
+++ b/ydb/core/tx/replication/ydb_proxy/ut/ya.make
@@ -9,6 +9,7 @@ TIMEOUT(600)
PEERDIR(
library/cpp/testing/unittest
ydb/core/testlib/default
+ ydb/public/sdk/cpp/client/ydb_topic
)
SRCS(
diff --git a/ydb/core/tx/replication/ydb_proxy/ya.make b/ydb/core/tx/replication/ydb_proxy/ya.make
index d9c9bc301bf..27ced7a7376 100644
--- a/ydb/core/tx/replication/ydb_proxy/ya.make
+++ b/ydb/core/tx/replication/ydb_proxy/ya.make
@@ -6,6 +6,7 @@ PEERDIR(
ydb/public/sdk/cpp/client/ydb_driver
ydb/public/sdk/cpp/client/ydb_scheme
ydb/public/sdk/cpp/client/ydb_table
+ ydb/public/sdk/cpp/client/ydb_topic
ydb/public/sdk/cpp/client/ydb_types/credentials
ydb/public/sdk/cpp/client/ydb_types/credentials/login
)
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
index b0830cbc888..2f247114a2c 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
@@ -27,9 +27,11 @@ class TBaseProxyActor: public TActor<TDerived> {
class TRequest;
using TRequestPtr = std::shared_ptr<TRequest>;
+protected:
struct TEvPrivate {
enum EEv {
EvComplete = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvTopicEventReady,
EvEnd,
};
@@ -45,8 +47,20 @@ class TBaseProxyActor: public TActor<TDerived> {
}
};
+ struct TEvTopicEventReady: public TEventLocal<TEvTopicEventReady, EvTopicEventReady> {
+ const TActorId Sender;
+ const ui64 Cookie;
+
+ explicit TEvTopicEventReady(const TActorId& sender, ui64 cookie)
+ : Sender(sender)
+ , Cookie(cookie)
+ {
+ }
+ };
+
}; // TEvPrivate
+private:
class TRequest: public std::enable_shared_from_this<TRequest> {
friend class TBaseProxyActor<TDerived>;
@@ -98,11 +112,6 @@ class TBaseProxyActor: public TActor<TDerived> {
Requests.erase(ev->Get()->Request);
}
- void PassAway() override {
- Requests.clear();
- IActor::PassAway();
- }
-
protected:
using TActor<TDerived>::TActor;
@@ -112,7 +121,12 @@ protected:
}
}
- std::weak_ptr<TRequest> MakeRequest(const TActorId& sender, ui64 cookie) {
+ void PassAway() override {
+ Requests.clear();
+ IActor::PassAway();
+ }
+
+ std::weak_ptr<TRequest> MakeRequest(const TActorId& sender, ui64 cookie = 0) {
auto request = std::make_shared<TRequest>(TlsActivationContext->ActorSystem(), this->SelfId(), sender, cookie);
Requests.emplace(request);
return request;
@@ -144,6 +158,51 @@ private:
}; // TBaseProxyActor
+class TTopicReader: public TBaseProxyActor<TTopicReader> {
+ void Handle(TEvYdbProxy::TEvReadTopicRequest::TPtr& ev) {
+ auto request = MakeRequest(SelfId());
+ auto cb = [request, sender = ev->Sender, cookie = ev->Cookie](const NThreading::TFuture<void>&) {
+ if (auto r = request.lock()) {
+ r->Complete(new TEvPrivate::TEvTopicEventReady(sender, cookie));
+ }
+ };
+
+ Session->WaitEvent().Subscribe(std::move(cb));
+ }
+
+ void Handle(TEvPrivate::TEvTopicEventReady::TPtr& ev) {
+ auto event = Session->GetEvent(true);
+ Y_VERIFY(event.Defined());
+ Send(ev->Get()->Sender, new TEvYdbProxy::TEvReadTopicResponse(std::move(*event)), 0, ev->Get()->Cookie);
+ }
+
+ void PassAway() override {
+ Session->Close(TDuration::MilliSeconds(100)); // non-blocking if there is no inflight commits
+ TBaseProxyActor<TTopicReader>::PassAway();
+ }
+
+public:
+ explicit TTopicReader(const std::shared_ptr<IReadSession>& session)
+ : TBaseProxyActor(&TThis::StateWork)
+ , Session(session)
+ {
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvYdbProxy::TEvReadTopicRequest, Handle);
+ hFunc(TEvPrivate::TEvTopicEventReady, Handle);
+
+ default:
+ return StateBase(ev, TlsActivationContext->AsActorContext());
+ }
+ }
+
+private:
+ std::shared_ptr<IReadSession> Session;
+
+}; // TTopicReader
+
class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
template <typename TEvResponse, typename TClient, typename... Args>
using TFunc = typename TEvResponse::TAsyncResult(TClient::*)(Args...);
@@ -288,6 +347,13 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
Call<TEvYdbProxy::TEvDescribeConsumerResponse>(ev, &TTopicClient::DescribeConsumer);
}
+ void Handle(TEvYdbProxy::TEvCreateTopicReaderRequest::TPtr& ev) {
+ auto* client = EnsureClient<TTopicClient>();
+ auto args = std::move(ev->Get()->GetArgs());
+ auto session = std::apply(&TTopicClient::CreateReadSession, std::tuple_cat(std::tie(client), std::move(args)));
+ Send(ev->Sender, new TEvYdbProxy::TEvCreateTopicReaderResponse(RegisterWithSameMailbox(new TTopicReader(session))));
+ }
+
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database) {
return TCommonClientSettings()
.DiscoveryEndpoint(endpoint)
@@ -338,6 +404,7 @@ public:
hFunc(TEvYdbProxy::TEvDropTopicRequest, Handle);
hFunc(TEvYdbProxy::TEvDescribeTopicRequest, Handle);
hFunc(TEvYdbProxy::TEvDescribeConsumerRequest, Handle);
+ hFunc(TEvYdbProxy::TEvCreateTopicReaderRequest, Handle);
default:
return StateBase(ev, TlsActivationContext->AsActorContext());
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
index e87d6ccbb5d..0e2e1d178cd 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
@@ -47,6 +47,8 @@ struct TEvYdbProxy {
EV_REQUEST_RESPONSE(DropTopic),
EV_REQUEST_RESPONSE(DescribeTopic),
EV_REQUEST_RESPONSE(DescribeConsumer),
+ EV_REQUEST_RESPONSE(CreateTopicReader),
+ EV_REQUEST_RESPONSE(ReadTopic),
EvEnd,
};
@@ -75,6 +77,11 @@ struct TEvYdbProxy {
using TBase = TGenericRequest<TDerived, EventType, Args...>;
};
+ template <typename TDerived, ui32 EventType>
+ struct TGenericRequest<TDerived, EventType, void>: public TEventLocal<TDerived, EventType> {
+ using TBase = TGenericRequest<TDerived, EventType, void>;
+ };
+
template <typename TDerived, ui32 EventType, typename T>
struct TGenericResponse: public TEventLocal<TDerived, EventType> {
using TResult = T;
@@ -126,6 +133,8 @@ struct TEvYdbProxy {
DEFINE_GENERIC_REQUEST_RESPONSE(DropTopic, NYdb::TStatus, TString, NYdb::NTopic::TDropTopicSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTopic, NYdb::NTopic::TDescribeTopicResult, TString, NYdb::NTopic::TDescribeTopicSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings);
+ DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, NYdb::NTopic::TReadSessionSettings);
+ DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, NYdb::NTopic::TReadSessionEvent::TEvent, void);
#undef DEFINE_GENERIC_REQUEST_RESPONSE
#undef DEFINE_GENERIC_RESPONSE
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
index df4a220a5a2..d6f0cb5be6e 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
@@ -7,6 +7,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/base/ticket_parser.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
namespace NKikimr::NReplication {
@@ -29,11 +30,11 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
Server.SetupDefaultProfiles();
Client.InitRootScheme(DomainName);
- const auto endpoint = "localhost:" + ToString(grpcPort);
- const auto database = "/" + ToString(DomainName);
+ Endpoint = "localhost:" + ToString(grpcPort);
+ Database = "/" + ToString(DomainName);
YdbProxy = Server.GetRuntime()->Register(CreateYdbProxy(
- endpoint, UseDatabase ? database : "", std::forward<Args>(args)...));
+ Endpoint, UseDatabase ? Database : "", std::forward<Args>(args)...));
Sender = Server.GetRuntime()->AllocateEdgeActor();
}
@@ -48,6 +49,12 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
UNIT_ASSERT(!resp->Get()->Record.GetToken().empty());
}
+ template <typename TEvResponse>
+ auto SendImpl(const TActorId& recipient, IEventBase* ev) {
+ Server.GetRuntime()->Send(new IEventHandleFat(recipient, Sender, ev));
+ return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender);
+ }
+
public:
TEnv(bool init = true)
: Settings(Tests::TServerSettings(PortManager.GetPort(), {}, MakePqConfig())
@@ -105,8 +112,24 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
template <typename TEvResponse>
auto Send(IEventBase* ev) {
- Server.GetRuntime()->Send(new IEventHandleFat(YdbProxy, Sender, ev));
- return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender);
+ return SendImpl<TEvResponse>(YdbProxy, ev);
+ }
+
+ template <typename TEvResponse>
+ auto Send(const TActorId& recipient, IEventBase* ev) {
+ return SendImpl<TEvResponse>(recipient, ev);
+ }
+
+ const NYdb::TDriver& GetDriver() const {
+ return Server.GetDriver();
+ }
+
+ const TString& GetEndpoint() const {
+ return Endpoint;
+ }
+
+ const TString& GetDatabase() const {
+ return Database;
}
private:
@@ -114,6 +137,8 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
Tests::TServerSettings Settings;
Tests::TServer Server;
Tests::TClient Client;
+ TString Endpoint;
+ TString Database;
TActorId YdbProxy;
TActorId Sender;
};
@@ -676,6 +701,84 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
}
}
+ template <typename Env>
+ TActorId CreateTopicReader(Env& env, const TString& topicPath) {
+ auto settings = NYdb::NTopic::TReadSessionSettings()
+ .ConsumerName("consumer")
+ .AppendTopics(NYdb::NTopic::TTopicReadSettings(topicPath));
+
+ auto ev = env.template Send<TEvYdbProxy::TEvCreateTopicReaderResponse>(
+ new TEvYdbProxy::TEvCreateTopicReaderRequest(settings));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result);
+
+ return ev->Get()->Result;
+ }
+
+ template <typename Env>
+ bool WriteTopic(const Env& env, const TString& topicPath, const TString& data) {
+ NYdb::NTopic::TTopicClient client(env.GetDriver(), NYdb::NTopic::TTopicClientSettings()
+ .DiscoveryEndpoint(env.GetEndpoint())
+ .Database(env.GetDatabase())
+ );
+
+ auto session = client.CreateSimpleBlockingWriteSession(NYdb::NTopic::TWriteSessionSettings()
+ .Path(topicPath)
+ .ProducerId("producer")
+ .MessageGroupId("producer")
+ );
+
+ const auto result = session->Write(data);
+ session->Close();
+
+ return result;
+ }
+
+ template <typename TEvent, typename Env>
+ TEvent ReadTopic(Env& env, const TActorId& reader) {
+ auto ev = env.template Send<TEvYdbProxy::TEvReadTopicResponse>(reader,
+ new TEvYdbProxy::TEvReadTopicRequest());
+ UNIT_ASSERT(ev);
+
+ const auto* event = std::get_if<TEvent>(&ev->Get()->Result);
+ UNIT_ASSERT(event);
+
+ return *event;
+ }
+
+ Y_UNIT_TEST(ReadTopic) {
+ using TReadSessionEvent = NYdb::NTopic::TReadSessionEvent;
+ TEnv env;
+
+ // create topic
+ {
+ auto settings = NYdb::NTopic::TCreateTopicSettings()
+ .BeginAddConsumer()
+ .ConsumerName("consumer")
+ .EndAddConsumer();
+
+ auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
+ new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+
+ TActorId reader = CreateTopicReader(env, "/Root/topic");
+
+ UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1"));
+ {
+ ReadTopic<TReadSessionEvent::TStartPartitionSessionEvent>(env, reader).Confirm();
+
+ auto data = ReadTopic<TReadSessionEvent::TDataReceivedEvent>(env, reader);
+ UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(data.GetMessages().at(0).GetData(), "message-1");
+ data.Commit();
+
+ auto ack = ReadTopic<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(env, reader);
+ UNIT_ASSERT_VALUES_EQUAL(ack.GetCommittedOffset(), 1);
+ }
+ }
+
} // YdbProxyTests
}