diff options
author | qyryq <qyryq@ydb.tech> | 2024-04-08 13:09:22 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-08 13:09:22 +0300 |
commit | 7ccec7095a02bb365cb4574a9de97ad068dbdcaf (patch) | |
tree | 46b4b3a640b0bf678d05ff098eda6c9e605208ff | |
parent | 9569967f0ee797557f309ca252c5d6968c678cbb (diff) | |
download | ydb-7ccec7095a02bb365cb4574a9de97ad068dbdcaf.tar.gz |
TFederatedWriteSession: take into account AllowFallback setting; use retry policy (#3460)
4 files changed, 362 insertions, 29 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp index 3b969d4cc72..9907343612e 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp @@ -129,50 +129,93 @@ void TFederatedWriteSessionImpl::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db) CurrentDatabase = db; } -std::shared_ptr<TDbInfo> TFederatedWriteSessionImpl::SelectDatabaseImpl() { - std::vector<std::shared_ptr<TDbInfo>> availableDatabases; +std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHash( + NTopic::TFederatedWriteSessionSettings const& settings, + std::vector<std::shared_ptr<TDbInfo>> const& dbInfos +) { ui64 totalWeight = 0; + std::vector<std::shared_ptr<TDbInfo>> available; - for (const auto& db : FederationState->DbInfos) { - if (db->status() != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) { - continue; - } - - if (Settings.PreferredDatabase_ && (AsciiEqualsIgnoreCase(db->name(), *Settings.PreferredDatabase_) || - AsciiEqualsIgnoreCase(db->id(), *Settings.PreferredDatabase_))) { - return db; - } else if (AsciiEqualsIgnoreCase(FederationState->SelfLocation, db->location())) { - return db; - } else { - availableDatabases.push_back(db); + for (const auto& db : dbInfos) { + if (db->status() == TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) { + available.push_back(db); totalWeight += db->weight(); } } - if (availableDatabases.empty() || totalWeight == 0) { - // close session, return error - return nullptr; + if (available.empty() || totalWeight == 0) { + return {nullptr, EStatus::NOT_FOUND}; } - std::sort(availableDatabases.begin(), availableDatabases.end(), [](const std::shared_ptr<TDbInfo>& lhs, const std::shared_ptr<TDbInfo>& rhs){ - return lhs->weight() > rhs->weight() - || lhs->weight() == rhs->weight() && lhs->name() < rhs->name(); - }); + std::sort(available.begin(), available.end(), [](auto const& lhs, auto const& rhs) { return lhs->name() < rhs->name(); }); - ui64 hashValue = THash<TString>()(Settings.Path_); - hashValue = CombineHashes(hashValue, THash<TString>()(Settings.ProducerId_)); + ui64 hashValue = THash<TString>()(settings.Path_); + hashValue = CombineHashes(hashValue, THash<TString>()(settings.ProducerId_)); hashValue %= totalWeight; ui64 borderWeight = 0; - for (const auto& db : availableDatabases) { + for (auto const& db : available) { borderWeight += db->weight(); if (hashValue < borderWeight) { - return db; + return {db, EStatus::SUCCESS}; } } Y_UNREACHABLE(); } +std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabase( + NTopic::TFederatedWriteSessionSettings const& settings, + std::vector<std::shared_ptr<TDbInfo>> const& dbInfos, TString const& selfLocation +) { + /* Logic of the function should follow this table: + + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | set | not found | - | any | NOT_FOUND | + | set | available | - | any | preferred | + | set | unavailable | - | false | UNAVAILABLE | + | set | unavailable | - | true | by hash | + | unset | - | not found | false | NOT_FOUND | + | unset | - | not found | true | by hash | + | unset | - | available | any | local | + | unset | - | unavailable | false | UNAVAILABLE | + | unset | - | unavailable | true | by hash | + */ + + decltype(begin(dbInfos)) it; + if (settings.PreferredDatabase_) { + it = std::find_if(begin(dbInfos), end(dbInfos), [&preferred = settings.PreferredDatabase_](auto const& db) { + return AsciiEqualsIgnoreCase(*preferred, db->name()) || AsciiEqualsIgnoreCase(*preferred, db->id()); + }); + if (it == end(dbInfos)) { + return {nullptr, EStatus::NOT_FOUND}; + } + } else { + it = std::find_if(begin(dbInfos), end(dbInfos), [&selfLocation](auto const& db) { + return AsciiEqualsIgnoreCase(selfLocation, db->location()); + }); + if (it == end(dbInfos)) { + if (!settings.AllowFallback_) { + return {nullptr, EStatus::NOT_FOUND}; + } + return SelectDatabaseByHash(settings, dbInfos); + } + } + + auto db = *it; + if (db->status() == TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) { + return {db, EStatus::SUCCESS}; + } + if (!settings.AllowFallback_) { + return {nullptr, EStatus::UNAVAILABLE}; + } + return SelectDatabaseByHash(settings, dbInfos); +} + +std::pair<std::shared_ptr<TDbInfo>, EStatus> TFederatedWriteSessionImpl::SelectDatabaseImpl() { + return SelectDatabase(Settings, FederationState->DbInfos, FederationState->SelfLocation); +} + void TFederatedWriteSessionImpl::OnFederatedStateUpdateImpl() { if (!FederationState->Status.IsSuccess()) { CloseImpl(FederationState->Status.GetStatus(), NYql::TIssues(FederationState->Status.GetIssues())); @@ -181,13 +224,23 @@ void TFederatedWriteSessionImpl::OnFederatedStateUpdateImpl() { Y_ABORT_UNLESS(!FederationState->DbInfos.empty()); - auto preferrableDb = SelectDatabaseImpl(); + auto [preferrableDb, status] = SelectDatabaseImpl(); if (!preferrableDb) { - CloseImpl(EStatus::UNAVAILABLE, - NYql::TIssues{NYql::TIssue("Fail to select database: no available database with positive weight")}); + if (!RetryState) { + RetryState = Settings.RetryPolicy_->CreateRetryState(); + } + if (auto delay = RetryState->GetNextRetryDelay(status)) { + LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Retry to update federation state in " << delay); + ScheduleFederatedStateUpdateImpl(*delay); + } else { + TString message = "Failed to select database: no available database"; + LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << message); + CloseImpl(status, NYql::TIssues{NYql::TIssue(message)}); + } return; } + RetryState.reset(); if (!DatabasesAreSame(preferrableDb, CurrentDatabase)) { LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h index 52110c2496d..68ba96e6d9c 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h @@ -10,6 +10,14 @@ namespace NYdb::NFederatedTopic { +std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHash( + NTopic::TFederatedWriteSessionSettings const& settings, + std::vector<std::shared_ptr<TDbInfo>> const& dbInfos); + +std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabase( + NTopic::TFederatedWriteSessionSettings const& settings, + std::vector<std::shared_ptr<TDbInfo>> const& dbInfos, TString const& selfLocation); + class TFederatedWriteSessionImpl : public NTopic::TContinuationTokenIssuer, public NTopic::TEnableSelfContext<TFederatedWriteSessionImpl> { friend class TFederatedWriteSession; @@ -78,7 +86,7 @@ private: void Start(); void OpenSubSessionImpl(std::shared_ptr<TDbInfo> db); - std::shared_ptr<TDbInfo> SelectDatabaseImpl(); + std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseImpl(); void OnFederatedStateUpdateImpl(); void ScheduleFederatedStateUpdateImpl(TDuration delay); @@ -98,6 +106,7 @@ private: const NTopic::TTopicClientSettings SubClientSetttings; std::shared_ptr<std::unordered_map<NTopic::ECodec, THolder<NTopic::ICodec>>> ProvidedCodecs; + NTopic::IRetryPolicy::IRetryState::TPtr RetryState; std::shared_ptr<TFederatedDbObserver> Observer; NThreading::TFuture<void> AsyncInit; std::shared_ptr<TFederatedDbState> FederationState; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp index a3bffe452ae..eeee33da1b7 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp @@ -1,4 +1,5 @@ #include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h> #include <ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.h> @@ -835,6 +836,266 @@ Y_UNIT_TEST_SUITE(BasicUsage) { WriteSession->Close(TDuration::MilliSeconds(10)); } + Y_UNIT_TEST(PreferredDatabaseNoFallback) { + // The test checks that the session keeps trying to connect to the preferred database + // and does not fall back to other databases. + + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>( + TEST_CASE_NAME, false, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, 2); + + setup->Start(true, true); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); + + auto retryPolicy = std::make_shared<NPersQueue::NTests::TYdbPqTestRetryPolicy>(); + + auto writeSettings = TFederatedWriteSessionSettings() + .AllowFallback(false) + .PreferredDatabase("dc2"); + + writeSettings + .RetryPolicy(retryPolicy) + .DirectWriteToPartition(false) + .Path(setup->GetTestTopic()) + .MessageGroupId("src_id"); + + retryPolicy->Initialize(); + retryPolicy->ExpectBreakDown(); + + auto writer = topicClient.CreateWriteSession(writeSettings); + + Ydb::FederationDiscovery::ListFederationDatabasesResponse response; + auto op = response.mutable_operation(); + op->set_status(Ydb::StatusIds::SUCCESS); + response.mutable_operation()->set_ready(true); + response.mutable_operation()->set_id("12345"); + Ydb::FederationDiscovery::ListFederationDatabasesResult mockResult; + mockResult.set_control_plane_endpoint("cp.logbroker-federation:2135"); + mockResult.set_self_location("fancy_datacenter"); + auto c1 = mockResult.add_federation_databases(); + c1->set_name("dc1"); + c1->set_path("/Root"); + c1->set_id("account-dc1"); + c1->set_endpoint("localhost:" + ToString(fdsMock.Port)); + c1->set_location("dc1"); + c1->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); + c1->set_weight(1000); + auto c2 = mockResult.add_federation_databases(); + c2->set_name("dc2"); + c2->set_path("/Root"); + c2->set_id("account-dc2"); + c2->set_endpoint("localhost:" + ToString(fdsMock.Port)); + c2->set_location("dc2"); + c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_UNAVAILABLE); + c2->set_weight(500); + op->mutable_result()->PackFrom(mockResult); + auto fdsRequest = fdsMock.WaitNextPendingRequest(); + fdsRequest.Result.SetValue({std::move(response), grpc::Status::OK}); + + Cerr << "=== Session was created, waiting for retries" << Endl; + retryPolicy->WaitForRetriesSync(3); + + Cerr << "=== In the next federation discovery response dc2 will be available" << Endl; + // fdsMock.PreparedResponse.Clear(); + // std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest; + fdsRequest = fdsMock.WaitNextPendingRequest(); + fdsRequest.Result.SetValue(fdsMock.ComposeOkResult()); + + Cerr << "=== Waiting for repair" << Endl; + retryPolicy->WaitForRepairSync(); + + Cerr << "=== Closing the session" << Endl; + writer->Close(TDuration::MilliSeconds(10)); + } + + void AddDatabase(std::vector<std::shared_ptr<TDbInfo>>& dbInfos, int id, int weight) { + auto db = std::make_shared<TDbInfo>(); + db->set_id(ToString(id)); + db->set_name("db" + ToString(id)); + db->set_location("dc" + ToString(id)); + db->set_weight(weight); + db->set_status(Ydb::FederationDiscovery::DatabaseInfo_Status_AVAILABLE); + dbInfos.push_back(db); + } + + void EnableDatabase(std::vector<std::shared_ptr<TDbInfo>>& dbInfos, int id) { + dbInfos[id - 1]->set_status(Ydb::FederationDiscovery::DatabaseInfo_Status_AVAILABLE); + } + + void DisableDatabase(std::vector<std::shared_ptr<TDbInfo>>& dbInfos, int id) { + dbInfos[id - 1]->set_status(Ydb::FederationDiscovery::DatabaseInfo_Status_UNAVAILABLE); + } + + Y_UNIT_TEST(SelectDatabaseByHash) { + std::vector<std::shared_ptr<TDbInfo>> dbInfos; + using Settings = TFederatedWriteSessionSettings; + + { + auto [db, status] = SelectDatabaseByHash(Settings(), dbInfos); + UNIT_ASSERT(!db); + UNIT_ASSERT_EQUAL(status, EStatus::NOT_FOUND); + } + + AddDatabase(dbInfos, 1, 0); + + { + auto [db, status] = SelectDatabaseByHash(Settings(), dbInfos); + UNIT_ASSERT(!db); + UNIT_ASSERT_EQUAL(status, EStatus::NOT_FOUND); + } + + AddDatabase(dbInfos, 2, 100); + + { + auto [db, status] = SelectDatabaseByHash(Settings(), dbInfos); + UNIT_ASSERT(db); + UNIT_ASSERT_EQUAL(db->id(), "2"); + UNIT_ASSERT_EQUAL(status, EStatus::SUCCESS); + } + } + + Y_UNIT_TEST(SelectDatabase) { + std::vector<std::shared_ptr<TDbInfo>> dbInfos; + for (int i = 1; i < 11; ++i) { + AddDatabase(dbInfos, i, 1000); + } + + using Settings = TFederatedWriteSessionSettings; + + { + /* + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | set | not found | - | any | NOT_FOUND | + */ + for (bool allow : {false, true}) { + auto settings = Settings().PreferredDatabase("db0").AllowFallback(allow); + auto [db, status] = SelectDatabase(settings, dbInfos, "dc1"); + UNIT_ASSERT(!db); + UNIT_ASSERT_EQUAL(status, EStatus::NOT_FOUND); + } + } + + { + /* + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | set | available | - | any | preferred | + */ + for (bool allow : {false, true}) { + auto settings = Settings().PreferredDatabase("db8").AllowFallback(allow); + auto [db, status] = SelectDatabase(settings, dbInfos, "dc1"); + UNIT_ASSERT(db); + UNIT_ASSERT_EQUAL(db->id(), "8"); + } + } + + { + /* + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | set | unavailable | - | false | UNAVAILABLE | + */ + DisableDatabase(dbInfos, 8); + auto settings = Settings().PreferredDatabase("db8").AllowFallback(false); + auto [db, status] = SelectDatabase(settings, dbInfos, "dc1"); + UNIT_ASSERT(!db); + UNIT_ASSERT_EQUAL(status, EStatus::UNAVAILABLE); + EnableDatabase(dbInfos, 8); + } + + { + /* + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | set | unavailable | - | true | by hash | + */ + DisableDatabase(dbInfos, 8); + auto settings = Settings().PreferredDatabase("db8").AllowFallback(true); + auto [db, status] = SelectDatabase(settings, dbInfos, "dc1"); + UNIT_ASSERT(db); + UNIT_ASSERT_UNEQUAL(db->id(), "8"); + UNIT_ASSERT_EQUAL(status, EStatus::SUCCESS); + EnableDatabase(dbInfos, 8); + } + + { + /* + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | unset | - | not found | false | NOT_FOUND | + */ + auto settings = Settings().AllowFallback(false); + auto [db, status] = SelectDatabase(settings, dbInfos, "dc0"); + UNIT_ASSERT(!db); + UNIT_ASSERT_EQUAL(status, EStatus::NOT_FOUND); + } + { + /* + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | unset | - | not found | true | by hash | + */ + auto settings = Settings().AllowFallback(true); + auto [db, status] = SelectDatabase(settings, dbInfos, "dc0"); + UNIT_ASSERT(db); + UNIT_ASSERT_EQUAL(status, EStatus::SUCCESS); + } + + { + /* + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | unset | - | available | any | local | + */ + for (bool allow : {false, true}) { + auto settings = Settings().AllowFallback(allow); + auto [db, status] = SelectDatabase(settings, dbInfos, "dc1"); + UNIT_ASSERT(db); + UNIT_ASSERT_EQUAL(db->id(), "1"); + } + } + + { + /* + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | unset | - | unavailable | false | UNAVAILABLE | + */ + DisableDatabase(dbInfos, 1); + auto settings = Settings().AllowFallback(false); + auto [db, status] = SelectDatabase(settings, dbInfos, "dc1"); + UNIT_ASSERT(!db); + UNIT_ASSERT_EQUAL(status, EStatus::UNAVAILABLE); + EnableDatabase(dbInfos, 1); + } + + { + /* + | PreferredDb | Preferred state | Local state | AllowFallback | Return | + |-------------+-----------------+-------------+---------------+-------------| + | unset | - | unavailable | true | by hash | + */ + DisableDatabase(dbInfos, 1); + auto settings = Settings().AllowFallback(true); + auto [db, status] = SelectDatabase(settings, dbInfos, "dc1"); + UNIT_ASSERT(db); + UNIT_ASSERT_UNEQUAL(db->id(), "1"); + UNIT_ASSERT_EQUAL(status, EStatus::SUCCESS); + EnableDatabase(dbInfos, 1); + } + } + } } diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h index 5e48d4c662d..01e40014fc7 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h @@ -46,6 +46,16 @@ public: return result; } + TManualRequest WaitNextPendingRequest() { + do { + auto result = GetNextPendingRequest(); + if (result.has_value()) { + return *result; + } + Sleep(TDuration::MilliSeconds(50)); + } while (true); + } + virtual grpc::Status ListFederationDatabases(grpc::ServerContext*, const TRequest* request, TResponse* response) override { |