diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-08-01 10:30:59 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-08-01 10:30:59 +0300 |
commit | 33c36722183fe4f3d29d7dbd741e7f53c441ed0b (patch) | |
tree | 8c29c67778e2e9ff60db03f76bc17d5deb86a099 | |
parent | e4a0e9c847adf0267248aadfddf5f5095489fd11 (diff) | |
download | ydb-33c36722183fe4f3d29d7dbd741e7f53c441ed0b.tar.gz |
federated sdk work in progress
work in progress
5 files changed, 349 insertions, 357 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h index 2f75531305..d526e60845 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h @@ -17,13 +17,16 @@ protected: TString LocalDC = "dc1"; TTestServer Server; TLog Log = TLog("cerr"); + size_t TopicPartitionsCount = 1; public: SDKTestSetup(const TString& testCaseName, bool start = true, - const TVector<NKikimrServices::EServiceKikimr>& logServices = TTestServer::LOGGED_SERVICES, NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG) - : TestCaseName(testCaseName) + const TVector<NKikimrServices::EServiceKikimr>& logServices = TTestServer::LOGGED_SERVICES, + NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG, + size_t topicPartitionsCount = 1) + : TestCaseName(testCaseName) , Server(false, Nothing(), logServices, logPriority) - { + , TopicPartitionsCount(topicPartitionsCount) { InitOptions(); if (start) { Start(); @@ -76,7 +79,7 @@ public: Server.AnnoyingClient->CheckClustersList(Server.CleverServer->GetRuntime(), true, DataCenters); } Server.AnnoyingClient->InitSourceIds(); - CreateTopic(GetTestTopic(), GetLocalCluster()); + CreateTopic(GetTestTopic(), GetLocalCluster(), TopicPartitionsCount); if (waitInit) { Server.WaitInit(GetTestTopic()); } @@ -107,6 +110,17 @@ public: return Server.GrpcPort; } + TSimpleSharedPtr<TPortManager> GetPortManager() { + return Server.PortManager; + } + + std::unique_ptr<grpc::Server> StartGrpcService(const ui16 port, grpc::Service* service) { + grpc::ServerBuilder builder; + builder.AddListeningPort("[::]:" + ToString(port), grpc::InsecureServerCredentials()).RegisterService(service); + std::unique_ptr<grpc::Server> grpcServer(builder.BuildAndStart()); + return grpcServer; + } + NGrpc::TServerOptions& GetGrpcServerOptions() { return Server.GrpcServerOptions; } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h index 32fba77053..80732d1809 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h @@ -17,8 +17,8 @@ class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup { TAdaptiveLock Lock; public: TPersQueueYdbSdkTestSetup(const TString& testCaseName, bool start = true, - const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG) - : SDKTestSetup(testCaseName, start, logServices, logPriority) + const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG, size_t topicPartitionsCount = 1) + : SDKTestSetup(testCaseName, start, logServices, logPriority, topicPartitionsCount) { } @@ -231,7 +231,7 @@ struct TYdbPqTestRetryPolicy : IRetryPolicy { if (AtomicSwap(&OnFatalBreakDown, 0)) { return std::make_unique<TYdbPqNoRetryState>(); } - if (AtomicGet(Initialized_)) + if (AtomicGet(Initialized_)) { Cerr << "====CreateRetryState Initialized\n"; auto res = AtomicSwap(&OnBreakDown, 0); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp index 3bf8719fde..e78301e7fc 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp @@ -4,9 +4,23 @@ namespace NYdb::NTopic { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Aliases for event types + +using TDataReceivedEvent = TReadSessionEvent::TDataReceivedEvent; +using TMessageInformation = TDataReceivedEvent::TMessageInformation; +using TMessageBase = TDataReceivedEvent::TMessageBase; +using TMessage = TDataReceivedEvent::TMessage; +using TCompressedMessage = TDataReceivedEvent::TCompressedMessage; +using TCommitOffsetAcknowledgementEvent = TReadSessionEvent::TCommitOffsetAcknowledgementEvent; +using TStartPartitionSessionEvent = TReadSessionEvent::TStartPartitionSessionEvent; +using TStopPartitionSessionEvent = TReadSessionEvent::TStopPartitionSessionEvent; +using TPartitionSessionStatusEvent = TReadSessionEvent::TPartitionSessionStatusEvent; +using TPartitionSessionClosedEvent = TReadSessionEvent::TPartitionSessionClosedEvent; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Helpers -std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) { +std::pair<ui64, ui64> GetMessageOffsetRange(const TDataReceivedEvent& dataReceivedEvent, ui64 index) { if (dataReceivedEvent.HasCompressedMessages()) { const auto& msg = dataReceivedEvent.GetCompressedMessages()[index]; return {msg.GetOffset(), msg.GetOffset() + 1}; @@ -18,7 +32,7 @@ std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceiv //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation -TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation( +TMessageInformation::TMessageInformation( ui64 offset, TString producerId, ui64 seqNo, @@ -40,218 +54,186 @@ TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation( , MessageGroupId(messageGroupId) {} -static void DebugStringImpl(const TReadSessionEvent::TDataReceivedEvent::TMessageInformation& info, TStringBuilder& ret) { - ret << " Information: {" - << " Offset: " << info.Offset - << " ProducerId: \"" << info.ProducerId << "\"" - << " SeqNo: " << info.SeqNo - << " CreateTime: " << info.CreateTime - << " WriteTime: " << info.WriteTime - << " UncompressedSize: " << info.UncompressedSize - << " MessageGroupId: \"" << info.MessageGroupId << "\""; - ret << " Meta: {"; - bool firstKey = true; - for (const auto& [k, v] : info.Meta->Fields) { - ret << (firstKey ? " \"" : ", \"") << k << "\": \"" << v << "\""; - firstKey = false; - } - ret << " } }"; -} - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// NTopic::TReadSessionEvent::TDataReceivedEvent::IMessage +// NTopic::TReadSessionEvent::TDataReceivedEvent::TPartitionSessionAccessor -TReadSessionEvent::TDataReceivedEvent::IMessage::IMessage(const TString& data, - TPartitionSession::TPtr partitionSession) - : Data(data) - , PartitionSession(partitionSession) +TReadSessionEvent::TPartitionSessionAccessor::TPartitionSessionAccessor(TPartitionSession::TPtr partitionSession) + : PartitionSession(std::move(partitionSession)) {} -const TString& TReadSessionEvent::TDataReceivedEvent::IMessage::GetData() const { - return Data; -} - -const TPartitionSession::TPtr& TReadSessionEvent::TDataReceivedEvent::IMessage::GetPartitionSession() const { +const TPartitionSession::TPtr& TReadSessionEvent::TPartitionSessionAccessor::GetPartitionSession() const { return PartitionSession; } -TString TReadSessionEvent::TDataReceivedEvent::IMessage::DebugString(bool printData) const { - TStringBuilder ret; - DebugString(ret, printData); - return std::move(ret); -} - -template <class TSerializeInformationFunc> -static void DebugStringImpl(TStringBuilder& ret, - const TString& name, - const TReadSessionEvent::TDataReceivedEvent::IMessage& msg, - bool printData, - TSerializeInformationFunc serializeInformationFunc, - std::optional<ECodec> codec = std::nullopt) -{ - ret << name << " {"; - try { - const TString& data = msg.GetData(); - if (printData) { - ret << " Data: \"" << data << "\""; - } else { - ret << " Data: .." << data.size() << " bytes.."; - } - } catch (...) { - ret << " DataDecompressionError: \"" << CurrentExceptionMessage() << "\""; - } - auto partitionSession = msg.GetPartitionSession(); - ret << " Partition session id: " << partitionSession->GetPartitionSessionId() - << " Topic: \"" << partitionSession->GetTopicPath() << "\"" - << " Partition: " << partitionSession->GetPartitionId(); - if (codec) { - ret << " Codec: " << codec.value(); - } - serializeInformationFunc(ret); - ret << " }"; +template<> +void TPrintable<TPartitionSession>::DebugString(TStringBuilder& res, bool) const { + const auto* self = static_cast<const TPartitionSession*>(this); + res << " Partition session id: " << self->GetPartitionSessionId() + << " Topic: \"" << self->GetTopicPath() << "\"" + << " Partition: " << self->GetPartitionId(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage +// NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageBase -TReadSessionEvent::TDataReceivedEvent::TMessage::TMessage(const TString& data, - std::exception_ptr decompressionException, - const TMessageInformation& information, - TPartitionSession::TPtr partitionSession) - : IMessage(data, partitionSession) - , DecompressionException(std::move(decompressionException)) - , Information(information) -{ -} - -const TString& TReadSessionEvent::TDataReceivedEvent::TMessage::GetData() const { - if (DecompressionException) { - std::rethrow_exception(DecompressionException); - } - return IMessage::GetData(); -} +TMessageBase::TMessageBase(const TString& data, TMessageInformation info) + : Data(data) + , Information(std::move(info)) +{} -bool TReadSessionEvent::TDataReceivedEvent::TMessage::HasException() const { - return DecompressionException != nullptr; +const TString& TMessageBase::GetData() const { + return Data; } -ui64 TReadSessionEvent::TDataReceivedEvent::TMessage::GetOffset() const { +ui64 TMessageBase::GetOffset() const { return Information.Offset; } -const TString& TReadSessionEvent::TDataReceivedEvent::TMessage::GetProducerId() const { +const TString& TMessageBase::GetProducerId() const { return Information.ProducerId; } -const TString& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageGroupId() const { +const TString& TMessageBase::GetMessageGroupId() const { return Information.MessageGroupId; } -ui64 TReadSessionEvent::TDataReceivedEvent::TMessage::GetSeqNo() const { +ui64 TMessageBase::GetSeqNo() const { return Information.SeqNo; } -TInstant TReadSessionEvent::TDataReceivedEvent::TMessage::GetCreateTime() const { +TInstant TMessageBase::GetCreateTime() const { return Information.CreateTime; } -TInstant TReadSessionEvent::TDataReceivedEvent::TMessage::GetWriteTime() const { +TInstant TMessageBase::GetWriteTime() const { return Information.WriteTime; } -const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMeta() const { +const TWriteSessionMeta::TPtr& TMessageBase::GetMeta() const { return Information.Meta; } -const TMessageMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageMeta() const { +const TMessageMeta::TPtr& TMessageBase::GetMessageMeta() const { return Information.MessageMeta; } -void TReadSessionEvent::TDataReceivedEvent::TMessage::Commit() { - static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get()) - ->Commit(Information.Offset, Information.Offset + 1); -} - -void TReadSessionEvent::TDataReceivedEvent::TMessage::DebugString(TStringBuilder& ret, bool printData) const { - DebugStringImpl(ret, "Message", *this, printData, [this](TStringBuilder& ret) { - DebugStringImpl(this->Information, ret); - }); +template<> +void TPrintable<TMessageBase>::DebugString(TStringBuilder& ret, bool printData) const { + const auto* self = static_cast<const TMessageBase*>(this); + try { + const TString& data = self->GetData(); + if (printData) { + ret << " Data: \"" << data << "\""; + } else { + ret << " Data: .." << data.size() << " bytes.."; + } + } catch (...) { + ret << " DataDecompressionError: \"" << CurrentExceptionMessage() << "\""; + } + ret << " Information: {" + << " Offset: " << self->GetOffset() + << " ProducerId: \"" << self->GetProducerId() << "\"" + << " SeqNo: " << self->GetSeqNo() + << " CreateTime: " << self->GetCreateTime() + << " WriteTime: " << self->GetWriteTime() + << " MessageGroupId: \"" << self->GetMessageGroupId() << "\""; + ret << " Meta: {"; + bool firstKey = true; + for (const auto& [k, v] : self->GetMeta()->Fields) { + ret << (firstKey ? " \"" : ", \"") << k << "\": \"" << v << "\""; + firstKey = false; + } + ret << " }"; + ret << " MessageMeta: {"; + firstKey = true; + for (const auto& [k, v] : self->GetMessageMeta()->Fields) { + ret << (firstKey ? " \"" : ", \"") << k << "\": \"" << v << "\""; + firstKey = false; + } + ret << " } }"; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage - -TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::TCompressedMessage(ECodec codec, - const TString& data, - const TMessageInformation& information, - TPartitionSession::TPtr partitionSession) - : IMessage(data, partitionSession) - , Codec(codec) - , Information(information) -{} - - -ECodec TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetCodec() const { - return Codec; -} +// NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage -ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetOffset() const { - return Information.Offset; +TMessage::TMessage(const TString& data, + std::exception_ptr decompressionException, + TMessageInformation information, + TPartitionSession::TPtr partitionSession) + : TMessageBase(data, std::move(information)) + , TPartitionSessionAccessor(std::move(partitionSession)) + , DecompressionException(std::move(decompressionException)) { } -const TString& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetProducerId() const { - return Information.ProducerId; +const TString& TMessage::GetData() const { + if (DecompressionException) { + std::rethrow_exception(DecompressionException); + } + return TMessageBase::GetData(); } -const TString& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageGroupId() const { - return Information.MessageGroupId; +bool TMessage::HasException() const { + return DecompressionException != nullptr; } -ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetSeqNo() const { - return Information.SeqNo; +void TMessage::Commit() { + static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get()) + ->Commit(Information.Offset, Information.Offset + 1); } -TInstant TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetCreateTime() const { - return Information.CreateTime; +template<> +void TPrintable<TMessage>::DebugString(TStringBuilder& ret, bool printData) const { + const auto* self = static_cast<const TMessage*>(this); + ret << "Message {"; + static_cast<const TMessageBase*>(self)->DebugString(ret, printData); + self->GetPartitionSession()->DebugString(ret); + ret << " }"; } -TInstant TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetWriteTime() const { - return Information.WriteTime; -} +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage -const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMeta() const { - return Information.Meta; +TCompressedMessage::TCompressedMessage(ECodec codec, + const TString& data, + TMessageInformation information, + TPartitionSession::TPtr partitionSession) + : TMessageBase(data, std::move(information)) + , TPartitionSessionAccessor(std::move(partitionSession)) + , Codec(codec) { } -const TMessageMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageMeta() const { - return Information.MessageMeta; +ECodec TCompressedMessage::GetCodec() const { + return Codec; } -ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetUncompressedSize() const { +ui64 TCompressedMessage::GetUncompressedSize() const { return Information.UncompressedSize; } -void TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::Commit() { +void TCompressedMessage::Commit() { static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get()) ->Commit(Information.Offset, Information.Offset + 1); } -void TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::DebugString(TStringBuilder& ret, bool printData) const { - DebugStringImpl( - ret, "CompressedMessage", *this, printData, - [this](TStringBuilder& ret) { DebugStringImpl(this->Information, ret); }, Codec); +template<> +void TPrintable<TCompressedMessage>::DebugString(TStringBuilder& ret, bool printData) const { + const auto* self = static_cast<const TCompressedMessage*>(this); + ret << "CompressedMessage {"; + static_cast<const TMessageBase*>(self)->DebugString(ret, printData); + self->GetPartitionSession()->DebugString(ret); + ret << " Codec: " << self->GetCodec() + << " Uncompressed size: " << self->GetUncompressedSize() + << " }"; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NTopic::TReadSessionEvent::TDataReceivedEvent -TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> messages, - TVector<TCompressedMessage> compressedMessages, - TPartitionSession::TPtr partitionSession) - : Messages(std::move(messages)) - , CompressedMessages(std::move(compressedMessages)) - , PartitionSession(std::move(partitionSession)) -{ +TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages, + TPartitionSession::TPtr partitionSession) + : TPartitionSessionAccessor(std::move(partitionSession)) + , Messages(std::move(messages)) + , CompressedMessages(std::move(compressedMessages)) { for (size_t i = 0; i < GetMessagesCount(); ++i) { auto [from, to] = GetMessageOffsetRange(*this, i); if (OffsetRanges.empty() || OffsetRanges.back().second != from) { @@ -262,139 +244,156 @@ TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> mess } } -void TReadSessionEvent::TDataReceivedEvent::Commit() { +void TDataReceivedEvent::Commit() { for (auto [from, to] : OffsetRanges) { static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())->Commit(from, to); } } -TString TReadSessionEvent::TDataReceivedEvent::DebugString(bool printData) const { - TStringBuilder ret; - ret << "DataReceived { PartitionSessionId: " << GetPartitionSession()->GetPartitionSessionId() - << " PartitionId: " << GetPartitionSession()->GetPartitionId(); - for (const auto& message : Messages) { - ret << " "; - message.DebugString(ret, printData); - } - for (const auto& message : CompressedMessages) { - ret << " "; - message.DebugString(ret, printData); +template<> +void TPrintable<TDataReceivedEvent>::DebugString(TStringBuilder& ret, bool printData) const { + const auto* self = static_cast<const TDataReceivedEvent*>(this); + ret << "DataReceived {"; + self->GetPartitionSession()->DebugString(ret); + if (self->HasCompressedMessages()) { + for (const auto& message : self->GetCompressedMessages()) { + ret << " "; + message.DebugString(ret, printData); + } + } else { + for (const auto& message : self->GetMessages()) { + ret << " "; + message.DebugString(ret, printData); + } } ret << " }"; - return std::move(ret); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent -TReadSessionEvent::TCommitOffsetAcknowledgementEvent::TCommitOffsetAcknowledgementEvent(TPartitionSession::TPtr partitionSession, ui64 committedOffset) - : PartitionSession(std::move(partitionSession)) - , CommittedOffset(committedOffset) -{ +TCommitOffsetAcknowledgementEvent::TCommitOffsetAcknowledgementEvent(TPartitionSession::TPtr partitionSession, + ui64 committedOffset) + : TPartitionSessionAccessor(std::move(partitionSession)) + , CommittedOffset(committedOffset) { } - -TString TReadSessionEvent::TCommitOffsetAcknowledgementEvent::DebugString() const { - return TStringBuilder() << "CommitAcknowledgement { PartitionSessionId: " << GetPartitionSession()->GetPartitionSessionId() - << " PartitionId: " << GetPartitionSession()->GetPartitionId() - << " CommittedOffset: " << GetCommittedOffset() - << " }"; +template<> +void TPrintable<TCommitOffsetAcknowledgementEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TCommitOffsetAcknowledgementEvent*>(this); + ret << "CommitAcknowledgement {"; + self->GetPartitionSession()->DebugString(ret); + ret << " CommittedOffset: " << self->GetCommittedOffset() + << " }"; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NTopic::TReadSessionEvent::TStartPartitionSessionEvent -TReadSessionEvent::TStartPartitionSessionEvent::TStartPartitionSessionEvent(TPartitionSession::TPtr partitionSession, - ui64 committedOffset, ui64 endOffset) - : PartitionSession(std::move(partitionSession)) +TStartPartitionSessionEvent::TStartPartitionSessionEvent(TPartitionSession::TPtr partitionSession, ui64 committedOffset, + ui64 endOffset) + : TPartitionSessionAccessor(std::move(partitionSession)) , CommittedOffset(committedOffset) , EndOffset(endOffset) { } -void TReadSessionEvent::TStartPartitionSessionEvent::Confirm(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) { +void TStartPartitionSessionEvent::Confirm(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) { if (PartitionSession) { static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get()) ->ConfirmCreate(readOffset, commitOffset); } } -TString TReadSessionEvent::TStartPartitionSessionEvent::DebugString() const { - return TStringBuilder() << "CreatePartitionSession { PartitionSessionId: " - << GetPartitionSession()->GetPartitionSessionId() - << " TopicPath: " << GetPartitionSession()->GetTopicPath() - << " PartitionId: " << GetPartitionSession()->GetPartitionId() - << " CommittedOffset: " << GetCommittedOffset() - << " EndOffset: " << GetEndOffset() << " }"; +template<> +void TPrintable<TStartPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TStartPartitionSessionEvent*>(this); + ret << "StartPartitionSession {"; + self->GetPartitionSession()->DebugString(ret); + ret << " CommittedOffset: " << self->GetCommittedOffset() + << " EndOffset: " << self->GetEndOffset() + << " }"; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NTopic::TReadSessionEvent::TStopPartitionSessionEvent -TReadSessionEvent::TStopPartitionSessionEvent::TStopPartitionSessionEvent(TPartitionSession::TPtr partitionSession, - bool committedOffset) - : PartitionSession(std::move(partitionSession)) +TStopPartitionSessionEvent::TStopPartitionSessionEvent(TPartitionSession::TPtr partitionSession, bool committedOffset) + : TPartitionSessionAccessor(std::move(partitionSession)) , CommittedOffset(committedOffset) { } -void TReadSessionEvent::TStopPartitionSessionEvent::Confirm() { +void TStopPartitionSessionEvent::Confirm() { if (PartitionSession) { static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())->ConfirmDestroy(); } } -TString TReadSessionEvent::TStopPartitionSessionEvent::DebugString() const { - return TStringBuilder() << "DestroyPartitionSession { PartitionSessionId: " - << GetPartitionSession()->GetPartitionSessionId() - << " PartitionId: " << GetPartitionSession()->GetPartitionId() - << " CommittedOffset: " << GetCommittedOffset() << " }"; +template<> +void TPrintable<TStopPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TStopPartitionSessionEvent*>(this); + ret << "StopPartitionSession {"; + self->GetPartitionSession()->DebugString(ret); + ret << " CommittedOffset: " << self->GetCommittedOffset() + << " }"; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NTopic::TReadSessionEvent::TPartitionSessionStatusEvent -TReadSessionEvent::TPartitionSessionStatusEvent::TPartitionSessionStatusEvent(TPartitionSession::TPtr partitionSession, - ui64 committedOffset, ui64 readOffset, - ui64 endOffset, - TInstant writeTimeHighWatermark) - : PartitionSession(std::move(partitionSession)) +TPartitionSessionStatusEvent::TPartitionSessionStatusEvent(TPartitionSession::TPtr partitionSession, + ui64 committedOffset, ui64 readOffset, ui64 endOffset, + TInstant writeTimeHighWatermark) + : TPartitionSessionAccessor(std::move(partitionSession)) , CommittedOffset(committedOffset) , ReadOffset(readOffset) , EndOffset(endOffset) , WriteTimeHighWatermark(writeTimeHighWatermark) { } -TString TReadSessionEvent::TPartitionSessionStatusEvent::DebugString() const { - return TStringBuilder() << "PartitionSessionStatus { PartitionSessionId: " - << GetPartitionSession()->GetPartitionSessionId() - << " PartitionId: " << GetPartitionSession()->GetPartitionId() - << " CommittedOffset: " << GetCommittedOffset() << " ReadOffset: " << GetReadOffset() - << " EndOffset: " << GetEndOffset() - << " WriteWatermark: " << GetWriteTimeHighWatermark() << " }"; +template<> +void TPrintable<TPartitionSessionStatusEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TPartitionSessionStatusEvent*>(this); + ret << "PartitionSessionStatus {"; + self->GetPartitionSession()->DebugString(ret); + ret << " CommittedOffset: " << self->GetCommittedOffset() + << " ReadOffset: " << self->GetReadOffset() + << " EndOffset: " << self->GetEndOffset() + << " WriteWatermark: " << self->GetWriteTimeHighWatermark() + << " }"; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NTopic::TReadSessionEvent::TPartitionSessionClosedEvent -TReadSessionEvent::TPartitionSessionClosedEvent::TPartitionSessionClosedEvent(TPartitionSession::TPtr partitionSession, EReason reason) - : PartitionSession(std::move(partitionSession)) +TPartitionSessionClosedEvent::TPartitionSessionClosedEvent(TPartitionSession::TPtr partitionSession, EReason reason) + : TPartitionSessionAccessor(std::move(partitionSession)) , Reason(reason) { } -TString TReadSessionEvent::TPartitionSessionClosedEvent::DebugString() const { - return TStringBuilder() << "PartitionSessionClosed { PartitionSessionId: " - << GetPartitionSession()->GetPartitionSessionId() - << " PartitionId: " << GetPartitionSession()->GetPartitionId() - << " Reason: " << GetReason() << " }"; +template<> +void TPrintable<TPartitionSessionClosedEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TPartitionSessionClosedEvent*>(this); + ret << "PartitionSessionClosed {"; + self->GetPartitionSession()->DebugString(ret); + ret << " Reason: " << self->GetReason() + << " }"; } -TString TSessionClosedEvent::DebugString() const { - return - TStringBuilder() << "SessionClosed { Status: " << GetStatus() - << " Issues: \"" << NPersQueue::IssuesSingleLineString(GetIssues()) - << "\" }"; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TSessionClosedEvent + +template<> +void TPrintable<TSessionClosedEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TSessionClosedEvent*>(this); + ret << "SessionClosed { Status: " << self->GetStatus() + << " Issues: \"" << NPersQueue::IssuesSingleLineString(self->GetIssues()) + << "\" }"; } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NTopic::TReadSessionEvent + TString DebugString(const TReadSessionEvent::TEvent& event) { return std::visit([](const auto& ev) { return ev.DebugString(); }, event); } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index 3fb36f63da..0cf98e4212 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -258,7 +258,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; } if (readyToAccept) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}}); } } @@ -440,7 +440,7 @@ void TWriteSessionImpl::InitImpl() { auto* init = req.mutable_init_request(); init->set_path(Settings.Path_); init->set_producer_id(Settings.ProducerId_); - + if (Settings.PartitionId_.Defined()) init->set_partition_id(*Settings.PartitionId_); else @@ -561,32 +561,32 @@ TStringBuilder TWriteSessionImpl::LogPrefix() const { return TStringBuilder() << "ProducerId [" << Settings.ProducerId_ << "] MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] "; } -TString TWriteSessionEvent::TAcksEvent::DebugString() const { - TStringBuilder res; +template<> +void TPrintable<TWriteSessionEvent::TAcksEvent>::DebugString(TStringBuilder& res, bool) const { + const auto* self = static_cast<const TWriteSessionEvent::TAcksEvent*>(this); res << "AcksEvent:"; - for (auto& ack : Acks) { + for (auto& ack : self->Acks) { res << " { seqNo : " << ack.SeqNo << ", State : " << ack.State; if (ack.Details) { res << ", offset : " << ack.Details->Offset << ", partitionId : " << ack.Details->PartitionId; } res << " }"; } - if (!Acks.empty() && Acks.back().Stat) { - auto& stat = Acks.back().Stat; + if (!self->Acks.empty() && self->Acks.back().Stat) { + auto& stat = self->Acks.back().Stat; res << " write stat: Write time " << stat->WriteTime << " minimal time in partition queue " << stat->MinTimeInPartitionQueue << " maximal time in partition queue " << stat->MaxTimeInPartitionQueue << " partition quoted time " << stat->PartitionQuotedTime << " topic quoted time " << stat->TopicQuotedTime; } - return res; } -TString TWriteSessionEvent::TReadyToAcceptEvent::DebugString() const { - return "ReadyToAcceptEvent"; +template<> +void TPrintable<TWriteSessionEvent::TReadyToAcceptEvent>::DebugString(TStringBuilder& res, bool) const { + res << "ReadyToAcceptEvent"; } - TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMessageImpl() { Y_VERIFY(Lock.IsLocked()); @@ -625,7 +625,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess OnErrorResolved(); if (!FirstTokenSent) { - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}}); FirstTokenSent = true; } // Kickstart send after session reestablishment @@ -677,7 +677,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess }); if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) { - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}}); } } //EventsQueue->PushEvent(std::move(acksEvent)); @@ -816,7 +816,7 @@ void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) { memoryUsage = OnCompressedImpl(std::move(block)); } if (memoryUsage.NowOk && !memoryUsage.WasOk) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}}); } } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index 962bebac17..a0cc5e2088 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -171,7 +171,10 @@ class TPartitioningSettings { public: TPartitioningSettings() : MinActivePartitions_(0), PartitionCountLimit_(0){} TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings); - TPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) : MinActivePartitions_(minActivePartitions), PartitionCountLimit_(partitionCountLimit) {} + TPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) + : MinActivePartitions_(minActivePartitions) + , PartitionCountLimit_(partitionCountLimit) { + } ui64 GetMinActivePartitions() const; ui64 GetPartitionCountLimit() const; @@ -602,6 +605,19 @@ struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSet //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +template <typename TEvent> +class TPrintable { +public: + TString DebugString(bool printData = false) const { + TStringBuilder b; + static_cast<const TEvent*>(this)->DebugString(b, printData); + return b; + } + + // implemented in template specializations + void DebugString(TStringBuilder& ret, bool printData = false) const = delete; +}; + //! Session metainformation. struct TWriteSessionMeta: public TThrRefBase { using TPtr = TIntrusivePtr<TWriteSessionMeta>; @@ -618,12 +634,13 @@ struct TMessageMeta: public TThrRefBase { }; //! Event that is sent to client during session destruction. -struct TSessionClosedEvent: public TStatus { +struct TSessionClosedEvent: public TStatus, public TPrintable<TSessionClosedEvent> { using TStatus::TStatus; - - TString DebugString() const; }; +template<> +void TPrintable<TSessionClosedEvent>::DebugString(TStringBuilder& res, bool) const; + struct TWriteStat : public TThrRefBase { TDuration WriteTime; TDuration MinTimeInPartitionQueue; @@ -705,7 +722,7 @@ struct TReaderCounters: public TThrRefBase { }; //! Partition session. -struct TPartitionSession: public TThrRefBase { +struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSession> { using TPtr = TIntrusivePtr<TPartitionSession>; public: @@ -739,11 +756,26 @@ protected: ui64 PartitionId; }; +template<> +void TPrintable<TPartitionSession>::DebugString(TStringBuilder& res, bool) const; + //! Events for read session. struct TReadSessionEvent { + class TPartitionSessionAccessor { + public: + TPartitionSessionAccessor(TPartitionSession::TPtr partitionSession); + + virtual ~TPartitionSessionAccessor() = default; + + virtual const TPartitionSession::TPtr& GetPartitionSession() const; + + protected: + TPartitionSession::TPtr PartitionSession; + }; + //! Event with new data. //! Contains batch of messages from single partition session. - struct TDataReceivedEvent { + struct TDataReceivedEvent : public TPartitionSessionAccessor, public TPrintable<TDataReceivedEvent> { struct TMessageInformation { TMessageInformation(ui64 offset, TString producerId, @@ -765,39 +797,16 @@ struct TReadSessionEvent { TString MessageGroupId; }; - class IMessage { + class TMessageBase : public TPrintable<TMessageBase> { public: - IMessage(const TString& data, TPartitionSession::TPtr partitionSession); + TMessageBase(const TString& data, TMessageInformation info); - virtual ~IMessage() = default; + virtual ~TMessageBase() = default; virtual const TString& GetData() const; - //! Partition session. Same as in batch. - const TPartitionSession::TPtr& GetPartitionSession() const; - virtual void Commit() = 0; - TString DebugString(bool printData = false) const; - virtual void DebugString(TStringBuilder& ret, bool printData = false) const = 0; - - protected: - TString Data; - - TPartitionSession::TPtr PartitionSession; - }; - - //! Single message. - struct TMessage: public IMessage { - TMessage(const TString& data, std::exception_ptr decompressionException, - const TMessageInformation& information, TPartitionSession::TPtr partitionSession); - - //! User data. - //! Throws decompressor exception if decompression failed. - const TString& GetData() const override; - - bool HasException() const; - //! Message offset. ui64 GetOffset() const; @@ -822,19 +831,37 @@ struct TReadSessionEvent { //! Message level meta info. const TMessageMeta::TPtr& GetMessageMeta() const; + protected: + TString Data; + TMessageInformation Information; + }; + + //! Single message. + struct TMessage: public TMessageBase, public TPartitionSessionAccessor, public TPrintable<TMessage> { + using TPrintable<TMessage>::DebugString; + + TMessage(const TString& data, std::exception_ptr decompressionException, TMessageInformation information, + TPartitionSession::TPtr partitionSession); + + //! User data. + //! Throws decompressor exception if decompression failed. + const TString& GetData() const override; + //! Commits single message. void Commit() override; - using IMessage::DebugString; - void DebugString(TStringBuilder& ret, bool printData = false) const override; + bool HasException() const; private: std::exception_ptr DecompressionException; - TMessageInformation Information; }; - struct TCompressedMessage: public IMessage { - TCompressedMessage(ECodec codec, const TString& data, const TMessageInformation& information, + struct TCompressedMessage: public TMessageBase, + public TPartitionSessionAccessor, + public TPrintable<TCompressedMessage> { + using TPrintable<TCompressedMessage>::DebugString; + + TCompressedMessage(ECodec codec, const TString& data, TMessageInformation information, TPartitionSession::TPtr partitionSession); virtual ~TCompressedMessage() { @@ -843,53 +870,20 @@ struct TReadSessionEvent { //! Message codec ECodec GetCodec() const; - //! Message offset. - ui64 GetOffset() const; - - //! Producer id - const TString& GetProducerId() const; - - //! Message group id. - const TString& GetMessageGroupId() const; - - //! Sequence number. - ui64 GetSeqNo() const; - - //! Message creation timestamp. - TInstant GetCreateTime() const; - - //! Message write timestamp. - TInstant GetWriteTime() const; - - //! Metainfo. - const TWriteSessionMeta::TPtr& GetMeta() const; - - //! Message level meta info. - const TMessageMeta::TPtr& GetMessageMeta() const; - //! Uncompressed size. ui64 GetUncompressedSize() const; //! Commits all offsets in compressed message. void Commit() override; - using IMessage::DebugString; - void DebugString(TStringBuilder& ret, bool printData = false) const override; - private: ECodec Codec; - TMessageInformation Information; }; public: TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages, TPartitionSession::TPtr partitionSession); - //! Partition session. - const TPartitionSession::TPtr& GetPartitionSession() const { - return PartitionSession; - } - bool HasCompressedMessages() const { return !CompressedMessages.empty(); } @@ -923,8 +917,6 @@ struct TReadSessionEvent { //! Commits all messages in batch. void Commit(); - TString DebugString(bool printData = false) const; - private: void CheckMessagesFilled(bool compressed) const { Y_VERIFY(!Messages.empty() || !CompressedMessages.empty()); @@ -939,19 +931,14 @@ struct TReadSessionEvent { private: TVector<TMessage> Messages; TVector<TCompressedMessage> CompressedMessages; - TPartitionSession::TPtr PartitionSession; std::vector<std::pair<ui64, ui64>> OffsetRanges; }; //! Acknowledgement for commit request. - struct TCommitOffsetAcknowledgementEvent { + struct TCommitOffsetAcknowledgementEvent: public TPartitionSessionAccessor, + public TPrintable<TCommitOffsetAcknowledgementEvent> { TCommitOffsetAcknowledgementEvent(TPartitionSession::TPtr partitionSession, ui64 committedOffset); - //! Partition session. - const TPartitionSession::TPtr& GetPartitionSession() const { - return PartitionSession; - } - //! Committed offset. //! This means that from now the first available //! message offset in current partition @@ -961,21 +948,15 @@ struct TReadSessionEvent { return CommittedOffset; } - TString DebugString() const; - private: - TPartitionSession::TPtr PartitionSession; ui64 CommittedOffset; }; //! Server command for creating and starting partition session. - struct TStartPartitionSessionEvent { + struct TStartPartitionSessionEvent: public TPartitionSessionAccessor, + public TPrintable<TStartPartitionSessionEvent> { explicit TStartPartitionSessionEvent(TPartitionSession::TPtr, ui64 committedOffset, ui64 endOffset); - const TPartitionSession::TPtr& GetPartitionSession() const { - return PartitionSession; - } - //! Current committed offset in partition session. ui64 GetCommittedOffset() const { return CommittedOffset; @@ -991,10 +972,7 @@ struct TReadSessionEvent { //! If maybe is empty then no rewinding void Confirm(TMaybe<ui64> readOffset = Nothing(), TMaybe<ui64> commitOffset = Nothing()); - TString DebugString() const; - private: - TPartitionSession::TPtr PartitionSession; ui64 CommittedOffset; ui64 EndOffset; }; @@ -1002,13 +980,9 @@ struct TReadSessionEvent { //! Server command for stopping and destroying partition session. //! Server can destroy partition session gracefully //! for rebalancing among all topic clients. - struct TStopPartitionSessionEvent { + struct TStopPartitionSessionEvent: public TPartitionSessionAccessor, public TPrintable<TStopPartitionSessionEvent> { TStopPartitionSessionEvent(TPartitionSession::TPtr partitionSession, bool committedOffset); - const TPartitionSession::TPtr& GetPartitionSession() const { - return PartitionSession; - } - //! Last offset of the partition session that was committed. ui64 GetCommittedOffset() const { return CommittedOffset; @@ -1018,22 +992,16 @@ struct TReadSessionEvent { //! Confirm has no effect if TPartitionSessionClosedEvent for same partition session with is received. void Confirm(); - TString DebugString() const; - private: - TPartitionSession::TPtr PartitionSession; ui64 CommittedOffset; }; //! Status for partition session requested via TPartitionSession::RequestStatus() - struct TPartitionSessionStatusEvent { + struct TPartitionSessionStatusEvent: public TPartitionSessionAccessor, + public TPrintable<TPartitionSessionStatusEvent> { TPartitionSessionStatusEvent(TPartitionSession::TPtr partitionSession, ui64 committedOffset, ui64 readOffset, ui64 endOffset, TInstant writeTimeHighWatermark); - const TPartitionSession::TPtr& GetPartitionSession() const { - return PartitionSession; - } - //! Committed offset. ui64 GetCommittedOffset() const { return CommittedOffset; @@ -1055,10 +1023,7 @@ struct TReadSessionEvent { return WriteTimeHighWatermark; } - TString DebugString() const; - private: - TPartitionSession::TPtr PartitionSession; ui64 CommittedOffset = 0; ui64 ReadOffset = 0; ui64 EndOffset = 0; @@ -1069,7 +1034,8 @@ struct TReadSessionEvent { //! partition session death. //! This could be after graceful stop of partition session //! or when connection with partition was lost. - struct TPartitionSessionClosedEvent { + struct TPartitionSessionClosedEvent: public TPartitionSessionAccessor, + public TPrintable<TPartitionSessionClosedEvent> { enum class EReason { StopConfirmedByUser, Lost, @@ -1079,18 +1045,11 @@ struct TReadSessionEvent { public: TPartitionSessionClosedEvent(TPartitionSession::TPtr partitionSession, EReason reason); - const TPartitionSession::TPtr& GetPartitionSession() const { - return PartitionSession; - } - EReason GetReason() const { return Reason; } - TString DebugString() const; - private: - TPartitionSession::TPtr PartitionSession; EReason Reason; }; @@ -1136,7 +1095,28 @@ private: THolder<TImpl> Impl; }; -//! Event debug string. +//! Events debug strings. +template<> +void TPrintable<TReadSessionEvent::TDataReceivedEvent::TMessageBase>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TReadSessionEvent::TDataReceivedEvent::TMessage>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TReadSessionEvent::TDataReceivedEvent>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TReadSessionEvent::TStartPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TReadSessionEvent::TStopPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TReadSessionEvent::TPartitionSessionStatusEvent>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TReadSessionEvent::TPartitionSessionClosedEvent>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TSessionClosedEvent>::DebugString(TStringBuilder& ret, bool printData) const; + TString DebugString(const TReadSessionEvent::TEvent& event); //! Retry policy. @@ -1233,28 +1213,27 @@ struct TWriteSessionEvent { }; - struct TAcksEvent { + struct TAcksEvent : public TPrintable<TAcksEvent> { //! Acks could be batched from several Write requests. //! They are provided to client as soon as possible. TVector<TWriteAck> Acks; - - TString DebugString() const; - }; //! Indicates that a writer is ready to accept new message(s). //! Continuation token should be kept and then used in write methods. - struct TReadyToAcceptEvent { + struct TReadyToAcceptEvent : public TPrintable<TReadyToAcceptEvent> { TContinuationToken ContinuationToken; - - TString DebugString() const; - }; using TEvent = std::variant<TAcksEvent, TReadyToAcceptEvent, TSessionClosedEvent>; }; -//! Event debug string. +//! Events debug strings. +template<> +void TPrintable<TWriteSessionEvent::TAcksEvent>::DebugString(TStringBuilder& ret, bool printData) const; +template<> +void TPrintable<TWriteSessionEvent::TReadyToAcceptEvent>::DebugString(TStringBuilder& ret, bool printData) const; + TString DebugString(const TWriteSessionEvent::TEvent& event); using TSessionClosedHandler = std::function<void(const TSessionClosedEvent&)>; |