aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-09-29 12:41:03 +0300
committerildar-khisam <ikhis@ydb.tech>2023-09-29 13:27:05 +0300
commit2e71acf92429671421fed0652526110f2dec0629 (patch)
tree0d4823ee6e0469aef562a374ab64bdacc3d173a6
parentd1b6c34393585eae85b13c71f4ca6125ad908ddd (diff)
downloadydb-2e71acf92429671421fed0652526110f2dec0629.tar.gz
fed sdk add handlers
work in progress
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h122
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp49
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp33
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp97
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h10
6 files changed, 287 insertions, 27 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
index 299210ea6c..d810eed628 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
@@ -9,6 +9,8 @@ namespace NYdb::NFederatedTopic {
using NTopic::TPrintable;
using TDbInfo = Ydb::FederationDiscovery::DatabaseInfo;
+using TSessionClosedEvent = NTopic::TSessionClosedEvent;
+
//! Federated partition session.
struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFederatedPartitionSession> {
using TPtr = TIntrusivePtr<TFederatedPartitionSession>;
@@ -113,7 +115,7 @@ struct TReadSessionEvent {
NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db);
const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override {
- ythrow yexception() << "GetPartitionSession() method unavailable for federated objects, use GetFederatedPartitionSession() instead";
+ ythrow yexception() << "GetPartitionSession method unavailable for federated objects, use GetFederatedPartitionSession instead";
}
bool HasCompressedMessages() const {
@@ -172,9 +174,16 @@ struct TReadSessionEvent {
TStopPartitionSessionEvent,
TPartitionSessionStatusEvent,
TPartitionSessionClosedEvent,
- NTopic::TSessionClosedEvent>;
+ TSessionClosedEvent>;
};
+template <typename TEvent>
+TReadSessionEvent::TFederated<TEvent> Federate(TEvent event, std::shared_ptr<TDbInfo> db) {
+ return {std::move(event), std::move(db)};
+}
+
+TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db);
+
TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db);
//! Set of offsets to commit.
@@ -241,6 +250,115 @@ struct TFederatedWriteSessionSettings : public NTopic::TWriteSessionSettings {
struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
using TSelf = TFederatedReadSessionSettings;
+ NTopic::TReadSessionSettings& EventHandlers(const TEventHandlers&) {
+ ythrow yexception() << "EventHandlers can not be set for federated session, use FederatedEventHandlers instead";
+ }
+
+ // Each handler, if set, is wrapped up and passed down to each subsession
+ struct TFederatedEventHandlers {
+ using TSelf = TFederatedEventHandlers;
+
+ struct TSimpleDataHandlers {
+ std::function<void(TReadSessionEvent::TDataReceivedEvent&)> DataHandler;
+ bool CommitDataAfterProcessing;
+ bool GracefulStopAfterCommit;
+ };
+
+
+ //! Set simple handler with data processing and also
+ //! set other handlers with default behaviour.
+ //! They automatically commit data after processing
+ //! and confirm partition session events.
+ //!
+ //! Sets the following handlers:
+ //! DataReceivedHandler: sets DataReceivedHandler to handler that calls dataHandler and (if
+ //! commitDataAfterProcessing is set) then calls Commit(). CommitAcknowledgementHandler to handler that does
+ //! nothing. CreatePartitionSessionHandler to handler that confirms event. StopPartitionSessionHandler to
+ //! handler that confirms event. PartitionSessionStatusHandler to handler that does nothing.
+ //! PartitionSessionClosedHandler to handler that does nothing.
+ //!
+ //! dataHandler: handler of data event.
+ //! commitDataAfterProcessing: automatically commit data after calling of dataHandler.
+ //! gracefulReleaseAfterCommit: wait for commit acknowledgements for all inflight data before confirming
+ //! partition session destroy.
+
+ TSimpleDataHandlers SimpleDataHandlers_;
+
+ TSelf& SimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler,
+ bool commitDataAfterProcessing = false, bool gracefulStopAfterCommit = true) {
+ SimpleDataHandlers_.DataHandler = std::move(dataHandler);
+ SimpleDataHandlers_.CommitDataAfterProcessing = commitDataAfterProcessing;
+ SimpleDataHandlers_.GracefulStopAfterCommit = gracefulStopAfterCommit;
+ return static_cast<TSelf&>(*this);
+ }
+
+ //! Data size limit for the DataReceivedHandler handler.
+ //! The data size may exceed this limit.
+ FLUENT_SETTING_DEFAULT(size_t, MaxMessagesBytes, Max<size_t>());
+
+ //! Function to handle data events.
+ //! If this handler is set, data events will be handled by handler,
+ //! otherwise sent to TReadSession::GetEvent().
+ //! Default value is empty function (not set).
+ FLUENT_SETTING(std::function<void(TReadSessionEvent::TDataReceivedEvent&)>, DataReceivedHandler);
+
+ //! Function to handle commit ack events.
+ //! If this handler is set, commit ack events will be handled by handler,
+ //! otherwise sent to TReadSession::GetEvent().
+ //! Default value is empty function (not set).
+ FLUENT_SETTING(std::function<void(TReadSessionEvent::TCommitOffsetAcknowledgementEvent&)>,
+ CommitOffsetAcknowledgementHandler);
+
+ //! Function to handle start partition session events.
+ //! If this handler is set, create partition session events will be handled by handler,
+ //! otherwise sent to TReadSession::GetEvent().
+ //! Default value is empty function (not set).
+ FLUENT_SETTING(std::function<void(TReadSessionEvent::TStartPartitionSessionEvent&)>,
+ StartPartitionSessionHandler);
+
+ //! Function to handle stop partition session events.
+ //! If this handler is set, destroy partition session events will be handled by handler,
+ //! otherwise sent to TReadSession::GetEvent().
+ //! Default value is empty function (not set).
+ FLUENT_SETTING(std::function<void(TReadSessionEvent::TStopPartitionSessionEvent&)>,
+ StopPartitionSessionHandler);
+
+ //! Function to handle partition session status events.
+ //! If this handler is set, partition session status events will be handled by handler,
+ //! otherwise sent to TReadSession::GetEvent().
+ //! Default value is empty function (not set).
+ FLUENT_SETTING(std::function<void(TReadSessionEvent::TPartitionSessionStatusEvent&)>,
+ PartitionSessionStatusHandler);
+
+ //! Function to handle partition session closed events.
+ //! If this handler is set, partition session closed events will be handled by handler,
+ //! otherwise sent to TReadSession::GetEvent().
+ //! Default value is empty function (not set).
+ FLUENT_SETTING(std::function<void(TReadSessionEvent::TPartitionSessionClosedEvent&)>,
+ PartitionSessionClosedHandler);
+
+ //! Function to handle session closed events.
+ //! If this handler is set, close session events will be handled by handler
+ //! and then sent to TReadSession::GetEvent().
+ //! Default value is empty function (not set).
+ FLUENT_SETTING(NTopic::TSessionClosedHandler, SessionClosedHandler);
+
+ //! Function to handle all event types.
+ //! If event with current type has no handler for this type of event,
+ //! this handler (if specified) will be used.
+ //! If this handler is not specified, event can be received with TReadSession::GetEvent() method.
+ FLUENT_SETTING(std::function<void(TReadSessionEvent::TEvent&)>, CommonHandler);
+
+ //! Executor for handlers.
+ //! If not set, default single threaded executor will be used.
+ //! Shared between subsessions
+ FLUENT_SETTING(NTopic::IExecutor::TPtr, HandlersExecutor);
+ };
+
+ //! Federated event handlers.
+ //! See description in TFederatedEventHandlers class.
+ FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers);
+
enum class EReadPolicy {
READ_ALL = 0,
READ_ORIGINAL,
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
index e507617ec8..7c12e6bb2b 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
@@ -10,6 +10,50 @@ namespace NYdb::NFederatedTopic {
NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings);
+template <typename TEvent, typename TFederatedEvent>
+typename std::function<void(TEvent&)> WrapFederatedHandler(std::function<void(TFederatedEvent&)> outerHandler, std::shared_ptr<TDbInfo> db) {
+ if (outerHandler) {
+ return [outerHandler, db = std::move(db)](TEvent& ev) {
+ auto fev = Federate(std::move(ev), db);
+ return outerHandler(fev);
+ };
+ }
+ return {};
+}
+
+NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& settings, const std::shared_ptr<TDbInfo>& db) {
+ NTopic::TReadSessionSettings SubsessionSettings = settings;
+ SubsessionSettings.EventHandlers_.MaxMessagesBytes(settings.EventHandlers_.MaxMessagesBytes_);
+ SubsessionSettings.EventHandlers_.HandlersExecutor(settings.EventHandlers_.HandlersExecutor_);
+
+#define MAYBE_CONVERT_HANDLER(type, name) \
+ SubsessionSettings.EventHandlers_.name( \
+ WrapFederatedHandler<NTopic::type, type>(settings.FederatedEventHandlers_.name##_, db) \
+ );
+
+ MAYBE_CONVERT_HANDLER(TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler);
+ MAYBE_CONVERT_HANDLER(TReadSessionEvent::TCommitOffsetAcknowledgementEvent, CommitOffsetAcknowledgementHandler);
+ MAYBE_CONVERT_HANDLER(TReadSessionEvent::TStartPartitionSessionEvent, StartPartitionSessionHandler);
+ MAYBE_CONVERT_HANDLER(TReadSessionEvent::TStopPartitionSessionEvent, StopPartitionSessionHandler);
+ MAYBE_CONVERT_HANDLER(TReadSessionEvent::TPartitionSessionStatusEvent, PartitionSessionStatusHandler);
+ MAYBE_CONVERT_HANDLER(TReadSessionEvent::TPartitionSessionClosedEvent, PartitionSessionClosedHandler);
+ MAYBE_CONVERT_HANDLER(TReadSessionEvent::TEvent, CommonHandler);
+
+#undef MAYBE_CONVERT_HANDLER
+
+ SubsessionSettings.EventHandlers_.SessionClosedHandler(settings.FederatedEventHandlers_.SessionClosedHandler_);
+
+ if (settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler) {
+ SubsessionSettings.EventHandlers_.SimpleDataHandlers(
+ WrapFederatedHandler<NTopic::TReadSessionEvent::TDataReceivedEvent, TReadSessionEvent::TDataReceivedEvent>(
+ settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db),
+ settings.FederatedEventHandlers_.SimpleDataHandlers_.CommitDataAfterProcessing,
+ settings.FederatedEventHandlers_.SimpleDataHandlers_.GracefulStopAfterCommit);
+ }
+
+ return SubsessionSettings;
+}
+
TFederatedReadSession::TFederatedReadSession(const TFederatedReadSessionSettings& settings,
std::shared_ptr<TGRpcConnectionsImpl> connections,
const TFederatedTopicClientSettings& clientSetttings,
@@ -42,7 +86,7 @@ void TFederatedReadSession::OpenSubSessionsImpl() {
.Database(db->path())
.DiscoveryEndpoint(db->endpoint());
auto subclient = make_shared<NTopic::TTopicClient::TImpl>(Connections, settings);
- auto subsession = subclient->CreateReadSession(Settings);
+ auto subsession = subclient->CreateReadSession(FromFederated(Settings, db));
SubSessions.emplace_back(subsession, db);
}
SubsessionIndex = 0;
@@ -98,9 +142,8 @@ TVector<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvents(bool block,
with_lock(Lock) {
do {
auto sub = SubSessions[SubsessionIndex];
- // TODO remove copy
for (auto&& ev : sub.Session->GetEvents(false, maxEventsCount, maxByteSize)) {
- result.push_back(Federate(ev, sub.DbInfo));
+ result.push_back(Federate(std::move(ev), sub.DbInfo));
}
SubsessionIndex = (SubsessionIndex + 1) % SubSessions.size();
}
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
index 040e5117c5..a9ffe4c703 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
@@ -66,8 +66,9 @@ private:
// void ScheduleDumpCountersToLog(size_t timeNumber = 0);
private:
+ TFederatedReadSessionSettings Settings;
+
// For subsessions creation
- const NTopic::TReadSessionSettings Settings;
std::shared_ptr<TGRpcConnectionsImpl> Connections;
const NTopic::TTopicClientSettings SubClientSetttings;
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
index 0b6ab25113..543bb293c3 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
@@ -1,6 +1,8 @@
#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h>
+#include <optional>
+
namespace NYdb::NFederatedTopic {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -15,24 +17,23 @@ std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceiv
return {msg.GetOffset(), msg.GetOffset() + 1};
}
+TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db) {
+ return {std::move(event), std::move(db)};
+}
+
TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db) {
- if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
- return TReadSessionEvent::TDataReceivedEvent(std::move(*ev), db);
- } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
- return TReadSessionEvent::TFederated(*ev, db);
- } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
- return TReadSessionEvent::TFederated(*ev, db);
- } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
- return TReadSessionEvent::TFederated(*ev, db);
- } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
- return TReadSessionEvent::TFederated(*ev, db);
- } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
- return TReadSessionEvent::TFederated(*ev, db);
- } else if (auto* ev = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
+ return std::visit([db = std::move(db)](auto&& arg) {
+ using T = std::decay_t<decltype(arg)>;
+ std::optional<TReadSessionEvent::TEvent> ev;
+ if constexpr (std::is_same_v<T, NTopic::TReadSessionEvent::TDataReceivedEvent>) {
+ ev = TReadSessionEvent::TDataReceivedEvent(std::move(arg), std::move(db));
+ } else if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) {
+ ev = std::move(arg);
+ } else {
+ ev = TReadSessionEvent::TFederated(std::move(arg), std::move(db));
+ }
return *ev;
- } else {
- Y_UNREACHABLE();
- }
+ }, event);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
index 88a8279311..7252f7f8ae 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
@@ -421,6 +421,103 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
ReadSession->Close(TDuration::MilliSeconds(10));
}
+ Y_UNIT_TEST(SimpleHandlers) {
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false);
+ setup->Start(true, true);
+
+ TFederationDiscoveryServiceMock fdsMock;
+ fdsMock.Port = setup->GetGrpcPort();
+
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285);
+ auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock);
+
+ std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession;
+
+ // Create topic client.
+ NYdb::TDriverConfig cfg;
+ cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort);
+ cfg.SetDatabase("/Root");
+ cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriver driver(cfg);
+ auto clientSettings = TFederatedTopicClientSettings()
+ .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy(
+ TDuration::Seconds(10),
+ TDuration::Seconds(10)
+ ));
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings);
+
+ ui64 count = 300u;
+
+ TString messageBase = "message----";
+ TVector<TString> sentMessages;
+
+ for (auto i = 0u; i < count; i++) {
+ // sentMessages.emplace_back(messageBase * (i+1) + ToString(i));
+ sentMessages.emplace_back(messageBase * (10 * i + 1));
+ }
+
+ NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>();
+ auto totalReceived = 0u;
+
+ auto f = checkedPromise.GetFuture();
+ TAtomic check = 1;
+
+ // Create read session.
+ NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings;
+ readSettings
+ .ConsumerName("shared/user")
+ .MaxMemoryUsageBytes(1_MB)
+ .AppendTopics(setup->GetTestTopic());
+
+ readSettings.FederatedEventHandlers_.SimpleDataHandlers([&](TReadSessionEvent::TDataReceivedEvent& ev) mutable {
+ Cerr << ">>> event from dataHandler: " << DebugString(ev) << Endl;
+ Y_VERIFY_S(AtomicGet(check) != 0, "check is false");
+ auto& messages = ev.GetMessages();
+ for (size_t i = 0u; i < messages.size(); ++i) {
+ auto& message = messages[i];
+ UNIT_ASSERT_VALUES_EQUAL(message.GetData(), sentMessages[totalReceived]);
+ totalReceived++;
+ }
+ if (totalReceived == sentMessages.size())
+ checkedPromise.SetValue();
+ });
+
+ ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ Cerr << ">>> Session was created" << Endl;
+
+ Sleep(TDuration::MilliSeconds(50));
+
+ auto events = ReadSession->GetEvents(false);
+ UNIT_ASSERT(events.empty());
+
+ std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest;
+ do {
+ fdsRequest = fdsMock.GetNextPendingRequest();
+ if (!fdsRequest.has_value()) {
+ Sleep(TDuration::MilliSeconds(50));
+ }
+ } while (!fdsRequest.has_value());
+
+ fdsRequest->Result.SetValue(fdsMock.ComposeOkResult());
+
+ NPersQueue::TWriteSessionSettings writeSettings;
+ writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id");
+ writeSettings.Codec(NPersQueue::ECodec::RAW);
+ NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor();
+ writeSettings.CompressionExecutor(executor);
+
+ auto& client = setup->GetPersQueueClient();
+ auto session = client.CreateSimpleBlockingWriteSession(writeSettings);
+
+ for (auto i = 0u; i < count; i++) {
+ auto res = session->Write(sentMessages[i]);
+ UNIT_ASSERT(res);
+ }
+
+ f.GetValueSync();
+ ReadSession->Close();
+ AtomicSet(check, 0);
+ }
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index 0fecce9af0..f5a16b1514 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -135,7 +135,7 @@ public:
i32 GetNodeId() const;
i64 GetGeneration() const;
-private:
+private:
// Node identificator.
i32 NodeId_ = 1;
@@ -1403,14 +1403,14 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
//! Sets the following handlers:
//! DataReceivedHandler: sets DataReceivedHandler to handler that calls dataHandler and (if
//! commitDataAfterProcessing is set) then calls Commit(). CommitAcknowledgementHandler to handler that does
- //! nothing. CreatePartitionSessionHandler to handler that confirms event. StopPartitionSessionHandler to
+ //! nothing. StartPartitionSessionHandler to handler that confirms event. StopPartitionSessionHandler to
//! handler that confirms event. PartitionSessionStatusHandler to handler that does nothing.
//! PartitionSessionClosedHandler to handler that does nothing.
//!
//! dataHandler: handler of data event.
//! commitDataAfterProcessing: automatically commit data after calling of dataHandler.
//! gracefulReleaseAfterCommit: wait for commit acknowledgements for all inflight data before confirming
- //! partition session destroy.
+ //! partition session stop.
TSelf& SimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler,
bool commitDataAfterProcessing = false, bool gracefulStopAfterCommit = true);
@@ -1476,7 +1476,7 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
FLUENT_SETTING(IExecutor::TPtr, HandlersExecutor);
};
-
+
TString ConsumerName_ = "";
//! Consumer.
TSelf& ConsumerName(const TString& name) {
@@ -1484,7 +1484,7 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
WithoutConsumer_ = false;
return static_cast<TSelf&>(*this);
}
-
+
bool WithoutConsumer_ = false;
//! Read without consumer.
TSelf& WithoutConsumer() {