aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqyryq <qyryq@ydb.tech>2024-04-08 13:09:22 +0300
committerGitHub <noreply@github.com>2024-04-08 13:09:22 +0300
commit7ccec7095a02bb365cb4574a9de97ad068dbdcaf (patch)
tree46b4b3a640b0bf678d05ff098eda6c9e605208ff
parent9569967f0ee797557f309ca252c5d6968c678cbb (diff)
downloadydb-7ccec7095a02bb365cb4574a9de97ad068dbdcaf.tar.gz
TFederatedWriteSession: take into account AllowFallback setting; use retry policy (#3460)
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp109
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h11
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp261
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h10
4 files changed, 362 insertions, 29 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
index 3b969d4cc72..9907343612e 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp
@@ -129,50 +129,93 @@ void TFederatedWriteSessionImpl::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db)
CurrentDatabase = db;
}
-std::shared_ptr<TDbInfo> TFederatedWriteSessionImpl::SelectDatabaseImpl() {
- std::vector<std::shared_ptr<TDbInfo>> availableDatabases;
+std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHash(
+ NTopic::TFederatedWriteSessionSettings const& settings,
+ std::vector<std::shared_ptr<TDbInfo>> const& dbInfos
+) {
ui64 totalWeight = 0;
+ std::vector<std::shared_ptr<TDbInfo>> available;
- for (const auto& db : FederationState->DbInfos) {
- if (db->status() != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) {
- continue;
- }
-
- if (Settings.PreferredDatabase_ && (AsciiEqualsIgnoreCase(db->name(), *Settings.PreferredDatabase_) ||
- AsciiEqualsIgnoreCase(db->id(), *Settings.PreferredDatabase_))) {
- return db;
- } else if (AsciiEqualsIgnoreCase(FederationState->SelfLocation, db->location())) {
- return db;
- } else {
- availableDatabases.push_back(db);
+ for (const auto& db : dbInfos) {
+ if (db->status() == TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) {
+ available.push_back(db);
totalWeight += db->weight();
}
}
- if (availableDatabases.empty() || totalWeight == 0) {
- // close session, return error
- return nullptr;
+ if (available.empty() || totalWeight == 0) {
+ return {nullptr, EStatus::NOT_FOUND};
}
- std::sort(availableDatabases.begin(), availableDatabases.end(), [](const std::shared_ptr<TDbInfo>& lhs, const std::shared_ptr<TDbInfo>& rhs){
- return lhs->weight() > rhs->weight()
- || lhs->weight() == rhs->weight() && lhs->name() < rhs->name();
- });
+ std::sort(available.begin(), available.end(), [](auto const& lhs, auto const& rhs) { return lhs->name() < rhs->name(); });
- ui64 hashValue = THash<TString>()(Settings.Path_);
- hashValue = CombineHashes(hashValue, THash<TString>()(Settings.ProducerId_));
+ ui64 hashValue = THash<TString>()(settings.Path_);
+ hashValue = CombineHashes(hashValue, THash<TString>()(settings.ProducerId_));
hashValue %= totalWeight;
ui64 borderWeight = 0;
- for (const auto& db : availableDatabases) {
+ for (auto const& db : available) {
borderWeight += db->weight();
if (hashValue < borderWeight) {
- return db;
+ return {db, EStatus::SUCCESS};
}
}
Y_UNREACHABLE();
}
+std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabase(
+ NTopic::TFederatedWriteSessionSettings const& settings,
+ std::vector<std::shared_ptr<TDbInfo>> const& dbInfos, TString const& selfLocation
+) {
+ /* Logic of the function should follow this table:
+
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | set | not found | - | any | NOT_FOUND |
+ | set | available | - | any | preferred |
+ | set | unavailable | - | false | UNAVAILABLE |
+ | set | unavailable | - | true | by hash |
+ | unset | - | not found | false | NOT_FOUND |
+ | unset | - | not found | true | by hash |
+ | unset | - | available | any | local |
+ | unset | - | unavailable | false | UNAVAILABLE |
+ | unset | - | unavailable | true | by hash |
+ */
+
+ decltype(begin(dbInfos)) it;
+ if (settings.PreferredDatabase_) {
+ it = std::find_if(begin(dbInfos), end(dbInfos), [&preferred = settings.PreferredDatabase_](auto const& db) {
+ return AsciiEqualsIgnoreCase(*preferred, db->name()) || AsciiEqualsIgnoreCase(*preferred, db->id());
+ });
+ if (it == end(dbInfos)) {
+ return {nullptr, EStatus::NOT_FOUND};
+ }
+ } else {
+ it = std::find_if(begin(dbInfos), end(dbInfos), [&selfLocation](auto const& db) {
+ return AsciiEqualsIgnoreCase(selfLocation, db->location());
+ });
+ if (it == end(dbInfos)) {
+ if (!settings.AllowFallback_) {
+ return {nullptr, EStatus::NOT_FOUND};
+ }
+ return SelectDatabaseByHash(settings, dbInfos);
+ }
+ }
+
+ auto db = *it;
+ if (db->status() == TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) {
+ return {db, EStatus::SUCCESS};
+ }
+ if (!settings.AllowFallback_) {
+ return {nullptr, EStatus::UNAVAILABLE};
+ }
+ return SelectDatabaseByHash(settings, dbInfos);
+}
+
+std::pair<std::shared_ptr<TDbInfo>, EStatus> TFederatedWriteSessionImpl::SelectDatabaseImpl() {
+ return SelectDatabase(Settings, FederationState->DbInfos, FederationState->SelfLocation);
+}
+
void TFederatedWriteSessionImpl::OnFederatedStateUpdateImpl() {
if (!FederationState->Status.IsSuccess()) {
CloseImpl(FederationState->Status.GetStatus(), NYql::TIssues(FederationState->Status.GetIssues()));
@@ -181,13 +224,23 @@ void TFederatedWriteSessionImpl::OnFederatedStateUpdateImpl() {
Y_ABORT_UNLESS(!FederationState->DbInfos.empty());
- auto preferrableDb = SelectDatabaseImpl();
+ auto [preferrableDb, status] = SelectDatabaseImpl();
if (!preferrableDb) {
- CloseImpl(EStatus::UNAVAILABLE,
- NYql::TIssues{NYql::TIssue("Fail to select database: no available database with positive weight")});
+ if (!RetryState) {
+ RetryState = Settings.RetryPolicy_->CreateRetryState();
+ }
+ if (auto delay = RetryState->GetNextRetryDelay(status)) {
+ LOG_LAZY(Log, TLOG_NOTICE, GetLogPrefix() << "Retry to update federation state in " << delay);
+ ScheduleFederatedStateUpdateImpl(*delay);
+ } else {
+ TString message = "Failed to select database: no available database";
+ LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << message);
+ CloseImpl(status, NYql::TIssues{NYql::TIssue(message)});
+ }
return;
}
+ RetryState.reset();
if (!DatabasesAreSame(preferrableDb, CurrentDatabase)) {
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix()
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h
index 52110c2496d..68ba96e6d9c 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h
@@ -10,6 +10,14 @@
namespace NYdb::NFederatedTopic {
+std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHash(
+ NTopic::TFederatedWriteSessionSettings const& settings,
+ std::vector<std::shared_ptr<TDbInfo>> const& dbInfos);
+
+std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabase(
+ NTopic::TFederatedWriteSessionSettings const& settings,
+ std::vector<std::shared_ptr<TDbInfo>> const& dbInfos, TString const& selfLocation);
+
class TFederatedWriteSessionImpl : public NTopic::TContinuationTokenIssuer,
public NTopic::TEnableSelfContext<TFederatedWriteSessionImpl> {
friend class TFederatedWriteSession;
@@ -78,7 +86,7 @@ private:
void Start();
void OpenSubSessionImpl(std::shared_ptr<TDbInfo> db);
- std::shared_ptr<TDbInfo> SelectDatabaseImpl();
+ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseImpl();
void OnFederatedStateUpdateImpl();
void ScheduleFederatedStateUpdateImpl(TDuration delay);
@@ -98,6 +106,7 @@ private:
const NTopic::TTopicClientSettings SubClientSetttings;
std::shared_ptr<std::unordered_map<NTopic::ECodec, THolder<NTopic::ICodec>>> ProvidedCodecs;
+ NTopic::IRetryPolicy::IRetryState::TPtr RetryState;
std::shared_ptr<TFederatedDbObserver> Observer;
NThreading::TFuture<void> AsyncInit;
std::shared_ptr<TFederatedDbState> FederationState;
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
index a3bffe452ae..eeee33da1b7 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
@@ -1,4 +1,5 @@
#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/managed_executor.h>
@@ -835,6 +836,266 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
WriteSession->Close(TDuration::MilliSeconds(10));
}
+ Y_UNIT_TEST(PreferredDatabaseNoFallback) {
+ // The test checks that the session keeps trying to connect to the preferred database
+ // and does not fall back to other databases.
+
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(
+ TEST_CASE_NAME, false, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, 2);
+
+ setup->Start(true, true);
+
+ TFederationDiscoveryServiceMock fdsMock;
+ fdsMock.Port = setup->GetGrpcPort();
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285);
+ auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock);
+
+ NYdb::TDriverConfig cfg;
+ cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort);
+ cfg.SetDatabase("/Root");
+ cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriver driver(cfg);
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver);
+
+ auto retryPolicy = std::make_shared<NPersQueue::NTests::TYdbPqTestRetryPolicy>();
+
+ auto writeSettings = TFederatedWriteSessionSettings()
+ .AllowFallback(false)
+ .PreferredDatabase("dc2");
+
+ writeSettings
+ .RetryPolicy(retryPolicy)
+ .DirectWriteToPartition(false)
+ .Path(setup->GetTestTopic())
+ .MessageGroupId("src_id");
+
+ retryPolicy->Initialize();
+ retryPolicy->ExpectBreakDown();
+
+ auto writer = topicClient.CreateWriteSession(writeSettings);
+
+ Ydb::FederationDiscovery::ListFederationDatabasesResponse response;
+ auto op = response.mutable_operation();
+ op->set_status(Ydb::StatusIds::SUCCESS);
+ response.mutable_operation()->set_ready(true);
+ response.mutable_operation()->set_id("12345");
+ Ydb::FederationDiscovery::ListFederationDatabasesResult mockResult;
+ mockResult.set_control_plane_endpoint("cp.logbroker-federation:2135");
+ mockResult.set_self_location("fancy_datacenter");
+ auto c1 = mockResult.add_federation_databases();
+ c1->set_name("dc1");
+ c1->set_path("/Root");
+ c1->set_id("account-dc1");
+ c1->set_endpoint("localhost:" + ToString(fdsMock.Port));
+ c1->set_location("dc1");
+ c1->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE);
+ c1->set_weight(1000);
+ auto c2 = mockResult.add_federation_databases();
+ c2->set_name("dc2");
+ c2->set_path("/Root");
+ c2->set_id("account-dc2");
+ c2->set_endpoint("localhost:" + ToString(fdsMock.Port));
+ c2->set_location("dc2");
+ c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_UNAVAILABLE);
+ c2->set_weight(500);
+ op->mutable_result()->PackFrom(mockResult);
+ auto fdsRequest = fdsMock.WaitNextPendingRequest();
+ fdsRequest.Result.SetValue({std::move(response), grpc::Status::OK});
+
+ Cerr << "=== Session was created, waiting for retries" << Endl;
+ retryPolicy->WaitForRetriesSync(3);
+
+ Cerr << "=== In the next federation discovery response dc2 will be available" << Endl;
+ // fdsMock.PreparedResponse.Clear();
+ // std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest;
+ fdsRequest = fdsMock.WaitNextPendingRequest();
+ fdsRequest.Result.SetValue(fdsMock.ComposeOkResult());
+
+ Cerr << "=== Waiting for repair" << Endl;
+ retryPolicy->WaitForRepairSync();
+
+ Cerr << "=== Closing the session" << Endl;
+ writer->Close(TDuration::MilliSeconds(10));
+ }
+
+ void AddDatabase(std::vector<std::shared_ptr<TDbInfo>>& dbInfos, int id, int weight) {
+ auto db = std::make_shared<TDbInfo>();
+ db->set_id(ToString(id));
+ db->set_name("db" + ToString(id));
+ db->set_location("dc" + ToString(id));
+ db->set_weight(weight);
+ db->set_status(Ydb::FederationDiscovery::DatabaseInfo_Status_AVAILABLE);
+ dbInfos.push_back(db);
+ }
+
+ void EnableDatabase(std::vector<std::shared_ptr<TDbInfo>>& dbInfos, int id) {
+ dbInfos[id - 1]->set_status(Ydb::FederationDiscovery::DatabaseInfo_Status_AVAILABLE);
+ }
+
+ void DisableDatabase(std::vector<std::shared_ptr<TDbInfo>>& dbInfos, int id) {
+ dbInfos[id - 1]->set_status(Ydb::FederationDiscovery::DatabaseInfo_Status_UNAVAILABLE);
+ }
+
+ Y_UNIT_TEST(SelectDatabaseByHash) {
+ std::vector<std::shared_ptr<TDbInfo>> dbInfos;
+ using Settings = TFederatedWriteSessionSettings;
+
+ {
+ auto [db, status] = SelectDatabaseByHash(Settings(), dbInfos);
+ UNIT_ASSERT(!db);
+ UNIT_ASSERT_EQUAL(status, EStatus::NOT_FOUND);
+ }
+
+ AddDatabase(dbInfos, 1, 0);
+
+ {
+ auto [db, status] = SelectDatabaseByHash(Settings(), dbInfos);
+ UNIT_ASSERT(!db);
+ UNIT_ASSERT_EQUAL(status, EStatus::NOT_FOUND);
+ }
+
+ AddDatabase(dbInfos, 2, 100);
+
+ {
+ auto [db, status] = SelectDatabaseByHash(Settings(), dbInfos);
+ UNIT_ASSERT(db);
+ UNIT_ASSERT_EQUAL(db->id(), "2");
+ UNIT_ASSERT_EQUAL(status, EStatus::SUCCESS);
+ }
+ }
+
+ Y_UNIT_TEST(SelectDatabase) {
+ std::vector<std::shared_ptr<TDbInfo>> dbInfos;
+ for (int i = 1; i < 11; ++i) {
+ AddDatabase(dbInfos, i, 1000);
+ }
+
+ using Settings = TFederatedWriteSessionSettings;
+
+ {
+ /*
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | set | not found | - | any | NOT_FOUND |
+ */
+ for (bool allow : {false, true}) {
+ auto settings = Settings().PreferredDatabase("db0").AllowFallback(allow);
+ auto [db, status] = SelectDatabase(settings, dbInfos, "dc1");
+ UNIT_ASSERT(!db);
+ UNIT_ASSERT_EQUAL(status, EStatus::NOT_FOUND);
+ }
+ }
+
+ {
+ /*
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | set | available | - | any | preferred |
+ */
+ for (bool allow : {false, true}) {
+ auto settings = Settings().PreferredDatabase("db8").AllowFallback(allow);
+ auto [db, status] = SelectDatabase(settings, dbInfos, "dc1");
+ UNIT_ASSERT(db);
+ UNIT_ASSERT_EQUAL(db->id(), "8");
+ }
+ }
+
+ {
+ /*
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | set | unavailable | - | false | UNAVAILABLE |
+ */
+ DisableDatabase(dbInfos, 8);
+ auto settings = Settings().PreferredDatabase("db8").AllowFallback(false);
+ auto [db, status] = SelectDatabase(settings, dbInfos, "dc1");
+ UNIT_ASSERT(!db);
+ UNIT_ASSERT_EQUAL(status, EStatus::UNAVAILABLE);
+ EnableDatabase(dbInfos, 8);
+ }
+
+ {
+ /*
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | set | unavailable | - | true | by hash |
+ */
+ DisableDatabase(dbInfos, 8);
+ auto settings = Settings().PreferredDatabase("db8").AllowFallback(true);
+ auto [db, status] = SelectDatabase(settings, dbInfos, "dc1");
+ UNIT_ASSERT(db);
+ UNIT_ASSERT_UNEQUAL(db->id(), "8");
+ UNIT_ASSERT_EQUAL(status, EStatus::SUCCESS);
+ EnableDatabase(dbInfos, 8);
+ }
+
+ {
+ /*
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | unset | - | not found | false | NOT_FOUND |
+ */
+ auto settings = Settings().AllowFallback(false);
+ auto [db, status] = SelectDatabase(settings, dbInfos, "dc0");
+ UNIT_ASSERT(!db);
+ UNIT_ASSERT_EQUAL(status, EStatus::NOT_FOUND);
+ }
+ {
+ /*
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | unset | - | not found | true | by hash |
+ */
+ auto settings = Settings().AllowFallback(true);
+ auto [db, status] = SelectDatabase(settings, dbInfos, "dc0");
+ UNIT_ASSERT(db);
+ UNIT_ASSERT_EQUAL(status, EStatus::SUCCESS);
+ }
+
+ {
+ /*
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | unset | - | available | any | local |
+ */
+ for (bool allow : {false, true}) {
+ auto settings = Settings().AllowFallback(allow);
+ auto [db, status] = SelectDatabase(settings, dbInfos, "dc1");
+ UNIT_ASSERT(db);
+ UNIT_ASSERT_EQUAL(db->id(), "1");
+ }
+ }
+
+ {
+ /*
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | unset | - | unavailable | false | UNAVAILABLE |
+ */
+ DisableDatabase(dbInfos, 1);
+ auto settings = Settings().AllowFallback(false);
+ auto [db, status] = SelectDatabase(settings, dbInfos, "dc1");
+ UNIT_ASSERT(!db);
+ UNIT_ASSERT_EQUAL(status, EStatus::UNAVAILABLE);
+ EnableDatabase(dbInfos, 1);
+ }
+
+ {
+ /*
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
+ |-------------+-----------------+-------------+---------------+-------------|
+ | unset | - | unavailable | true | by hash |
+ */
+ DisableDatabase(dbInfos, 1);
+ auto settings = Settings().AllowFallback(true);
+ auto [db, status] = SelectDatabase(settings, dbInfos, "dc1");
+ UNIT_ASSERT(db);
+ UNIT_ASSERT_UNEQUAL(db->id(), "1");
+ UNIT_ASSERT_EQUAL(status, EStatus::SUCCESS);
+ EnableDatabase(dbInfos, 1);
+ }
+ }
+
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h
index 5e48d4c662d..01e40014fc7 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h
@@ -46,6 +46,16 @@ public:
return result;
}
+ TManualRequest WaitNextPendingRequest() {
+ do {
+ auto result = GetNextPendingRequest();
+ if (result.has_value()) {
+ return *result;
+ }
+ Sleep(TDuration::MilliSeconds(50));
+ } while (true);
+ }
+
virtual grpc::Status ListFederationDatabases(grpc::ServerContext*,
const TRequest* request,
TResponse* response) override {