aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-08-01 10:30:59 +0300
committerildar-khisam <ikhis@ydb.tech>2023-08-01 10:30:59 +0300
commit33c36722183fe4f3d29d7dbd741e7f53c441ed0b (patch)
tree8c29c67778e2e9ff60db03f76bc17d5deb86a099
parente4a0e9c847adf0267248aadfddf5f5095489fd11 (diff)
downloadydb-33c36722183fe4f3d29d7dbd741e7f53c441ed0b.tar.gz
federated sdk work in progress
work in progress
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h22
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp423
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h227
5 files changed, 349 insertions, 357 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
index 2f75531305..d526e60845 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
@@ -17,13 +17,16 @@ protected:
TString LocalDC = "dc1";
TTestServer Server;
TLog Log = TLog("cerr");
+ size_t TopicPartitionsCount = 1;
public:
SDKTestSetup(const TString& testCaseName, bool start = true,
- const TVector<NKikimrServices::EServiceKikimr>& logServices = TTestServer::LOGGED_SERVICES, NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG)
- : TestCaseName(testCaseName)
+ const TVector<NKikimrServices::EServiceKikimr>& logServices = TTestServer::LOGGED_SERVICES,
+ NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG,
+ size_t topicPartitionsCount = 1)
+ : TestCaseName(testCaseName)
, Server(false, Nothing(), logServices, logPriority)
- {
+ , TopicPartitionsCount(topicPartitionsCount) {
InitOptions();
if (start) {
Start();
@@ -76,7 +79,7 @@ public:
Server.AnnoyingClient->CheckClustersList(Server.CleverServer->GetRuntime(), true, DataCenters);
}
Server.AnnoyingClient->InitSourceIds();
- CreateTopic(GetTestTopic(), GetLocalCluster());
+ CreateTopic(GetTestTopic(), GetLocalCluster(), TopicPartitionsCount);
if (waitInit) {
Server.WaitInit(GetTestTopic());
}
@@ -107,6 +110,17 @@ public:
return Server.GrpcPort;
}
+ TSimpleSharedPtr<TPortManager> GetPortManager() {
+ return Server.PortManager;
+ }
+
+ std::unique_ptr<grpc::Server> StartGrpcService(const ui16 port, grpc::Service* service) {
+ grpc::ServerBuilder builder;
+ builder.AddListeningPort("[::]:" + ToString(port), grpc::InsecureServerCredentials()).RegisterService(service);
+ std::unique_ptr<grpc::Server> grpcServer(builder.BuildAndStart());
+ return grpcServer;
+ }
+
NGrpc::TServerOptions& GetGrpcServerOptions() {
return Server.GrpcServerOptions;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
index 32fba77053..80732d1809 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
@@ -17,8 +17,8 @@ class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup {
TAdaptiveLock Lock;
public:
TPersQueueYdbSdkTestSetup(const TString& testCaseName, bool start = true,
- const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG)
- : SDKTestSetup(testCaseName, start, logServices, logPriority)
+ const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG, size_t topicPartitionsCount = 1)
+ : SDKTestSetup(testCaseName, start, logServices, logPriority, topicPartitionsCount)
{
}
@@ -231,7 +231,7 @@ struct TYdbPqTestRetryPolicy : IRetryPolicy {
if (AtomicSwap(&OnFatalBreakDown, 0)) {
return std::make_unique<TYdbPqNoRetryState>();
}
- if (AtomicGet(Initialized_))
+ if (AtomicGet(Initialized_))
{
Cerr << "====CreateRetryState Initialized\n";
auto res = AtomicSwap(&OnBreakDown, 0);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
index 3bf8719fde..e78301e7fc 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
@@ -4,9 +4,23 @@
namespace NYdb::NTopic {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// Aliases for event types
+
+using TDataReceivedEvent = TReadSessionEvent::TDataReceivedEvent;
+using TMessageInformation = TDataReceivedEvent::TMessageInformation;
+using TMessageBase = TDataReceivedEvent::TMessageBase;
+using TMessage = TDataReceivedEvent::TMessage;
+using TCompressedMessage = TDataReceivedEvent::TCompressedMessage;
+using TCommitOffsetAcknowledgementEvent = TReadSessionEvent::TCommitOffsetAcknowledgementEvent;
+using TStartPartitionSessionEvent = TReadSessionEvent::TStartPartitionSessionEvent;
+using TStopPartitionSessionEvent = TReadSessionEvent::TStopPartitionSessionEvent;
+using TPartitionSessionStatusEvent = TReadSessionEvent::TPartitionSessionStatusEvent;
+using TPartitionSessionClosedEvent = TReadSessionEvent::TPartitionSessionClosedEvent;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Helpers
-std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) {
+std::pair<ui64, ui64> GetMessageOffsetRange(const TDataReceivedEvent& dataReceivedEvent, ui64 index) {
if (dataReceivedEvent.HasCompressedMessages()) {
const auto& msg = dataReceivedEvent.GetCompressedMessages()[index];
return {msg.GetOffset(), msg.GetOffset() + 1};
@@ -18,7 +32,7 @@ std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceiv
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation
-TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation(
+TMessageInformation::TMessageInformation(
ui64 offset,
TString producerId,
ui64 seqNo,
@@ -40,218 +54,186 @@ TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation(
, MessageGroupId(messageGroupId)
{}
-static void DebugStringImpl(const TReadSessionEvent::TDataReceivedEvent::TMessageInformation& info, TStringBuilder& ret) {
- ret << " Information: {"
- << " Offset: " << info.Offset
- << " ProducerId: \"" << info.ProducerId << "\""
- << " SeqNo: " << info.SeqNo
- << " CreateTime: " << info.CreateTime
- << " WriteTime: " << info.WriteTime
- << " UncompressedSize: " << info.UncompressedSize
- << " MessageGroupId: \"" << info.MessageGroupId << "\"";
- ret << " Meta: {";
- bool firstKey = true;
- for (const auto& [k, v] : info.Meta->Fields) {
- ret << (firstKey ? " \"" : ", \"") << k << "\": \"" << v << "\"";
- firstKey = false;
- }
- ret << " } }";
-}
-
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// NTopic::TReadSessionEvent::TDataReceivedEvent::IMessage
+// NTopic::TReadSessionEvent::TDataReceivedEvent::TPartitionSessionAccessor
-TReadSessionEvent::TDataReceivedEvent::IMessage::IMessage(const TString& data,
- TPartitionSession::TPtr partitionSession)
- : Data(data)
- , PartitionSession(partitionSession)
+TReadSessionEvent::TPartitionSessionAccessor::TPartitionSessionAccessor(TPartitionSession::TPtr partitionSession)
+ : PartitionSession(std::move(partitionSession))
{}
-const TString& TReadSessionEvent::TDataReceivedEvent::IMessage::GetData() const {
- return Data;
-}
-
-const TPartitionSession::TPtr& TReadSessionEvent::TDataReceivedEvent::IMessage::GetPartitionSession() const {
+const TPartitionSession::TPtr& TReadSessionEvent::TPartitionSessionAccessor::GetPartitionSession() const {
return PartitionSession;
}
-TString TReadSessionEvent::TDataReceivedEvent::IMessage::DebugString(bool printData) const {
- TStringBuilder ret;
- DebugString(ret, printData);
- return std::move(ret);
-}
-
-template <class TSerializeInformationFunc>
-static void DebugStringImpl(TStringBuilder& ret,
- const TString& name,
- const TReadSessionEvent::TDataReceivedEvent::IMessage& msg,
- bool printData,
- TSerializeInformationFunc serializeInformationFunc,
- std::optional<ECodec> codec = std::nullopt)
-{
- ret << name << " {";
- try {
- const TString& data = msg.GetData();
- if (printData) {
- ret << " Data: \"" << data << "\"";
- } else {
- ret << " Data: .." << data.size() << " bytes..";
- }
- } catch (...) {
- ret << " DataDecompressionError: \"" << CurrentExceptionMessage() << "\"";
- }
- auto partitionSession = msg.GetPartitionSession();
- ret << " Partition session id: " << partitionSession->GetPartitionSessionId()
- << " Topic: \"" << partitionSession->GetTopicPath() << "\""
- << " Partition: " << partitionSession->GetPartitionId();
- if (codec) {
- ret << " Codec: " << codec.value();
- }
- serializeInformationFunc(ret);
- ret << " }";
+template<>
+void TPrintable<TPartitionSession>::DebugString(TStringBuilder& res, bool) const {
+ const auto* self = static_cast<const TPartitionSession*>(this);
+ res << " Partition session id: " << self->GetPartitionSessionId()
+ << " Topic: \"" << self->GetTopicPath() << "\""
+ << " Partition: " << self->GetPartitionId();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage
+// NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageBase
-TReadSessionEvent::TDataReceivedEvent::TMessage::TMessage(const TString& data,
- std::exception_ptr decompressionException,
- const TMessageInformation& information,
- TPartitionSession::TPtr partitionSession)
- : IMessage(data, partitionSession)
- , DecompressionException(std::move(decompressionException))
- , Information(information)
-{
-}
-
-const TString& TReadSessionEvent::TDataReceivedEvent::TMessage::GetData() const {
- if (DecompressionException) {
- std::rethrow_exception(DecompressionException);
- }
- return IMessage::GetData();
-}
+TMessageBase::TMessageBase(const TString& data, TMessageInformation info)
+ : Data(data)
+ , Information(std::move(info))
+{}
-bool TReadSessionEvent::TDataReceivedEvent::TMessage::HasException() const {
- return DecompressionException != nullptr;
+const TString& TMessageBase::GetData() const {
+ return Data;
}
-ui64 TReadSessionEvent::TDataReceivedEvent::TMessage::GetOffset() const {
+ui64 TMessageBase::GetOffset() const {
return Information.Offset;
}
-const TString& TReadSessionEvent::TDataReceivedEvent::TMessage::GetProducerId() const {
+const TString& TMessageBase::GetProducerId() const {
return Information.ProducerId;
}
-const TString& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageGroupId() const {
+const TString& TMessageBase::GetMessageGroupId() const {
return Information.MessageGroupId;
}
-ui64 TReadSessionEvent::TDataReceivedEvent::TMessage::GetSeqNo() const {
+ui64 TMessageBase::GetSeqNo() const {
return Information.SeqNo;
}
-TInstant TReadSessionEvent::TDataReceivedEvent::TMessage::GetCreateTime() const {
+TInstant TMessageBase::GetCreateTime() const {
return Information.CreateTime;
}
-TInstant TReadSessionEvent::TDataReceivedEvent::TMessage::GetWriteTime() const {
+TInstant TMessageBase::GetWriteTime() const {
return Information.WriteTime;
}
-const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMeta() const {
+const TWriteSessionMeta::TPtr& TMessageBase::GetMeta() const {
return Information.Meta;
}
-const TMessageMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageMeta() const {
+const TMessageMeta::TPtr& TMessageBase::GetMessageMeta() const {
return Information.MessageMeta;
}
-void TReadSessionEvent::TDataReceivedEvent::TMessage::Commit() {
- static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())
- ->Commit(Information.Offset, Information.Offset + 1);
-}
-
-void TReadSessionEvent::TDataReceivedEvent::TMessage::DebugString(TStringBuilder& ret, bool printData) const {
- DebugStringImpl(ret, "Message", *this, printData, [this](TStringBuilder& ret) {
- DebugStringImpl(this->Information, ret);
- });
+template<>
+void TPrintable<TMessageBase>::DebugString(TStringBuilder& ret, bool printData) const {
+ const auto* self = static_cast<const TMessageBase*>(this);
+ try {
+ const TString& data = self->GetData();
+ if (printData) {
+ ret << " Data: \"" << data << "\"";
+ } else {
+ ret << " Data: .." << data.size() << " bytes..";
+ }
+ } catch (...) {
+ ret << " DataDecompressionError: \"" << CurrentExceptionMessage() << "\"";
+ }
+ ret << " Information: {"
+ << " Offset: " << self->GetOffset()
+ << " ProducerId: \"" << self->GetProducerId() << "\""
+ << " SeqNo: " << self->GetSeqNo()
+ << " CreateTime: " << self->GetCreateTime()
+ << " WriteTime: " << self->GetWriteTime()
+ << " MessageGroupId: \"" << self->GetMessageGroupId() << "\"";
+ ret << " Meta: {";
+ bool firstKey = true;
+ for (const auto& [k, v] : self->GetMeta()->Fields) {
+ ret << (firstKey ? " \"" : ", \"") << k << "\": \"" << v << "\"";
+ firstKey = false;
+ }
+ ret << " }";
+ ret << " MessageMeta: {";
+ firstKey = true;
+ for (const auto& [k, v] : self->GetMessageMeta()->Fields) {
+ ret << (firstKey ? " \"" : ", \"") << k << "\": \"" << v << "\"";
+ firstKey = false;
+ }
+ ret << " } }";
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage
-
-TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::TCompressedMessage(ECodec codec,
- const TString& data,
- const TMessageInformation& information,
- TPartitionSession::TPtr partitionSession)
- : IMessage(data, partitionSession)
- , Codec(codec)
- , Information(information)
-{}
-
-
-ECodec TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetCodec() const {
- return Codec;
-}
+// NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage
-ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetOffset() const {
- return Information.Offset;
+TMessage::TMessage(const TString& data,
+ std::exception_ptr decompressionException,
+ TMessageInformation information,
+ TPartitionSession::TPtr partitionSession)
+ : TMessageBase(data, std::move(information))
+ , TPartitionSessionAccessor(std::move(partitionSession))
+ , DecompressionException(std::move(decompressionException)) {
}
-const TString& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetProducerId() const {
- return Information.ProducerId;
+const TString& TMessage::GetData() const {
+ if (DecompressionException) {
+ std::rethrow_exception(DecompressionException);
+ }
+ return TMessageBase::GetData();
}
-const TString& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageGroupId() const {
- return Information.MessageGroupId;
+bool TMessage::HasException() const {
+ return DecompressionException != nullptr;
}
-ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetSeqNo() const {
- return Information.SeqNo;
+void TMessage::Commit() {
+ static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())
+ ->Commit(Information.Offset, Information.Offset + 1);
}
-TInstant TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetCreateTime() const {
- return Information.CreateTime;
+template<>
+void TPrintable<TMessage>::DebugString(TStringBuilder& ret, bool printData) const {
+ const auto* self = static_cast<const TMessage*>(this);
+ ret << "Message {";
+ static_cast<const TMessageBase*>(self)->DebugString(ret, printData);
+ self->GetPartitionSession()->DebugString(ret);
+ ret << " }";
}
-TInstant TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetWriteTime() const {
- return Information.WriteTime;
-}
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage
-const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMeta() const {
- return Information.Meta;
+TCompressedMessage::TCompressedMessage(ECodec codec,
+ const TString& data,
+ TMessageInformation information,
+ TPartitionSession::TPtr partitionSession)
+ : TMessageBase(data, std::move(information))
+ , TPartitionSessionAccessor(std::move(partitionSession))
+ , Codec(codec) {
}
-const TMessageMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageMeta() const {
- return Information.MessageMeta;
+ECodec TCompressedMessage::GetCodec() const {
+ return Codec;
}
-ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetUncompressedSize() const {
+ui64 TCompressedMessage::GetUncompressedSize() const {
return Information.UncompressedSize;
}
-void TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::Commit() {
+void TCompressedMessage::Commit() {
static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())
->Commit(Information.Offset, Information.Offset + 1);
}
-void TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::DebugString(TStringBuilder& ret, bool printData) const {
- DebugStringImpl(
- ret, "CompressedMessage", *this, printData,
- [this](TStringBuilder& ret) { DebugStringImpl(this->Information, ret); }, Codec);
+template<>
+void TPrintable<TCompressedMessage>::DebugString(TStringBuilder& ret, bool printData) const {
+ const auto* self = static_cast<const TCompressedMessage*>(this);
+ ret << "CompressedMessage {";
+ static_cast<const TMessageBase*>(self)->DebugString(ret, printData);
+ self->GetPartitionSession()->DebugString(ret);
+ ret << " Codec: " << self->GetCodec()
+ << " Uncompressed size: " << self->GetUncompressedSize()
+ << " }";
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NTopic::TReadSessionEvent::TDataReceivedEvent
-TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> messages,
- TVector<TCompressedMessage> compressedMessages,
- TPartitionSession::TPtr partitionSession)
- : Messages(std::move(messages))
- , CompressedMessages(std::move(compressedMessages))
- , PartitionSession(std::move(partitionSession))
-{
+TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages,
+ TPartitionSession::TPtr partitionSession)
+ : TPartitionSessionAccessor(std::move(partitionSession))
+ , Messages(std::move(messages))
+ , CompressedMessages(std::move(compressedMessages)) {
for (size_t i = 0; i < GetMessagesCount(); ++i) {
auto [from, to] = GetMessageOffsetRange(*this, i);
if (OffsetRanges.empty() || OffsetRanges.back().second != from) {
@@ -262,139 +244,156 @@ TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(TVector<TMessage> mess
}
}
-void TReadSessionEvent::TDataReceivedEvent::Commit() {
+void TDataReceivedEvent::Commit() {
for (auto [from, to] : OffsetRanges) {
static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())->Commit(from, to);
}
}
-TString TReadSessionEvent::TDataReceivedEvent::DebugString(bool printData) const {
- TStringBuilder ret;
- ret << "DataReceived { PartitionSessionId: " << GetPartitionSession()->GetPartitionSessionId()
- << " PartitionId: " << GetPartitionSession()->GetPartitionId();
- for (const auto& message : Messages) {
- ret << " ";
- message.DebugString(ret, printData);
- }
- for (const auto& message : CompressedMessages) {
- ret << " ";
- message.DebugString(ret, printData);
+template<>
+void TPrintable<TDataReceivedEvent>::DebugString(TStringBuilder& ret, bool printData) const {
+ const auto* self = static_cast<const TDataReceivedEvent*>(this);
+ ret << "DataReceived {";
+ self->GetPartitionSession()->DebugString(ret);
+ if (self->HasCompressedMessages()) {
+ for (const auto& message : self->GetCompressedMessages()) {
+ ret << " ";
+ message.DebugString(ret, printData);
+ }
+ } else {
+ for (const auto& message : self->GetMessages()) {
+ ret << " ";
+ message.DebugString(ret, printData);
+ }
}
ret << " }";
- return std::move(ret);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent
-TReadSessionEvent::TCommitOffsetAcknowledgementEvent::TCommitOffsetAcknowledgementEvent(TPartitionSession::TPtr partitionSession, ui64 committedOffset)
- : PartitionSession(std::move(partitionSession))
- , CommittedOffset(committedOffset)
-{
+TCommitOffsetAcknowledgementEvent::TCommitOffsetAcknowledgementEvent(TPartitionSession::TPtr partitionSession,
+ ui64 committedOffset)
+ : TPartitionSessionAccessor(std::move(partitionSession))
+ , CommittedOffset(committedOffset) {
}
-
-TString TReadSessionEvent::TCommitOffsetAcknowledgementEvent::DebugString() const {
- return TStringBuilder() << "CommitAcknowledgement { PartitionSessionId: " << GetPartitionSession()->GetPartitionSessionId()
- << " PartitionId: " << GetPartitionSession()->GetPartitionId()
- << " CommittedOffset: " << GetCommittedOffset()
- << " }";
+template<>
+void TPrintable<TCommitOffsetAcknowledgementEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TCommitOffsetAcknowledgementEvent*>(this);
+ ret << "CommitAcknowledgement {";
+ self->GetPartitionSession()->DebugString(ret);
+ ret << " CommittedOffset: " << self->GetCommittedOffset()
+ << " }";
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NTopic::TReadSessionEvent::TStartPartitionSessionEvent
-TReadSessionEvent::TStartPartitionSessionEvent::TStartPartitionSessionEvent(TPartitionSession::TPtr partitionSession,
- ui64 committedOffset, ui64 endOffset)
- : PartitionSession(std::move(partitionSession))
+TStartPartitionSessionEvent::TStartPartitionSessionEvent(TPartitionSession::TPtr partitionSession, ui64 committedOffset,
+ ui64 endOffset)
+ : TPartitionSessionAccessor(std::move(partitionSession))
, CommittedOffset(committedOffset)
, EndOffset(endOffset) {
}
-void TReadSessionEvent::TStartPartitionSessionEvent::Confirm(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) {
+void TStartPartitionSessionEvent::Confirm(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) {
if (PartitionSession) {
static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())
->ConfirmCreate(readOffset, commitOffset);
}
}
-TString TReadSessionEvent::TStartPartitionSessionEvent::DebugString() const {
- return TStringBuilder() << "CreatePartitionSession { PartitionSessionId: "
- << GetPartitionSession()->GetPartitionSessionId()
- << " TopicPath: " << GetPartitionSession()->GetTopicPath()
- << " PartitionId: " << GetPartitionSession()->GetPartitionId()
- << " CommittedOffset: " << GetCommittedOffset()
- << " EndOffset: " << GetEndOffset() << " }";
+template<>
+void TPrintable<TStartPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TStartPartitionSessionEvent*>(this);
+ ret << "StartPartitionSession {";
+ self->GetPartitionSession()->DebugString(ret);
+ ret << " CommittedOffset: " << self->GetCommittedOffset()
+ << " EndOffset: " << self->GetEndOffset()
+ << " }";
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NTopic::TReadSessionEvent::TStopPartitionSessionEvent
-TReadSessionEvent::TStopPartitionSessionEvent::TStopPartitionSessionEvent(TPartitionSession::TPtr partitionSession,
- bool committedOffset)
- : PartitionSession(std::move(partitionSession))
+TStopPartitionSessionEvent::TStopPartitionSessionEvent(TPartitionSession::TPtr partitionSession, bool committedOffset)
+ : TPartitionSessionAccessor(std::move(partitionSession))
, CommittedOffset(committedOffset) {
}
-void TReadSessionEvent::TStopPartitionSessionEvent::Confirm() {
+void TStopPartitionSessionEvent::Confirm() {
if (PartitionSession) {
static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())->ConfirmDestroy();
}
}
-TString TReadSessionEvent::TStopPartitionSessionEvent::DebugString() const {
- return TStringBuilder() << "DestroyPartitionSession { PartitionSessionId: "
- << GetPartitionSession()->GetPartitionSessionId()
- << " PartitionId: " << GetPartitionSession()->GetPartitionId()
- << " CommittedOffset: " << GetCommittedOffset() << " }";
+template<>
+void TPrintable<TStopPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TStopPartitionSessionEvent*>(this);
+ ret << "StopPartitionSession {";
+ self->GetPartitionSession()->DebugString(ret);
+ ret << " CommittedOffset: " << self->GetCommittedOffset()
+ << " }";
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NTopic::TReadSessionEvent::TPartitionSessionStatusEvent
-TReadSessionEvent::TPartitionSessionStatusEvent::TPartitionSessionStatusEvent(TPartitionSession::TPtr partitionSession,
- ui64 committedOffset, ui64 readOffset,
- ui64 endOffset,
- TInstant writeTimeHighWatermark)
- : PartitionSession(std::move(partitionSession))
+TPartitionSessionStatusEvent::TPartitionSessionStatusEvent(TPartitionSession::TPtr partitionSession,
+ ui64 committedOffset, ui64 readOffset, ui64 endOffset,
+ TInstant writeTimeHighWatermark)
+ : TPartitionSessionAccessor(std::move(partitionSession))
, CommittedOffset(committedOffset)
, ReadOffset(readOffset)
, EndOffset(endOffset)
, WriteTimeHighWatermark(writeTimeHighWatermark) {
}
-TString TReadSessionEvent::TPartitionSessionStatusEvent::DebugString() const {
- return TStringBuilder() << "PartitionSessionStatus { PartitionSessionId: "
- << GetPartitionSession()->GetPartitionSessionId()
- << " PartitionId: " << GetPartitionSession()->GetPartitionId()
- << " CommittedOffset: " << GetCommittedOffset() << " ReadOffset: " << GetReadOffset()
- << " EndOffset: " << GetEndOffset()
- << " WriteWatermark: " << GetWriteTimeHighWatermark() << " }";
+template<>
+void TPrintable<TPartitionSessionStatusEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TPartitionSessionStatusEvent*>(this);
+ ret << "PartitionSessionStatus {";
+ self->GetPartitionSession()->DebugString(ret);
+ ret << " CommittedOffset: " << self->GetCommittedOffset()
+ << " ReadOffset: " << self->GetReadOffset()
+ << " EndOffset: " << self->GetEndOffset()
+ << " WriteWatermark: " << self->GetWriteTimeHighWatermark()
+ << " }";
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NTopic::TReadSessionEvent::TPartitionSessionClosedEvent
-TReadSessionEvent::TPartitionSessionClosedEvent::TPartitionSessionClosedEvent(TPartitionSession::TPtr partitionSession, EReason reason)
- : PartitionSession(std::move(partitionSession))
+TPartitionSessionClosedEvent::TPartitionSessionClosedEvent(TPartitionSession::TPtr partitionSession, EReason reason)
+ : TPartitionSessionAccessor(std::move(partitionSession))
, Reason(reason)
{
}
-TString TReadSessionEvent::TPartitionSessionClosedEvent::DebugString() const {
- return TStringBuilder() << "PartitionSessionClosed { PartitionSessionId: "
- << GetPartitionSession()->GetPartitionSessionId()
- << " PartitionId: " << GetPartitionSession()->GetPartitionId()
- << " Reason: " << GetReason() << " }";
+template<>
+void TPrintable<TPartitionSessionClosedEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TPartitionSessionClosedEvent*>(this);
+ ret << "PartitionSessionClosed {";
+ self->GetPartitionSession()->DebugString(ret);
+ ret << " Reason: " << self->GetReason()
+ << " }";
}
-TString TSessionClosedEvent::DebugString() const {
- return
- TStringBuilder() << "SessionClosed { Status: " << GetStatus()
- << " Issues: \"" << NPersQueue::IssuesSingleLineString(GetIssues())
- << "\" }";
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// NTopic::TSessionClosedEvent
+
+template<>
+void TPrintable<TSessionClosedEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TSessionClosedEvent*>(this);
+ ret << "SessionClosed { Status: " << self->GetStatus()
+ << " Issues: \"" << NPersQueue::IssuesSingleLineString(self->GetIssues())
+ << "\" }";
}
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// NTopic::TReadSessionEvent
+
TString DebugString(const TReadSessionEvent::TEvent& event) {
return std::visit([](const auto& ev) { return ev.DebugString(); }, event);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index 3fb36f63da..0cf98e4212 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -258,7 +258,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess
readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk;
}
if (readyToAccept) {
- EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}});
}
}
@@ -440,7 +440,7 @@ void TWriteSessionImpl::InitImpl() {
auto* init = req.mutable_init_request();
init->set_path(Settings.Path_);
init->set_producer_id(Settings.ProducerId_);
-
+
if (Settings.PartitionId_.Defined())
init->set_partition_id(*Settings.PartitionId_);
else
@@ -561,32 +561,32 @@ TStringBuilder TWriteSessionImpl::LogPrefix() const {
return TStringBuilder() << "ProducerId [" << Settings.ProducerId_ << "] MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] ";
}
-TString TWriteSessionEvent::TAcksEvent::DebugString() const {
- TStringBuilder res;
+template<>
+void TPrintable<TWriteSessionEvent::TAcksEvent>::DebugString(TStringBuilder& res, bool) const {
+ const auto* self = static_cast<const TWriteSessionEvent::TAcksEvent*>(this);
res << "AcksEvent:";
- for (auto& ack : Acks) {
+ for (auto& ack : self->Acks) {
res << " { seqNo : " << ack.SeqNo << ", State : " << ack.State;
if (ack.Details) {
res << ", offset : " << ack.Details->Offset << ", partitionId : " << ack.Details->PartitionId;
}
res << " }";
}
- if (!Acks.empty() && Acks.back().Stat) {
- auto& stat = Acks.back().Stat;
+ if (!self->Acks.empty() && self->Acks.back().Stat) {
+ auto& stat = self->Acks.back().Stat;
res << " write stat: Write time " << stat->WriteTime
<< " minimal time in partition queue " << stat->MinTimeInPartitionQueue
<< " maximal time in partition queue " << stat->MaxTimeInPartitionQueue
<< " partition quoted time " << stat->PartitionQuotedTime
<< " topic quoted time " << stat->TopicQuotedTime;
}
- return res;
}
-TString TWriteSessionEvent::TReadyToAcceptEvent::DebugString() const {
- return "ReadyToAcceptEvent";
+template<>
+void TPrintable<TWriteSessionEvent::TReadyToAcceptEvent>::DebugString(TStringBuilder& res, bool) const {
+ res << "ReadyToAcceptEvent";
}
-
TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMessageImpl() {
Y_VERIFY(Lock.IsLocked());
@@ -625,7 +625,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
OnErrorResolved();
if (!FirstTokenSent) {
- result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}});
FirstTokenSent = true;
}
// Kickstart send after session reestablishment
@@ -677,7 +677,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
});
if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) {
- result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}});
}
}
//EventsQueue->PushEvent(std::move(acksEvent));
@@ -816,7 +816,7 @@ void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
memoryUsage = OnCompressedImpl(std::move(block));
}
if (memoryUsage.NowOk && !memoryUsage.WasOk) {
- EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}});
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index 962bebac17..a0cc5e2088 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -171,7 +171,10 @@ class TPartitioningSettings {
public:
TPartitioningSettings() : MinActivePartitions_(0), PartitionCountLimit_(0){}
TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings);
- TPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit) : MinActivePartitions_(minActivePartitions), PartitionCountLimit_(partitionCountLimit) {}
+ TPartitioningSettings(ui64 minActivePartitions, ui64 partitionCountLimit)
+ : MinActivePartitions_(minActivePartitions)
+ , PartitionCountLimit_(partitionCountLimit) {
+ }
ui64 GetMinActivePartitions() const;
ui64 GetPartitionCountLimit() const;
@@ -602,6 +605,19 @@ struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSet
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+template <typename TEvent>
+class TPrintable {
+public:
+ TString DebugString(bool printData = false) const {
+ TStringBuilder b;
+ static_cast<const TEvent*>(this)->DebugString(b, printData);
+ return b;
+ }
+
+ // implemented in template specializations
+ void DebugString(TStringBuilder& ret, bool printData = false) const = delete;
+};
+
//! Session metainformation.
struct TWriteSessionMeta: public TThrRefBase {
using TPtr = TIntrusivePtr<TWriteSessionMeta>;
@@ -618,12 +634,13 @@ struct TMessageMeta: public TThrRefBase {
};
//! Event that is sent to client during session destruction.
-struct TSessionClosedEvent: public TStatus {
+struct TSessionClosedEvent: public TStatus, public TPrintable<TSessionClosedEvent> {
using TStatus::TStatus;
-
- TString DebugString() const;
};
+template<>
+void TPrintable<TSessionClosedEvent>::DebugString(TStringBuilder& res, bool) const;
+
struct TWriteStat : public TThrRefBase {
TDuration WriteTime;
TDuration MinTimeInPartitionQueue;
@@ -705,7 +722,7 @@ struct TReaderCounters: public TThrRefBase {
};
//! Partition session.
-struct TPartitionSession: public TThrRefBase {
+struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSession> {
using TPtr = TIntrusivePtr<TPartitionSession>;
public:
@@ -739,11 +756,26 @@ protected:
ui64 PartitionId;
};
+template<>
+void TPrintable<TPartitionSession>::DebugString(TStringBuilder& res, bool) const;
+
//! Events for read session.
struct TReadSessionEvent {
+ class TPartitionSessionAccessor {
+ public:
+ TPartitionSessionAccessor(TPartitionSession::TPtr partitionSession);
+
+ virtual ~TPartitionSessionAccessor() = default;
+
+ virtual const TPartitionSession::TPtr& GetPartitionSession() const;
+
+ protected:
+ TPartitionSession::TPtr PartitionSession;
+ };
+
//! Event with new data.
//! Contains batch of messages from single partition session.
- struct TDataReceivedEvent {
+ struct TDataReceivedEvent : public TPartitionSessionAccessor, public TPrintable<TDataReceivedEvent> {
struct TMessageInformation {
TMessageInformation(ui64 offset,
TString producerId,
@@ -765,39 +797,16 @@ struct TReadSessionEvent {
TString MessageGroupId;
};
- class IMessage {
+ class TMessageBase : public TPrintable<TMessageBase> {
public:
- IMessage(const TString& data, TPartitionSession::TPtr partitionSession);
+ TMessageBase(const TString& data, TMessageInformation info);
- virtual ~IMessage() = default;
+ virtual ~TMessageBase() = default;
virtual const TString& GetData() const;
- //! Partition session. Same as in batch.
- const TPartitionSession::TPtr& GetPartitionSession() const;
-
virtual void Commit() = 0;
- TString DebugString(bool printData = false) const;
- virtual void DebugString(TStringBuilder& ret, bool printData = false) const = 0;
-
- protected:
- TString Data;
-
- TPartitionSession::TPtr PartitionSession;
- };
-
- //! Single message.
- struct TMessage: public IMessage {
- TMessage(const TString& data, std::exception_ptr decompressionException,
- const TMessageInformation& information, TPartitionSession::TPtr partitionSession);
-
- //! User data.
- //! Throws decompressor exception if decompression failed.
- const TString& GetData() const override;
-
- bool HasException() const;
-
//! Message offset.
ui64 GetOffset() const;
@@ -822,19 +831,37 @@ struct TReadSessionEvent {
//! Message level meta info.
const TMessageMeta::TPtr& GetMessageMeta() const;
+ protected:
+ TString Data;
+ TMessageInformation Information;
+ };
+
+ //! Single message.
+ struct TMessage: public TMessageBase, public TPartitionSessionAccessor, public TPrintable<TMessage> {
+ using TPrintable<TMessage>::DebugString;
+
+ TMessage(const TString& data, std::exception_ptr decompressionException, TMessageInformation information,
+ TPartitionSession::TPtr partitionSession);
+
+ //! User data.
+ //! Throws decompressor exception if decompression failed.
+ const TString& GetData() const override;
+
//! Commits single message.
void Commit() override;
- using IMessage::DebugString;
- void DebugString(TStringBuilder& ret, bool printData = false) const override;
+ bool HasException() const;
private:
std::exception_ptr DecompressionException;
- TMessageInformation Information;
};
- struct TCompressedMessage: public IMessage {
- TCompressedMessage(ECodec codec, const TString& data, const TMessageInformation& information,
+ struct TCompressedMessage: public TMessageBase,
+ public TPartitionSessionAccessor,
+ public TPrintable<TCompressedMessage> {
+ using TPrintable<TCompressedMessage>::DebugString;
+
+ TCompressedMessage(ECodec codec, const TString& data, TMessageInformation information,
TPartitionSession::TPtr partitionSession);
virtual ~TCompressedMessage() {
@@ -843,53 +870,20 @@ struct TReadSessionEvent {
//! Message codec
ECodec GetCodec() const;
- //! Message offset.
- ui64 GetOffset() const;
-
- //! Producer id
- const TString& GetProducerId() const;
-
- //! Message group id.
- const TString& GetMessageGroupId() const;
-
- //! Sequence number.
- ui64 GetSeqNo() const;
-
- //! Message creation timestamp.
- TInstant GetCreateTime() const;
-
- //! Message write timestamp.
- TInstant GetWriteTime() const;
-
- //! Metainfo.
- const TWriteSessionMeta::TPtr& GetMeta() const;
-
- //! Message level meta info.
- const TMessageMeta::TPtr& GetMessageMeta() const;
-
//! Uncompressed size.
ui64 GetUncompressedSize() const;
//! Commits all offsets in compressed message.
void Commit() override;
- using IMessage::DebugString;
- void DebugString(TStringBuilder& ret, bool printData = false) const override;
-
private:
ECodec Codec;
- TMessageInformation Information;
};
public:
TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages,
TPartitionSession::TPtr partitionSession);
- //! Partition session.
- const TPartitionSession::TPtr& GetPartitionSession() const {
- return PartitionSession;
- }
-
bool HasCompressedMessages() const {
return !CompressedMessages.empty();
}
@@ -923,8 +917,6 @@ struct TReadSessionEvent {
//! Commits all messages in batch.
void Commit();
- TString DebugString(bool printData = false) const;
-
private:
void CheckMessagesFilled(bool compressed) const {
Y_VERIFY(!Messages.empty() || !CompressedMessages.empty());
@@ -939,19 +931,14 @@ struct TReadSessionEvent {
private:
TVector<TMessage> Messages;
TVector<TCompressedMessage> CompressedMessages;
- TPartitionSession::TPtr PartitionSession;
std::vector<std::pair<ui64, ui64>> OffsetRanges;
};
//! Acknowledgement for commit request.
- struct TCommitOffsetAcknowledgementEvent {
+ struct TCommitOffsetAcknowledgementEvent: public TPartitionSessionAccessor,
+ public TPrintable<TCommitOffsetAcknowledgementEvent> {
TCommitOffsetAcknowledgementEvent(TPartitionSession::TPtr partitionSession, ui64 committedOffset);
- //! Partition session.
- const TPartitionSession::TPtr& GetPartitionSession() const {
- return PartitionSession;
- }
-
//! Committed offset.
//! This means that from now the first available
//! message offset in current partition
@@ -961,21 +948,15 @@ struct TReadSessionEvent {
return CommittedOffset;
}
- TString DebugString() const;
-
private:
- TPartitionSession::TPtr PartitionSession;
ui64 CommittedOffset;
};
//! Server command for creating and starting partition session.
- struct TStartPartitionSessionEvent {
+ struct TStartPartitionSessionEvent: public TPartitionSessionAccessor,
+ public TPrintable<TStartPartitionSessionEvent> {
explicit TStartPartitionSessionEvent(TPartitionSession::TPtr, ui64 committedOffset, ui64 endOffset);
- const TPartitionSession::TPtr& GetPartitionSession() const {
- return PartitionSession;
- }
-
//! Current committed offset in partition session.
ui64 GetCommittedOffset() const {
return CommittedOffset;
@@ -991,10 +972,7 @@ struct TReadSessionEvent {
//! If maybe is empty then no rewinding
void Confirm(TMaybe<ui64> readOffset = Nothing(), TMaybe<ui64> commitOffset = Nothing());
- TString DebugString() const;
-
private:
- TPartitionSession::TPtr PartitionSession;
ui64 CommittedOffset;
ui64 EndOffset;
};
@@ -1002,13 +980,9 @@ struct TReadSessionEvent {
//! Server command for stopping and destroying partition session.
//! Server can destroy partition session gracefully
//! for rebalancing among all topic clients.
- struct TStopPartitionSessionEvent {
+ struct TStopPartitionSessionEvent: public TPartitionSessionAccessor, public TPrintable<TStopPartitionSessionEvent> {
TStopPartitionSessionEvent(TPartitionSession::TPtr partitionSession, bool committedOffset);
- const TPartitionSession::TPtr& GetPartitionSession() const {
- return PartitionSession;
- }
-
//! Last offset of the partition session that was committed.
ui64 GetCommittedOffset() const {
return CommittedOffset;
@@ -1018,22 +992,16 @@ struct TReadSessionEvent {
//! Confirm has no effect if TPartitionSessionClosedEvent for same partition session with is received.
void Confirm();
- TString DebugString() const;
-
private:
- TPartitionSession::TPtr PartitionSession;
ui64 CommittedOffset;
};
//! Status for partition session requested via TPartitionSession::RequestStatus()
- struct TPartitionSessionStatusEvent {
+ struct TPartitionSessionStatusEvent: public TPartitionSessionAccessor,
+ public TPrintable<TPartitionSessionStatusEvent> {
TPartitionSessionStatusEvent(TPartitionSession::TPtr partitionSession, ui64 committedOffset, ui64 readOffset,
ui64 endOffset, TInstant writeTimeHighWatermark);
- const TPartitionSession::TPtr& GetPartitionSession() const {
- return PartitionSession;
- }
-
//! Committed offset.
ui64 GetCommittedOffset() const {
return CommittedOffset;
@@ -1055,10 +1023,7 @@ struct TReadSessionEvent {
return WriteTimeHighWatermark;
}
- TString DebugString() const;
-
private:
- TPartitionSession::TPtr PartitionSession;
ui64 CommittedOffset = 0;
ui64 ReadOffset = 0;
ui64 EndOffset = 0;
@@ -1069,7 +1034,8 @@ struct TReadSessionEvent {
//! partition session death.
//! This could be after graceful stop of partition session
//! or when connection with partition was lost.
- struct TPartitionSessionClosedEvent {
+ struct TPartitionSessionClosedEvent: public TPartitionSessionAccessor,
+ public TPrintable<TPartitionSessionClosedEvent> {
enum class EReason {
StopConfirmedByUser,
Lost,
@@ -1079,18 +1045,11 @@ struct TReadSessionEvent {
public:
TPartitionSessionClosedEvent(TPartitionSession::TPtr partitionSession, EReason reason);
- const TPartitionSession::TPtr& GetPartitionSession() const {
- return PartitionSession;
- }
-
EReason GetReason() const {
return Reason;
}
- TString DebugString() const;
-
private:
- TPartitionSession::TPtr PartitionSession;
EReason Reason;
};
@@ -1136,7 +1095,28 @@ private:
THolder<TImpl> Impl;
};
-//! Event debug string.
+//! Events debug strings.
+template<>
+void TPrintable<TReadSessionEvent::TDataReceivedEvent::TMessageBase>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TReadSessionEvent::TDataReceivedEvent::TMessage>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TReadSessionEvent::TDataReceivedEvent>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TReadSessionEvent::TCommitOffsetAcknowledgementEvent>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TReadSessionEvent::TStartPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TReadSessionEvent::TStopPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TReadSessionEvent::TPartitionSessionStatusEvent>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TReadSessionEvent::TPartitionSessionClosedEvent>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TSessionClosedEvent>::DebugString(TStringBuilder& ret, bool printData) const;
+
TString DebugString(const TReadSessionEvent::TEvent& event);
//! Retry policy.
@@ -1233,28 +1213,27 @@ struct TWriteSessionEvent {
};
- struct TAcksEvent {
+ struct TAcksEvent : public TPrintable<TAcksEvent> {
//! Acks could be batched from several Write requests.
//! They are provided to client as soon as possible.
TVector<TWriteAck> Acks;
-
- TString DebugString() const;
-
};
//! Indicates that a writer is ready to accept new message(s).
//! Continuation token should be kept and then used in write methods.
- struct TReadyToAcceptEvent {
+ struct TReadyToAcceptEvent : public TPrintable<TReadyToAcceptEvent> {
TContinuationToken ContinuationToken;
-
- TString DebugString() const;
-
};
using TEvent = std::variant<TAcksEvent, TReadyToAcceptEvent, TSessionClosedEvent>;
};
-//! Event debug string.
+//! Events debug strings.
+template<>
+void TPrintable<TWriteSessionEvent::TAcksEvent>::DebugString(TStringBuilder& ret, bool printData) const;
+template<>
+void TPrintable<TWriteSessionEvent::TReadyToAcceptEvent>::DebugString(TStringBuilder& ret, bool printData) const;
+
TString DebugString(const TWriteSessionEvent::TEvent& event);
using TSessionClosedHandler = std::function<void(const TSessionClosedEvent&)>;