aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-14 16:08:08 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-14 16:08:08 +0300
commit4e8f8398c4ed7aa1faf5228e6a401a90e11e2085 (patch)
tree4c7e9990467e13a215877190cefc236031ce71da
parent1c8debf9a4446f20e279e2309eeaba5938e5b044 (diff)
downloadydb-4e8f8398c4ed7aa1faf5228e6a401a90e11e2085.tar.gz
ydb proxy: add topics support
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp76
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.h20
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp168
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp4
4 files changed, 249 insertions, 19 deletions
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
index 234a08a7556..b0830cbc888 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
@@ -14,13 +14,13 @@
#include <memory>
#include <mutex>
-namespace NKikimr {
-namespace NReplication {
+namespace NKikimr::NReplication {
using namespace NKikimrReplication;
using namespace NYdb;
using namespace NYdb::NScheme;
using namespace NYdb::NTable;
+using namespace NYdb::NTopic;
template <typename TDerived>
class TBaseProxyActor: public TActor<TDerived> {
@@ -148,11 +148,31 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
template <typename TEvResponse, typename TClient, typename... Args>
using TFunc = typename TEvResponse::TAsyncResult(TClient::*)(Args...);
- template <typename TClient>
+ template <typename TSettings>
+ static TSettings ClientSettings(const TCommonClientSettings& base) {
+ auto derived = TSettings();
+
+ if (base.DiscoveryEndpoint_) {
+ derived.DiscoveryEndpoint(*base.DiscoveryEndpoint_);
+ }
+ if (base.DiscoveryMode_) {
+ derived.DiscoveryMode(*base.DiscoveryMode_);
+ }
+ if (base.Database_) {
+ derived.Database(*base.Database_);
+ }
+ if (base.CredentialsProviderFactory_) {
+ derived.CredentialsProviderFactory(*base.CredentialsProviderFactory_);
+ }
+
+ return derived;
+ }
+
+ template <typename TClient, typename TSettings>
TClient* EnsureClient(THolder<TClient>& client) {
if (!client) {
Y_VERIFY(AppData()->YdbDriver);
- client.Reset(new TClient(*AppData()->YdbDriver, Settings));
+ client.Reset(new TClient(*AppData()->YdbDriver, ClientSettings<TSettings>(CommonSettings)));
}
return client.Get();
@@ -161,9 +181,11 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
template <typename TClient>
TClient* EnsureClient() {
if constexpr (std::is_same_v<TClient, TSchemeClient>) {
- return EnsureClient<TClient>(SchemeClient);
+ return EnsureClient<TClient, TCommonClientSettings>(SchemeClient);
} else if constexpr (std::is_same_v<TClient, TTableClient>) {
- return EnsureClient<TClient>(TableClient);
+ return EnsureClient<TClient, TClientSettings>(TableClient);
+ } else if constexpr (std::is_same_v<TClient, TTopicClient>) {
+ return EnsureClient<TClient, TTopicClientSettings>(TopicClient);
} else {
Y_FAIL("unreachable");
}
@@ -246,19 +268,39 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
CallSession<TEvYdbProxy::TEvDescribeTableResponse>(ev, &TSession::DescribeTable);
}
- static TClientSettings MakeSettings(const TString& endpoint, const TString& database) {
- return TClientSettings()
+ void Handle(TEvYdbProxy::TEvCreateTopicRequest::TPtr& ev) {
+ Call<TEvYdbProxy::TEvCreateTopicResponse>(ev, &TTopicClient::CreateTopic);
+ }
+
+ void Handle(TEvYdbProxy::TEvAlterTopicRequest::TPtr& ev) {
+ Call<TEvYdbProxy::TEvAlterTopicResponse>(ev, &TTopicClient::AlterTopic);
+ }
+
+ void Handle(TEvYdbProxy::TEvDropTopicRequest::TPtr& ev) {
+ Call<TEvYdbProxy::TEvDropTopicResponse>(ev, &TTopicClient::DropTopic);
+ }
+
+ void Handle(TEvYdbProxy::TEvDescribeTopicRequest::TPtr& ev) {
+ Call<TEvYdbProxy::TEvDescribeTopicResponse>(ev, &TTopicClient::DescribeTopic);
+ }
+
+ void Handle(TEvYdbProxy::TEvDescribeConsumerRequest::TPtr& ev) {
+ Call<TEvYdbProxy::TEvDescribeConsumerResponse>(ev, &TTopicClient::DescribeConsumer);
+ }
+
+ static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database) {
+ return TCommonClientSettings()
.DiscoveryEndpoint(endpoint)
.DiscoveryMode(EDiscoveryMode::Async)
.Database(database);
}
- static TClientSettings MakeSettings(const TString& endpoint, const TString& database, const TString& token) {
+ static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, const TString& token) {
return MakeSettings(endpoint, database)
.AuthToken(token);
}
- static TClientSettings MakeSettings(const TString& endpoint, const TString& database, const TStaticCredentials& credentials) {
+ static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, const TStaticCredentials& credentials) {
return MakeSettings(endpoint, database)
.CredentialsProviderFactory(CreateLoginCredentialsProviderFactory({
.User = credentials.GetUser(),
@@ -270,7 +312,7 @@ public:
template <typename... Args>
explicit TYdbProxy(Args&&... args)
: TBaseProxyActor(&TThis::StateWork)
- , Settings(MakeSettings(std::forward<Args>(args)...))
+ , CommonSettings(MakeSettings(std::forward<Args>(args)...))
{
}
@@ -290,6 +332,12 @@ public:
hFunc(TEvYdbProxy::TEvCopyTablesRequest, Handle);
hFunc(TEvYdbProxy::TEvRenameTablesRequest, Handle);
hFunc(TEvYdbProxy::TEvDescribeTableRequest, Handle);
+ // Topic
+ hFunc(TEvYdbProxy::TEvCreateTopicRequest, Handle);
+ hFunc(TEvYdbProxy::TEvAlterTopicRequest, Handle);
+ hFunc(TEvYdbProxy::TEvDropTopicRequest, Handle);
+ hFunc(TEvYdbProxy::TEvDescribeTopicRequest, Handle);
+ hFunc(TEvYdbProxy::TEvDescribeConsumerRequest, Handle);
default:
return StateBase(ev, TlsActivationContext->AsActorContext());
@@ -297,9 +345,10 @@ public:
}
private:
- const TClientSettings Settings;
+ const TCommonClientSettings CommonSettings;
THolder<TSchemeClient> SchemeClient;
THolder<TTableClient> TableClient;
+ THolder<TTopicClient> TopicClient;
}; // TYdbProxy
@@ -315,5 +364,4 @@ IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const T
return new TYdbProxy(endpoint, database, credentials);
}
-} // NReplication
-} // NKikimr
+}
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
index 73590bfe8b6..e87d6ccbb5d 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
@@ -2,6 +2,7 @@
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/core/base/defs.h>
#include <ydb/core/base/events.h>
@@ -10,8 +11,7 @@ namespace NKikimrReplication {
class TStaticCredentials;
}
-namespace NKikimr {
-namespace NReplication {
+namespace NKikimr::NReplication {
#pragma push_macro("RemoveDirectory")
#undef RemoveDirectory
@@ -41,6 +41,13 @@ struct TEvYdbProxy {
EV_REQUEST_RESPONSE(RenameTables),
EV_REQUEST_RESPONSE(DescribeTable),
+ EvTopic = EvBegin + 2 * 100,
+ EV_REQUEST_RESPONSE(CreateTopic),
+ EV_REQUEST_RESPONSE(AlterTopic),
+ EV_REQUEST_RESPONSE(DropTopic),
+ EV_REQUEST_RESPONSE(DescribeTopic),
+ EV_REQUEST_RESPONSE(DescribeConsumer),
+
EvEnd,
};
@@ -113,6 +120,12 @@ struct TEvYdbProxy {
DEFINE_GENERIC_REQUEST_RESPONSE(CopyTables, NYdb::TStatus, TVector<NYdb::NTable::TCopyItem>, NYdb::NTable::TCopyTablesSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(RenameTables, NYdb::TStatus, TVector<NYdb::NTable::TRenameItem>, NYdb::NTable::TRenameTablesSettings);
DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTable, NYdb::NTable::TDescribeTableResult, TString, NYdb::NTable::TDescribeTableSettings);
+ // Topic
+ DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopic, NYdb::TStatus, TString, NYdb::NTopic::TCreateTopicSettings);
+ DEFINE_GENERIC_REQUEST_RESPONSE(AlterTopic, NYdb::TStatus, TString, NYdb::NTopic::TAlterTopicSettings);
+ DEFINE_GENERIC_REQUEST_RESPONSE(DropTopic, NYdb::TStatus, TString, NYdb::NTopic::TDropTopicSettings);
+ DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTopic, NYdb::NTopic::TDescribeTopicResult, TString, NYdb::NTopic::TDescribeTopicSettings);
+ DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings);
#undef DEFINE_GENERIC_REQUEST_RESPONSE
#undef DEFINE_GENERIC_RESPONSE
@@ -127,5 +140,4 @@ IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const T
IActor* CreateYdbProxy(const TString& endpoint, const TString& database,
const NKikimrReplication::TStaticCredentials& credentials);
-} // NReplication
-} // NKikimr
+}
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
index 12677ebf027..df4a220a5a2 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
@@ -15,6 +15,12 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
class TEnv {
static constexpr char DomainName[] = "Root";
+ static NKikimrPQ::TPQConfig MakePqConfig() {
+ NKikimrPQ::TPQConfig config;
+ config.SetRequireCredentialsInNewProtocol(false);
+ return config;
+ }
+
template <typename... Args>
void Init(Args&&... args) {
auto grpcPort = PortManager.GetPort();
@@ -44,7 +50,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
public:
TEnv(bool init = true)
- : Settings(Tests::TServerSettings(PortManager.GetPort())
+ : Settings(Tests::TServerSettings(PortManager.GetPort(), {}, MakePqConfig())
.SetDomainName(DomainName)
)
, Server(Settings)
@@ -510,6 +516,166 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
}
}
+ Y_UNIT_TEST(CreateTopic) {
+ TEnv env;
+ // invalid retention period
+ {
+ auto settings = NYdb::NTopic::TCreateTopicSettings()
+ .RetentionPeriod(TDuration::Days(365));
+
+ auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
+ new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(!ev->Get()->Result.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
+ }
+ // ok
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
+ new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+ }
+
+ Y_UNIT_TEST(AlterTopic) {
+ TEnv env;
+ // fail
+ {
+ auto settings = NYdb::NTopic::TAlterTopicSettings()
+ .SetRetentionPeriod(TDuration::Days(2));
+
+ auto ev = env.Send<TEvYdbProxy::TEvAlterTopicResponse>(
+ new TEvYdbProxy::TEvAlterTopicRequest("/Root/topic", settings));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(!ev->Get()->Result.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR);
+ }
+ // create
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
+ new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+ // ok
+ {
+ auto settings = NYdb::NTopic::TAlterTopicSettings()
+ .SetRetentionPeriod(TDuration::Days(2));
+
+ auto ev = env.Send<TEvYdbProxy::TEvAlterTopicResponse>(
+ new TEvYdbProxy::TEvAlterTopicRequest("/Root/topic", settings));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+ // invalid retention period
+ {
+ auto settings = NYdb::NTopic::TAlterTopicSettings()
+ .SetRetentionPeriod(TDuration::Days(365));
+
+ auto ev = env.Send<TEvYdbProxy::TEvAlterTopicResponse>(
+ new TEvYdbProxy::TEvAlterTopicRequest("/Root/topic", settings));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(!ev->Get()->Result.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
+ }
+ }
+
+ Y_UNIT_TEST(DropTopic) {
+ TEnv env;
+ // create
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
+ new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+ // ok
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvDropTopicResponse>(
+ new TEvYdbProxy::TEvDropTopicRequest("/Root/topic", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+ // fail
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvDropTopicResponse>(
+ new TEvYdbProxy::TEvDropTopicRequest("/Root/topic", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(!ev->Get()->Result.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR);
+ }
+ }
+
+ Y_UNIT_TEST(DescribeTopic) {
+ TEnv env;
+ // fail
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvDescribeTopicResponse>(
+ new TEvYdbProxy::TEvDescribeTopicRequest("/Root/topic", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(!ev->Get()->Result.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR);
+ }
+ // create
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
+ new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+ // ok
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvDescribeTopicResponse>(
+ new TEvYdbProxy::TEvDescribeTopicRequest("/Root/topic", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+
+ const auto& schema = ev->Get()->Result.GetTopicDescription();
+ UNIT_ASSERT_VALUES_EQUAL(schema.GetTotalPartitionsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(schema.GetRetentionPeriod(), TDuration::Days(1));
+ }
+ }
+
+ Y_UNIT_TEST(DescribeConsumer) {
+ TEnv env;
+ // fail
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvDescribeConsumerResponse>(
+ new TEvYdbProxy::TEvDescribeConsumerRequest("/Root/topic", "consumer", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(!ev->Get()->Result.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR);
+ }
+ // create
+ {
+ auto settings = NYdb::NTopic::TCreateTopicSettings()
+ .BeginAddConsumer()
+ .ConsumerName("consumer")
+ .EndAddConsumer();
+
+ auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
+ new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+ // ok
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvDescribeConsumerResponse>(
+ new TEvYdbProxy::TEvDescribeConsumerRequest("/Root/topic", "consumer", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get()->Result.IsSuccess());
+ }
+ // fail
+ {
+ auto ev = env.Send<TEvYdbProxy::TEvDescribeConsumerResponse>(
+ new TEvYdbProxy::TEvDescribeConsumerRequest("/Root/topic", "consumer2", {}));
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(!ev->Get()->Result.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR);
+ }
+ }
+
} // YdbProxyTests
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
index f5d1846bc59..bfa05a487c8 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
@@ -123,6 +123,10 @@ const TVector<TPartitionInfo>& TConsumerDescription::GetPartitions() const {
return Partitions_;
}
+const TConsumer& TConsumerDescription::GetConsumer() const {
+ return Consumer_;
+}
+
const TVector<ECodec>& TTopicDescription::GetSupportedCodecs() const {
return SupportedCodecs_;
}