diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-14 16:08:08 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-14 16:08:08 +0300 |
commit | 4e8f8398c4ed7aa1faf5228e6a401a90e11e2085 (patch) | |
tree | 4c7e9990467e13a215877190cefc236031ce71da | |
parent | 1c8debf9a4446f20e279e2309eeaba5938e5b044 (diff) | |
download | ydb-4e8f8398c4ed7aa1faf5228e6a401a90e11e2085.tar.gz |
ydb proxy: add topics support
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp | 76 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.h | 20 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp | 168 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp | 4 |
4 files changed, 249 insertions, 19 deletions
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp index 234a08a7556..b0830cbc888 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp @@ -14,13 +14,13 @@ #include <memory> #include <mutex> -namespace NKikimr { -namespace NReplication { +namespace NKikimr::NReplication { using namespace NKikimrReplication; using namespace NYdb; using namespace NYdb::NScheme; using namespace NYdb::NTable; +using namespace NYdb::NTopic; template <typename TDerived> class TBaseProxyActor: public TActor<TDerived> { @@ -148,11 +148,31 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> { template <typename TEvResponse, typename TClient, typename... Args> using TFunc = typename TEvResponse::TAsyncResult(TClient::*)(Args...); - template <typename TClient> + template <typename TSettings> + static TSettings ClientSettings(const TCommonClientSettings& base) { + auto derived = TSettings(); + + if (base.DiscoveryEndpoint_) { + derived.DiscoveryEndpoint(*base.DiscoveryEndpoint_); + } + if (base.DiscoveryMode_) { + derived.DiscoveryMode(*base.DiscoveryMode_); + } + if (base.Database_) { + derived.Database(*base.Database_); + } + if (base.CredentialsProviderFactory_) { + derived.CredentialsProviderFactory(*base.CredentialsProviderFactory_); + } + + return derived; + } + + template <typename TClient, typename TSettings> TClient* EnsureClient(THolder<TClient>& client) { if (!client) { Y_VERIFY(AppData()->YdbDriver); - client.Reset(new TClient(*AppData()->YdbDriver, Settings)); + client.Reset(new TClient(*AppData()->YdbDriver, ClientSettings<TSettings>(CommonSettings))); } return client.Get(); @@ -161,9 +181,11 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> { template <typename TClient> TClient* EnsureClient() { if constexpr (std::is_same_v<TClient, TSchemeClient>) { - return EnsureClient<TClient>(SchemeClient); + return EnsureClient<TClient, TCommonClientSettings>(SchemeClient); } else if constexpr (std::is_same_v<TClient, TTableClient>) { - return EnsureClient<TClient>(TableClient); + return EnsureClient<TClient, TClientSettings>(TableClient); + } else if constexpr (std::is_same_v<TClient, TTopicClient>) { + return EnsureClient<TClient, TTopicClientSettings>(TopicClient); } else { Y_FAIL("unreachable"); } @@ -246,19 +268,39 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> { CallSession<TEvYdbProxy::TEvDescribeTableResponse>(ev, &TSession::DescribeTable); } - static TClientSettings MakeSettings(const TString& endpoint, const TString& database) { - return TClientSettings() + void Handle(TEvYdbProxy::TEvCreateTopicRequest::TPtr& ev) { + Call<TEvYdbProxy::TEvCreateTopicResponse>(ev, &TTopicClient::CreateTopic); + } + + void Handle(TEvYdbProxy::TEvAlterTopicRequest::TPtr& ev) { + Call<TEvYdbProxy::TEvAlterTopicResponse>(ev, &TTopicClient::AlterTopic); + } + + void Handle(TEvYdbProxy::TEvDropTopicRequest::TPtr& ev) { + Call<TEvYdbProxy::TEvDropTopicResponse>(ev, &TTopicClient::DropTopic); + } + + void Handle(TEvYdbProxy::TEvDescribeTopicRequest::TPtr& ev) { + Call<TEvYdbProxy::TEvDescribeTopicResponse>(ev, &TTopicClient::DescribeTopic); + } + + void Handle(TEvYdbProxy::TEvDescribeConsumerRequest::TPtr& ev) { + Call<TEvYdbProxy::TEvDescribeConsumerResponse>(ev, &TTopicClient::DescribeConsumer); + } + + static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database) { + return TCommonClientSettings() .DiscoveryEndpoint(endpoint) .DiscoveryMode(EDiscoveryMode::Async) .Database(database); } - static TClientSettings MakeSettings(const TString& endpoint, const TString& database, const TString& token) { + static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, const TString& token) { return MakeSettings(endpoint, database) .AuthToken(token); } - static TClientSettings MakeSettings(const TString& endpoint, const TString& database, const TStaticCredentials& credentials) { + static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, const TStaticCredentials& credentials) { return MakeSettings(endpoint, database) .CredentialsProviderFactory(CreateLoginCredentialsProviderFactory({ .User = credentials.GetUser(), @@ -270,7 +312,7 @@ public: template <typename... Args> explicit TYdbProxy(Args&&... args) : TBaseProxyActor(&TThis::StateWork) - , Settings(MakeSettings(std::forward<Args>(args)...)) + , CommonSettings(MakeSettings(std::forward<Args>(args)...)) { } @@ -290,6 +332,12 @@ public: hFunc(TEvYdbProxy::TEvCopyTablesRequest, Handle); hFunc(TEvYdbProxy::TEvRenameTablesRequest, Handle); hFunc(TEvYdbProxy::TEvDescribeTableRequest, Handle); + // Topic + hFunc(TEvYdbProxy::TEvCreateTopicRequest, Handle); + hFunc(TEvYdbProxy::TEvAlterTopicRequest, Handle); + hFunc(TEvYdbProxy::TEvDropTopicRequest, Handle); + hFunc(TEvYdbProxy::TEvDescribeTopicRequest, Handle); + hFunc(TEvYdbProxy::TEvDescribeConsumerRequest, Handle); default: return StateBase(ev, TlsActivationContext->AsActorContext()); @@ -297,9 +345,10 @@ public: } private: - const TClientSettings Settings; + const TCommonClientSettings CommonSettings; THolder<TSchemeClient> SchemeClient; THolder<TTableClient> TableClient; + THolder<TTopicClient> TopicClient; }; // TYdbProxy @@ -315,5 +364,4 @@ IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const T return new TYdbProxy(endpoint, database, credentials); } -} // NReplication -} // NKikimr +} diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index 73590bfe8b6..e87d6ccbb5d 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -2,6 +2,7 @@ #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/core/base/defs.h> #include <ydb/core/base/events.h> @@ -10,8 +11,7 @@ namespace NKikimrReplication { class TStaticCredentials; } -namespace NKikimr { -namespace NReplication { +namespace NKikimr::NReplication { #pragma push_macro("RemoveDirectory") #undef RemoveDirectory @@ -41,6 +41,13 @@ struct TEvYdbProxy { EV_REQUEST_RESPONSE(RenameTables), EV_REQUEST_RESPONSE(DescribeTable), + EvTopic = EvBegin + 2 * 100, + EV_REQUEST_RESPONSE(CreateTopic), + EV_REQUEST_RESPONSE(AlterTopic), + EV_REQUEST_RESPONSE(DropTopic), + EV_REQUEST_RESPONSE(DescribeTopic), + EV_REQUEST_RESPONSE(DescribeConsumer), + EvEnd, }; @@ -113,6 +120,12 @@ struct TEvYdbProxy { DEFINE_GENERIC_REQUEST_RESPONSE(CopyTables, NYdb::TStatus, TVector<NYdb::NTable::TCopyItem>, NYdb::NTable::TCopyTablesSettings); DEFINE_GENERIC_REQUEST_RESPONSE(RenameTables, NYdb::TStatus, TVector<NYdb::NTable::TRenameItem>, NYdb::NTable::TRenameTablesSettings); DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTable, NYdb::NTable::TDescribeTableResult, TString, NYdb::NTable::TDescribeTableSettings); + // Topic + DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopic, NYdb::TStatus, TString, NYdb::NTopic::TCreateTopicSettings); + DEFINE_GENERIC_REQUEST_RESPONSE(AlterTopic, NYdb::TStatus, TString, NYdb::NTopic::TAlterTopicSettings); + DEFINE_GENERIC_REQUEST_RESPONSE(DropTopic, NYdb::TStatus, TString, NYdb::NTopic::TDropTopicSettings); + DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTopic, NYdb::NTopic::TDescribeTopicResult, TString, NYdb::NTopic::TDescribeTopicSettings); + DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings); #undef DEFINE_GENERIC_REQUEST_RESPONSE #undef DEFINE_GENERIC_RESPONSE @@ -127,5 +140,4 @@ IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const T IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const NKikimrReplication::TStaticCredentials& credentials); -} // NReplication -} // NKikimr +} diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp index 12677ebf027..df4a220a5a2 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp @@ -15,6 +15,12 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { class TEnv { static constexpr char DomainName[] = "Root"; + static NKikimrPQ::TPQConfig MakePqConfig() { + NKikimrPQ::TPQConfig config; + config.SetRequireCredentialsInNewProtocol(false); + return config; + } + template <typename... Args> void Init(Args&&... args) { auto grpcPort = PortManager.GetPort(); @@ -44,7 +50,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { public: TEnv(bool init = true) - : Settings(Tests::TServerSettings(PortManager.GetPort()) + : Settings(Tests::TServerSettings(PortManager.GetPort(), {}, MakePqConfig()) .SetDomainName(DomainName) ) , Server(Settings) @@ -510,6 +516,166 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { } } + Y_UNIT_TEST(CreateTopic) { + TEnv env; + // invalid retention period + { + auto settings = NYdb::NTopic::TCreateTopicSettings() + .RetentionPeriod(TDuration::Days(365)); + + auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>( + new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings)); + UNIT_ASSERT(ev); + UNIT_ASSERT(!ev->Get()->Result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::BAD_REQUEST); + } + // ok + { + auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>( + new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + } + + Y_UNIT_TEST(AlterTopic) { + TEnv env; + // fail + { + auto settings = NYdb::NTopic::TAlterTopicSettings() + .SetRetentionPeriod(TDuration::Days(2)); + + auto ev = env.Send<TEvYdbProxy::TEvAlterTopicResponse>( + new TEvYdbProxy::TEvAlterTopicRequest("/Root/topic", settings)); + UNIT_ASSERT(ev); + UNIT_ASSERT(!ev->Get()->Result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR); + } + // create + { + auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>( + new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + // ok + { + auto settings = NYdb::NTopic::TAlterTopicSettings() + .SetRetentionPeriod(TDuration::Days(2)); + + auto ev = env.Send<TEvYdbProxy::TEvAlterTopicResponse>( + new TEvYdbProxy::TEvAlterTopicRequest("/Root/topic", settings)); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + // invalid retention period + { + auto settings = NYdb::NTopic::TAlterTopicSettings() + .SetRetentionPeriod(TDuration::Days(365)); + + auto ev = env.Send<TEvYdbProxy::TEvAlterTopicResponse>( + new TEvYdbProxy::TEvAlterTopicRequest("/Root/topic", settings)); + UNIT_ASSERT(ev); + UNIT_ASSERT(!ev->Get()->Result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::BAD_REQUEST); + } + } + + Y_UNIT_TEST(DropTopic) { + TEnv env; + // create + { + auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>( + new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + // ok + { + auto ev = env.Send<TEvYdbProxy::TEvDropTopicResponse>( + new TEvYdbProxy::TEvDropTopicRequest("/Root/topic", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + // fail + { + auto ev = env.Send<TEvYdbProxy::TEvDropTopicResponse>( + new TEvYdbProxy::TEvDropTopicRequest("/Root/topic", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(!ev->Get()->Result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR); + } + } + + Y_UNIT_TEST(DescribeTopic) { + TEnv env; + // fail + { + auto ev = env.Send<TEvYdbProxy::TEvDescribeTopicResponse>( + new TEvYdbProxy::TEvDescribeTopicRequest("/Root/topic", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(!ev->Get()->Result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR); + } + // create + { + auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>( + new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + // ok + { + auto ev = env.Send<TEvYdbProxy::TEvDescribeTopicResponse>( + new TEvYdbProxy::TEvDescribeTopicRequest("/Root/topic", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + + const auto& schema = ev->Get()->Result.GetTopicDescription(); + UNIT_ASSERT_VALUES_EQUAL(schema.GetTotalPartitionsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(schema.GetRetentionPeriod(), TDuration::Days(1)); + } + } + + Y_UNIT_TEST(DescribeConsumer) { + TEnv env; + // fail + { + auto ev = env.Send<TEvYdbProxy::TEvDescribeConsumerResponse>( + new TEvYdbProxy::TEvDescribeConsumerRequest("/Root/topic", "consumer", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(!ev->Get()->Result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR); + } + // create + { + auto settings = NYdb::NTopic::TCreateTopicSettings() + .BeginAddConsumer() + .ConsumerName("consumer") + .EndAddConsumer(); + + auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>( + new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings)); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + // ok + { + auto ev = env.Send<TEvYdbProxy::TEvDescribeConsumerResponse>( + new TEvYdbProxy::TEvDescribeConsumerRequest("/Root/topic", "consumer", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()->Result.IsSuccess()); + } + // fail + { + auto ev = env.Send<TEvYdbProxy::TEvDescribeConsumerResponse>( + new TEvYdbProxy::TEvDescribeConsumerRequest("/Root/topic", "consumer2", {})); + UNIT_ASSERT(ev); + UNIT_ASSERT(!ev->Get()->Result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR); + } + } + } // YdbProxyTests } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index f5d1846bc59..bfa05a487c8 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -123,6 +123,10 @@ const TVector<TPartitionInfo>& TConsumerDescription::GetPartitions() const { return Partitions_; } +const TConsumer& TConsumerDescription::GetConsumer() const { + return Consumer_; +} + const TVector<ECodec>& TTopicDescription::GetSupportedCodecs() const { return SupportedCodecs_; } |