diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-08-23 11:40:44 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-08-23 12:12:01 +0300 |
commit | 893b026c0f6fd89611c845720d4336ced0f683cd (patch) | |
tree | 257473ca8f1f45c9144c59ffc3d6cd52062821e3 | |
parent | 188310f21191de3cc9fa5284ade4f2869d39470e (diff) | |
download | ydb-893b026c0f6fd89611c845720d4336ced0f683cd.tar.gz |
fallback to single db
fallback to single db
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp | 50 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp | 30 |
2 files changed, 59 insertions, 21 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp index 2742a79d147..e335eebca32 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp @@ -126,31 +126,39 @@ void TFederatedDbObserver::OnFederationDiscovery(TStatus&& status, Ydb::Federati return; } - if (!status.IsSuccess()) { - // if UNIMPLEMENTED - fall back to single db mode: - // - initialize FederatedDbState with original db + endpoint - // - no more updates: no reschedules + if (status.GetStatus() == EStatus::CLIENT_CALL_UNIMPLEMENTED) { + // fall back to single db mode + FederatedDbState->Status = TPlainStatus{}; // SUCCESS + auto dbState = Connections_->GetDriverState(Nothing(),Nothing(),Nothing(),Nothing(),Nothing()); + FederatedDbState->ControlPlaneEndpoint = dbState->DiscoveryEndpoint; + // FederatedDbState->SelfLocation = ???; + auto db = std::make_shared<Ydb::FederationDiscovery::DatabaseInfo>(); + db->set_path(dbState->Database); + db->set_endpoint(dbState->DiscoveryEndpoint); + db->set_status(Ydb::FederationDiscovery::DatabaseInfo_Status_AVAILABLE); + db->set_weight(100); + FederatedDbState->DbInfos.emplace_back(std::move(db)); - // TODO - // update counters errors - - if (!FederationDiscoveryRetryState) { - FederationDiscoveryRetryState = FederationDiscoveryRetryPolicy->CreateRetryState(); - } - TMaybe<TDuration> retryDelay = FederationDiscoveryRetryState->GetNextRetryDelay(status.GetStatus()); - if (retryDelay) { - ScheduleFederationDiscoveryImpl(*retryDelay); - return; - } } else { - ScheduleFederationDiscoveryImpl(REDISCOVERY_DELAY); - } + if (!status.IsSuccess()) { + if (!FederationDiscoveryRetryState) { + FederationDiscoveryRetryState = FederationDiscoveryRetryPolicy->CreateRetryState(); + } + TMaybe<TDuration> retryDelay = FederationDiscoveryRetryState->GetNextRetryDelay(status.GetStatus()); + if (retryDelay) { + ScheduleFederationDiscoveryImpl(*retryDelay); + return; + } + } else { + ScheduleFederationDiscoveryImpl(REDISCOVERY_DELAY); + } - // TODO validate new state and check if differs from previous + // TODO validate new state and check if differs from previous - auto newInfo = std::make_shared<TFederatedDbState>(std::move(result), std::move(status)); - // TODO update only if new state differs - std::swap(FederatedDbState, newInfo); + auto newInfo = std::make_shared<TFederatedDbState>(std::move(result), std::move(status)); + // TODO update only if new state differs + std::swap(FederatedDbState, newInfo); + } } if (!PromiseToInitState.HasValue()) { diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp index 743c09b06af..5c21c087915 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp @@ -390,6 +390,36 @@ Y_UNIT_TEST_SUITE(BasicUsage) { // UNIT_ASSERT(std::holds_alternative<NTopic::TSessionClosedEvent>(*event)); } + Y_UNIT_TEST(FallbackToSingleDb) { + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false); + + setup->Start(true, true); + + std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession; + + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(setup->GetDriver()); + + // Create read session. + NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings; + readSettings + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(setup->GetTestTopic()); + + ReadSession = topicClient.CreateFederatedReadSession(readSettings); + Cerr << "Session was created" << Endl; + + ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); + TMaybe<NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false); + Y_ASSERT(event); + Cerr << "Got new read session event: " << DebugString(*event) << Endl; + + auto* startPartitionSessionEvent = std::get_if<NYdb::NFederatedTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event); + Y_ASSERT(startPartitionSessionEvent); + startPartitionSessionEvent->Confirm(); + + ReadSession->Close(TDuration::MilliSeconds(10)); + } } |