diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-09-29 12:41:03 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-09-29 13:27:05 +0300 |
commit | 2e71acf92429671421fed0652526110f2dec0629 (patch) | |
tree | 0d4823ee6e0469aef562a374ab64bdacc3d173a6 | |
parent | d1b6c34393585eae85b13c71f4ca6125ad908ddd (diff) | |
download | ydb-2e71acf92429671421fed0652526110f2dec0629.tar.gz |
fed sdk add handlers
work in progress
6 files changed, 287 insertions, 27 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h index 299210ea6c..d810eed628 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -9,6 +9,8 @@ namespace NYdb::NFederatedTopic { using NTopic::TPrintable; using TDbInfo = Ydb::FederationDiscovery::DatabaseInfo; +using TSessionClosedEvent = NTopic::TSessionClosedEvent; + //! Federated partition session. struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFederatedPartitionSession> { using TPtr = TIntrusivePtr<TFederatedPartitionSession>; @@ -113,7 +115,7 @@ struct TReadSessionEvent { NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db); const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override { - ythrow yexception() << "GetPartitionSession() method unavailable for federated objects, use GetFederatedPartitionSession() instead"; + ythrow yexception() << "GetPartitionSession method unavailable for federated objects, use GetFederatedPartitionSession instead"; } bool HasCompressedMessages() const { @@ -172,9 +174,16 @@ struct TReadSessionEvent { TStopPartitionSessionEvent, TPartitionSessionStatusEvent, TPartitionSessionClosedEvent, - NTopic::TSessionClosedEvent>; + TSessionClosedEvent>; }; +template <typename TEvent> +TReadSessionEvent::TFederated<TEvent> Federate(TEvent event, std::shared_ptr<TDbInfo> db) { + return {std::move(event), std::move(db)}; +} + +TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db); + TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db); //! Set of offsets to commit. @@ -241,6 +250,115 @@ struct TFederatedWriteSessionSettings : public NTopic::TWriteSessionSettings { struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { using TSelf = TFederatedReadSessionSettings; + NTopic::TReadSessionSettings& EventHandlers(const TEventHandlers&) { + ythrow yexception() << "EventHandlers can not be set for federated session, use FederatedEventHandlers instead"; + } + + // Each handler, if set, is wrapped up and passed down to each subsession + struct TFederatedEventHandlers { + using TSelf = TFederatedEventHandlers; + + struct TSimpleDataHandlers { + std::function<void(TReadSessionEvent::TDataReceivedEvent&)> DataHandler; + bool CommitDataAfterProcessing; + bool GracefulStopAfterCommit; + }; + + + //! Set simple handler with data processing and also + //! set other handlers with default behaviour. + //! They automatically commit data after processing + //! and confirm partition session events. + //! + //! Sets the following handlers: + //! DataReceivedHandler: sets DataReceivedHandler to handler that calls dataHandler and (if + //! commitDataAfterProcessing is set) then calls Commit(). CommitAcknowledgementHandler to handler that does + //! nothing. CreatePartitionSessionHandler to handler that confirms event. StopPartitionSessionHandler to + //! handler that confirms event. PartitionSessionStatusHandler to handler that does nothing. + //! PartitionSessionClosedHandler to handler that does nothing. + //! + //! dataHandler: handler of data event. + //! commitDataAfterProcessing: automatically commit data after calling of dataHandler. + //! gracefulReleaseAfterCommit: wait for commit acknowledgements for all inflight data before confirming + //! partition session destroy. + + TSimpleDataHandlers SimpleDataHandlers_; + + TSelf& SimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler, + bool commitDataAfterProcessing = false, bool gracefulStopAfterCommit = true) { + SimpleDataHandlers_.DataHandler = std::move(dataHandler); + SimpleDataHandlers_.CommitDataAfterProcessing = commitDataAfterProcessing; + SimpleDataHandlers_.GracefulStopAfterCommit = gracefulStopAfterCommit; + return static_cast<TSelf&>(*this); + } + + //! Data size limit for the DataReceivedHandler handler. + //! The data size may exceed this limit. + FLUENT_SETTING_DEFAULT(size_t, MaxMessagesBytes, Max<size_t>()); + + //! Function to handle data events. + //! If this handler is set, data events will be handled by handler, + //! otherwise sent to TReadSession::GetEvent(). + //! Default value is empty function (not set). + FLUENT_SETTING(std::function<void(TReadSessionEvent::TDataReceivedEvent&)>, DataReceivedHandler); + + //! Function to handle commit ack events. + //! If this handler is set, commit ack events will be handled by handler, + //! otherwise sent to TReadSession::GetEvent(). + //! Default value is empty function (not set). + FLUENT_SETTING(std::function<void(TReadSessionEvent::TCommitOffsetAcknowledgementEvent&)>, + CommitOffsetAcknowledgementHandler); + + //! Function to handle start partition session events. + //! If this handler is set, create partition session events will be handled by handler, + //! otherwise sent to TReadSession::GetEvent(). + //! Default value is empty function (not set). + FLUENT_SETTING(std::function<void(TReadSessionEvent::TStartPartitionSessionEvent&)>, + StartPartitionSessionHandler); + + //! Function to handle stop partition session events. + //! If this handler is set, destroy partition session events will be handled by handler, + //! otherwise sent to TReadSession::GetEvent(). + //! Default value is empty function (not set). + FLUENT_SETTING(std::function<void(TReadSessionEvent::TStopPartitionSessionEvent&)>, + StopPartitionSessionHandler); + + //! Function to handle partition session status events. + //! If this handler is set, partition session status events will be handled by handler, + //! otherwise sent to TReadSession::GetEvent(). + //! Default value is empty function (not set). + FLUENT_SETTING(std::function<void(TReadSessionEvent::TPartitionSessionStatusEvent&)>, + PartitionSessionStatusHandler); + + //! Function to handle partition session closed events. + //! If this handler is set, partition session closed events will be handled by handler, + //! otherwise sent to TReadSession::GetEvent(). + //! Default value is empty function (not set). + FLUENT_SETTING(std::function<void(TReadSessionEvent::TPartitionSessionClosedEvent&)>, + PartitionSessionClosedHandler); + + //! Function to handle session closed events. + //! If this handler is set, close session events will be handled by handler + //! and then sent to TReadSession::GetEvent(). + //! Default value is empty function (not set). + FLUENT_SETTING(NTopic::TSessionClosedHandler, SessionClosedHandler); + + //! Function to handle all event types. + //! If event with current type has no handler for this type of event, + //! this handler (if specified) will be used. + //! If this handler is not specified, event can be received with TReadSession::GetEvent() method. + FLUENT_SETTING(std::function<void(TReadSessionEvent::TEvent&)>, CommonHandler); + + //! Executor for handlers. + //! If not set, default single threaded executor will be used. + //! Shared between subsessions + FLUENT_SETTING(NTopic::IExecutor::TPtr, HandlersExecutor); + }; + + //! Federated event handlers. + //! See description in TFederatedEventHandlers class. + FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers); + enum class EReadPolicy { READ_ALL = 0, READ_ORIGINAL, diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp index e507617ec8..7c12e6bb2b 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp @@ -10,6 +10,50 @@ namespace NYdb::NFederatedTopic { NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings); +template <typename TEvent, typename TFederatedEvent> +typename std::function<void(TEvent&)> WrapFederatedHandler(std::function<void(TFederatedEvent&)> outerHandler, std::shared_ptr<TDbInfo> db) { + if (outerHandler) { + return [outerHandler, db = std::move(db)](TEvent& ev) { + auto fev = Federate(std::move(ev), db); + return outerHandler(fev); + }; + } + return {}; +} + +NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& settings, const std::shared_ptr<TDbInfo>& db) { + NTopic::TReadSessionSettings SubsessionSettings = settings; + SubsessionSettings.EventHandlers_.MaxMessagesBytes(settings.EventHandlers_.MaxMessagesBytes_); + SubsessionSettings.EventHandlers_.HandlersExecutor(settings.EventHandlers_.HandlersExecutor_); + +#define MAYBE_CONVERT_HANDLER(type, name) \ + SubsessionSettings.EventHandlers_.name( \ + WrapFederatedHandler<NTopic::type, type>(settings.FederatedEventHandlers_.name##_, db) \ + ); + + MAYBE_CONVERT_HANDLER(TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler); + MAYBE_CONVERT_HANDLER(TReadSessionEvent::TCommitOffsetAcknowledgementEvent, CommitOffsetAcknowledgementHandler); + MAYBE_CONVERT_HANDLER(TReadSessionEvent::TStartPartitionSessionEvent, StartPartitionSessionHandler); + MAYBE_CONVERT_HANDLER(TReadSessionEvent::TStopPartitionSessionEvent, StopPartitionSessionHandler); + MAYBE_CONVERT_HANDLER(TReadSessionEvent::TPartitionSessionStatusEvent, PartitionSessionStatusHandler); + MAYBE_CONVERT_HANDLER(TReadSessionEvent::TPartitionSessionClosedEvent, PartitionSessionClosedHandler); + MAYBE_CONVERT_HANDLER(TReadSessionEvent::TEvent, CommonHandler); + +#undef MAYBE_CONVERT_HANDLER + + SubsessionSettings.EventHandlers_.SessionClosedHandler(settings.FederatedEventHandlers_.SessionClosedHandler_); + + if (settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler) { + SubsessionSettings.EventHandlers_.SimpleDataHandlers( + WrapFederatedHandler<NTopic::TReadSessionEvent::TDataReceivedEvent, TReadSessionEvent::TDataReceivedEvent>( + settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db), + settings.FederatedEventHandlers_.SimpleDataHandlers_.CommitDataAfterProcessing, + settings.FederatedEventHandlers_.SimpleDataHandlers_.GracefulStopAfterCommit); + } + + return SubsessionSettings; +} + TFederatedReadSession::TFederatedReadSession(const TFederatedReadSessionSettings& settings, std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& clientSetttings, @@ -42,7 +86,7 @@ void TFederatedReadSession::OpenSubSessionsImpl() { .Database(db->path()) .DiscoveryEndpoint(db->endpoint()); auto subclient = make_shared<NTopic::TTopicClient::TImpl>(Connections, settings); - auto subsession = subclient->CreateReadSession(Settings); + auto subsession = subclient->CreateReadSession(FromFederated(Settings, db)); SubSessions.emplace_back(subsession, db); } SubsessionIndex = 0; @@ -98,9 +142,8 @@ TVector<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvents(bool block, with_lock(Lock) { do { auto sub = SubSessions[SubsessionIndex]; - // TODO remove copy for (auto&& ev : sub.Session->GetEvents(false, maxEventsCount, maxByteSize)) { - result.push_back(Federate(ev, sub.DbInfo)); + result.push_back(Federate(std::move(ev), sub.DbInfo)); } SubsessionIndex = (SubsessionIndex + 1) % SubSessions.size(); } diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h index 040e5117c5..a9ffe4c703 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -66,8 +66,9 @@ private: // void ScheduleDumpCountersToLog(size_t timeNumber = 0); private: + TFederatedReadSessionSettings Settings; + // For subsessions creation - const NTopic::TReadSessionSettings Settings; std::shared_ptr<TGRpcConnectionsImpl> Connections; const NTopic::TTopicClientSettings SubClientSetttings; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp index 0b6ab25113..543bb293c3 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp @@ -1,6 +1,8 @@ #include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> +#include <optional> + namespace NYdb::NFederatedTopic { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -15,24 +17,23 @@ std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceiv return {msg.GetOffset(), msg.GetOffset() + 1}; } +TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db) { + return {std::move(event), std::move(db)}; +} + TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db) { - if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) { - return TReadSessionEvent::TDataReceivedEvent(std::move(*ev), db); - } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) { - return TReadSessionEvent::TFederated(*ev, db); - } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) { - return TReadSessionEvent::TFederated(*ev, db); - } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) { - return TReadSessionEvent::TFederated(*ev, db); - } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) { - return TReadSessionEvent::TFederated(*ev, db); - } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) { - return TReadSessionEvent::TFederated(*ev, db); - } else if (auto* ev = std::get_if<NTopic::TSessionClosedEvent>(&event)) { + return std::visit([db = std::move(db)](auto&& arg) { + using T = std::decay_t<decltype(arg)>; + std::optional<TReadSessionEvent::TEvent> ev; + if constexpr (std::is_same_v<T, NTopic::TReadSessionEvent::TDataReceivedEvent>) { + ev = TReadSessionEvent::TDataReceivedEvent(std::move(arg), std::move(db)); + } else if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) { + ev = std::move(arg); + } else { + ev = TReadSessionEvent::TFederated(std::move(arg), std::move(db)); + } return *ev; - } else { - Y_UNREACHABLE(); - } + }, event); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp index 88a8279311..7252f7f8ae 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp @@ -421,6 +421,103 @@ Y_UNIT_TEST_SUITE(BasicUsage) { ReadSession->Close(TDuration::MilliSeconds(10)); } + Y_UNIT_TEST(SimpleHandlers) { + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false); + setup->Start(true, true); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + + ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession; + + // Create topic client. + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + auto clientSettings = TFederatedTopicClientSettings() + .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy( + TDuration::Seconds(10), + TDuration::Seconds(10) + )); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings); + + ui64 count = 300u; + + TString messageBase = "message----"; + TVector<TString> sentMessages; + + for (auto i = 0u; i < count; i++) { + // sentMessages.emplace_back(messageBase * (i+1) + ToString(i)); + sentMessages.emplace_back(messageBase * (10 * i + 1)); + } + + NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>(); + auto totalReceived = 0u; + + auto f = checkedPromise.GetFuture(); + TAtomic check = 1; + + // Create read session. + NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings; + readSettings + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(setup->GetTestTopic()); + + readSettings.FederatedEventHandlers_.SimpleDataHandlers([&](TReadSessionEvent::TDataReceivedEvent& ev) mutable { + Cerr << ">>> event from dataHandler: " << DebugString(ev) << Endl; + Y_VERIFY_S(AtomicGet(check) != 0, "check is false"); + auto& messages = ev.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + UNIT_ASSERT_VALUES_EQUAL(message.GetData(), sentMessages[totalReceived]); + totalReceived++; + } + if (totalReceived == sentMessages.size()) + checkedPromise.SetValue(); + }); + + ReadSession = topicClient.CreateFederatedReadSession(readSettings); + Cerr << ">>> Session was created" << Endl; + + Sleep(TDuration::MilliSeconds(50)); + + auto events = ReadSession->GetEvents(false); + UNIT_ASSERT(events.empty()); + + std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest; + do { + fdsRequest = fdsMock.GetNextPendingRequest(); + if (!fdsRequest.has_value()) { + Sleep(TDuration::MilliSeconds(50)); + } + } while (!fdsRequest.has_value()); + + fdsRequest->Result.SetValue(fdsMock.ComposeOkResult()); + + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + + for (auto i = 0u; i < count; i++) { + auto res = session->Write(sentMessages[i]); + UNIT_ASSERT(res); + } + + f.GetValueSync(); + ReadSession->Close(); + AtomicSet(check, 0); + } } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index 0fecce9af0..f5a16b1514 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -135,7 +135,7 @@ public: i32 GetNodeId() const; i64 GetGeneration() const; -private: +private: // Node identificator. i32 NodeId_ = 1; @@ -1403,14 +1403,14 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> { //! Sets the following handlers: //! DataReceivedHandler: sets DataReceivedHandler to handler that calls dataHandler and (if //! commitDataAfterProcessing is set) then calls Commit(). CommitAcknowledgementHandler to handler that does - //! nothing. CreatePartitionSessionHandler to handler that confirms event. StopPartitionSessionHandler to + //! nothing. StartPartitionSessionHandler to handler that confirms event. StopPartitionSessionHandler to //! handler that confirms event. PartitionSessionStatusHandler to handler that does nothing. //! PartitionSessionClosedHandler to handler that does nothing. //! //! dataHandler: handler of data event. //! commitDataAfterProcessing: automatically commit data after calling of dataHandler. //! gracefulReleaseAfterCommit: wait for commit acknowledgements for all inflight data before confirming - //! partition session destroy. + //! partition session stop. TSelf& SimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler, bool commitDataAfterProcessing = false, bool gracefulStopAfterCommit = true); @@ -1476,7 +1476,7 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> { FLUENT_SETTING(IExecutor::TPtr, HandlersExecutor); }; - + TString ConsumerName_ = ""; //! Consumer. TSelf& ConsumerName(const TString& name) { @@ -1484,7 +1484,7 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> { WithoutConsumer_ = false; return static_cast<TSelf&>(*this); } - + bool WithoutConsumer_ = false; //! Read without consumer. TSelf& WithoutConsumer() { |