aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-08-23 11:40:44 +0300
committerildar-khisam <ikhis@ydb.tech>2023-08-23 12:12:01 +0300
commit893b026c0f6fd89611c845720d4336ced0f683cd (patch)
tree257473ca8f1f45c9144c59ffc3d6cd52062821e3
parent188310f21191de3cc9fa5284ade4f2869d39470e (diff)
downloadydb-893b026c0f6fd89611c845720d4336ced0f683cd.tar.gz
fallback to single db
fallback to single db
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp50
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp30
2 files changed, 59 insertions, 21 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
index 2742a79d147..e335eebca32 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
@@ -126,31 +126,39 @@ void TFederatedDbObserver::OnFederationDiscovery(TStatus&& status, Ydb::Federati
return;
}
- if (!status.IsSuccess()) {
- // if UNIMPLEMENTED - fall back to single db mode:
- // - initialize FederatedDbState with original db + endpoint
- // - no more updates: no reschedules
+ if (status.GetStatus() == EStatus::CLIENT_CALL_UNIMPLEMENTED) {
+ // fall back to single db mode
+ FederatedDbState->Status = TPlainStatus{}; // SUCCESS
+ auto dbState = Connections_->GetDriverState(Nothing(),Nothing(),Nothing(),Nothing(),Nothing());
+ FederatedDbState->ControlPlaneEndpoint = dbState->DiscoveryEndpoint;
+ // FederatedDbState->SelfLocation = ???;
+ auto db = std::make_shared<Ydb::FederationDiscovery::DatabaseInfo>();
+ db->set_path(dbState->Database);
+ db->set_endpoint(dbState->DiscoveryEndpoint);
+ db->set_status(Ydb::FederationDiscovery::DatabaseInfo_Status_AVAILABLE);
+ db->set_weight(100);
+ FederatedDbState->DbInfos.emplace_back(std::move(db));
- // TODO
- // update counters errors
-
- if (!FederationDiscoveryRetryState) {
- FederationDiscoveryRetryState = FederationDiscoveryRetryPolicy->CreateRetryState();
- }
- TMaybe<TDuration> retryDelay = FederationDiscoveryRetryState->GetNextRetryDelay(status.GetStatus());
- if (retryDelay) {
- ScheduleFederationDiscoveryImpl(*retryDelay);
- return;
- }
} else {
- ScheduleFederationDiscoveryImpl(REDISCOVERY_DELAY);
- }
+ if (!status.IsSuccess()) {
+ if (!FederationDiscoveryRetryState) {
+ FederationDiscoveryRetryState = FederationDiscoveryRetryPolicy->CreateRetryState();
+ }
+ TMaybe<TDuration> retryDelay = FederationDiscoveryRetryState->GetNextRetryDelay(status.GetStatus());
+ if (retryDelay) {
+ ScheduleFederationDiscoveryImpl(*retryDelay);
+ return;
+ }
+ } else {
+ ScheduleFederationDiscoveryImpl(REDISCOVERY_DELAY);
+ }
- // TODO validate new state and check if differs from previous
+ // TODO validate new state and check if differs from previous
- auto newInfo = std::make_shared<TFederatedDbState>(std::move(result), std::move(status));
- // TODO update only if new state differs
- std::swap(FederatedDbState, newInfo);
+ auto newInfo = std::make_shared<TFederatedDbState>(std::move(result), std::move(status));
+ // TODO update only if new state differs
+ std::swap(FederatedDbState, newInfo);
+ }
}
if (!PromiseToInitState.HasValue()) {
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
index 743c09b06af..5c21c087915 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
@@ -390,6 +390,36 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
// UNIT_ASSERT(std::holds_alternative<NTopic::TSessionClosedEvent>(*event));
}
+ Y_UNIT_TEST(FallbackToSingleDb) {
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false);
+
+ setup->Start(true, true);
+
+ std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession;
+
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(setup->GetDriver());
+
+ // Create read session.
+ NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings;
+ readSettings
+ .ConsumerName("shared/user")
+ .MaxMemoryUsageBytes(1_MB)
+ .AppendTopics(setup->GetTestTopic());
+
+ ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ Cerr << "Session was created" << Endl;
+
+ ReadSession->WaitEvent().Wait(TDuration::Seconds(1));
+ TMaybe<NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false);
+ Y_ASSERT(event);
+ Cerr << "Got new read session event: " << DebugString(*event) << Endl;
+
+ auto* startPartitionSessionEvent = std::get_if<NYdb::NFederatedTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event);
+ Y_ASSERT(startPartitionSessionEvent);
+ startPartitionSessionEvent->Confirm();
+
+ ReadSession->Close(TDuration::MilliSeconds(10));
+ }
}