summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <[email protected]>2023-06-14 16:49:06 +0300
committerkomels <[email protected]>2023-06-14 16:49:06 +0300
commit604a7cf104dc4e6198d3a3a335ba69e94bffee3a (patch)
tree9681a6a08b2f5b880ebb688ca890959579346504
parent9a1b36222ca0420e4554eb5e1b564f7d507f81c6 (diff)
Refactor mirrorer to use TopicAPI
-rw-r--r--ydb/core/persqueue/actor_persqueue_client_iface.h24
-rw-r--r--ydb/core/persqueue/codecs/pqv1.cpp4
-rw-r--r--ydb/core/persqueue/codecs/pqv1.h3
-rw-r--r--ydb/core/persqueue/mirrorer.cpp57
-rw-r--r--ydb/core/persqueue/mirrorer.h15
-rw-r--r--ydb/core/persqueue/ut/mirrorer_ut.cpp2
-rw-r--r--ydb/core/persqueue/write_meta.cpp21
-rw-r--r--ydb/core/persqueue/write_meta.h2
8 files changed, 78 insertions, 50 deletions
diff --git a/ydb/core/persqueue/actor_persqueue_client_iface.h b/ydb/core/persqueue/actor_persqueue_client_iface.h
index 3f6edbe357c..97bf686640c 100644
--- a/ydb/core/persqueue/actor_persqueue_client_iface.h
+++ b/ydb/core/persqueue/actor_persqueue_client_iface.h
@@ -3,7 +3,7 @@
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/library/logger/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/logger/log.h>
@@ -44,7 +44,7 @@ public:
}
}
- virtual std::shared_ptr<NYdb::NPersQueue::IReadSession> GetReadSession(
+ virtual std::shared_ptr<NYdb::NTopic::IReadSession> GetReadSession(
const NKikimrPQ::TMirrorPartitionConfig& config,
ui32 partition,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
@@ -85,14 +85,14 @@ protected:
}
public:
- std::shared_ptr<NYdb::NPersQueue::IReadSession> GetReadSession(
+ std::shared_ptr<NYdb::NTopic::IReadSession> GetReadSession(
const NKikimrPQ::TMirrorPartitionConfig& config,
ui32 partition,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
ui64 maxMemoryUsageBytes,
TMaybe<TLog> logger = Nothing()
) const override {
- NYdb::NPersQueue::TPersQueueClientSettings clientSettings = NYdb::NPersQueue::TPersQueueClientSettings()
+ NYdb::NTopic::TTopicClientSettings clientSettings = NYdb::NTopic::TTopicClientSettings()
.DiscoveryEndpoint(TStringBuilder() << config.GetEndpoint() << ":" << config.GetEndpointPort())
.DiscoveryMode(NYdb::EDiscoveryMode::Async)
.CredentialsProviderFactory(credentialsProviderFactory)
@@ -101,25 +101,23 @@ public:
clientSettings.Database(config.GetDatabase());
}
- NYdb::NPersQueue::TReadSessionSettings settings = NYdb::NPersQueue::TReadSessionSettings()
+ NYdb::NTopic::TReadSessionSettings settings = NYdb::NTopic::TReadSessionSettings()
.ConsumerName(config.GetConsumer())
.MaxMemoryUsageBytes(maxMemoryUsageBytes)
.Decompress(false)
- .DisableClusterDiscovery(true)
- .ReadOnlyOriginal(true)
- .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy());
+ .RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy());
if (logger) {
settings.Log(logger.GetRef());
}
if (config.HasReadFromTimestampsMs()) {
- settings.StartingMessageTimestamp(TInstant::MilliSeconds(config.GetReadFromTimestampsMs()));
+ settings.ReadFromTimestamp(TInstant::MilliSeconds(config.GetReadFromTimestampsMs()));
}
- NYdb::NPersQueue::TTopicReadSettings topicSettings(config.GetTopic());
- topicSettings.AppendPartitionGroupIds({partition + 1});
+ NYdb::NTopic::TTopicReadSettings topicSettings(config.GetTopic());
+ topicSettings.AppendPartitionIds({partition});
settings.AppendTopics(topicSettings);
- NYdb::NPersQueue::TPersQueueClient persQueueClient(*Driver, clientSettings);
- return persQueueClient.CreateReadSession(settings);
+ NYdb::NTopic::TTopicClient topicClient(*Driver, clientSettings);
+ return topicClient.CreateReadSession(settings);
}
};
diff --git a/ydb/core/persqueue/codecs/pqv1.cpp b/ydb/core/persqueue/codecs/pqv1.cpp
index 8037b0ac03e..b1e70b4bb6b 100644
--- a/ydb/core/persqueue/codecs/pqv1.cpp
+++ b/ydb/core/persqueue/codecs/pqv1.cpp
@@ -32,4 +32,8 @@ std::optional<NPersQueueCommon::ECodec> FromV1Codec(const NYdb::NPersQueue::ECod
}
}
+i32 FromTopicCodec(const NYdb::NTopic::ECodec codec) {
+ return (ui32)(codec) - 1;
+}
+
} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/codecs/pqv1.h b/ydb/core/persqueue/codecs/pqv1.h
index 30024b9dc5e..29fe4e8cfda 100644
--- a/ydb/core/persqueue/codecs/pqv1.h
+++ b/ydb/core/persqueue/codecs/pqv1.h
@@ -2,10 +2,13 @@
#include <ydb/public/api/protos/draft/persqueue_common.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
namespace NKikimr::NPQ {
Ydb::PersQueue::V1::Codec ToV1Codec(const NPersQueueCommon::ECodec codec);
std::optional<NPersQueueCommon::ECodec> FromV1Codec(const NYdb::NPersQueue::ECodec codec);
+i32 FromTopicCodec(const NYdb::NTopic::ECodec codec);
+
} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp
index 739cfdd161a..fa4c8d34064 100644
--- a/ydb/core/persqueue/mirrorer.cpp
+++ b/ydb/core/persqueue/mirrorer.cpp
@@ -19,7 +19,7 @@ using namespace NPersQueue;
namespace NKikimr {
namespace NPQ {
-using TPersQueueReadEvent = NYdb::NPersQueue::TReadSessionEvent;
+using TPersQueueReadEvent = NYdb::NTopic::TReadSessionEvent;
constexpr NKikimrServices::TActivity::EType TMirrorer::ActorActivityType() {
return NKikimrServices::TActivity::PERSQUEUE_MIRRORER;
@@ -107,22 +107,22 @@ bool TMirrorer::AddToWriteRequest(
incorrectRequest = true;
return false;
}
- request.SetCmdWriteOffset(message.GetOffset(0));
+ request.SetCmdWriteOffset(message.GetOffset());
}
- if (request.GetCmdWriteOffset() + request.CmdWriteSize() != message.GetOffset(0)) {
+ if (request.GetCmdWriteOffset() + request.CmdWriteSize() != message.GetOffset()) {
return false;
}
auto write = request.AddCmdWrite();
write->SetData(GetSerializedData(message));
- write->SetSourceId(NSourceIdEncoding::EncodeSimple(message.GetMessageGroupId(0)));
- write->SetSeqNo(message.GetSeqNo(0));
- write->SetCreateTimeMS(message.GetCreateTime(0).MilliSeconds());
+ write->SetSourceId(NSourceIdEncoding::EncodeSimple(message.GetProducerId()));
+ write->SetSeqNo(message.GetSeqNo());
+ write->SetCreateTimeMS(message.GetCreateTime().MilliSeconds());
if (Config.GetSyncWriteTime()) {
- write->SetWriteTimeMS(message.GetWriteTime(0).MilliSeconds());
+ write->SetWriteTimeMS(message.GetWriteTime().MilliSeconds());
}
write->SetDisableDeduplication(true);
- write->SetUncompressedSize(message.GetUncompressedSize(0));
+ write->SetUncompressedSize(message.GetUncompressedSize());
return true;
}
@@ -173,7 +173,7 @@ void TMirrorer::ProcessWriteResponse(
) {
Y_VERIFY_S(response.CmdWriteResultSize() == WriteInFlight.size(), MirrorerDescription()
<< "CmdWriteResultSize=" << response.CmdWriteResultSize() << ", but expected=" << WriteInFlight.size()
- << ". First expected offset= " << (WriteInFlight.empty() ? -1 : WriteInFlight.front().GetOffset(0))
+ << ". First expected offset= " << (WriteInFlight.empty() ? -1 : WriteInFlight.front().GetOffset())
<< " response: " << response);
for (auto& result : response.GetCmdWriteResult()) {
@@ -189,10 +189,10 @@ void TMirrorer::ProcessWriteResponse(
}
auto& writtenMessageInfo = WriteInFlight.front();
if (MirrorerTimeLags) {
- TDuration lag = TInstant::MilliSeconds(result.GetWriteTimestampMS()) - writtenMessageInfo.GetWriteTime(0);
+ TDuration lag = TInstant::MilliSeconds(result.GetWriteTimestampMS()) - writtenMessageInfo.GetWriteTime();
MirrorerTimeLags->IncFor(lag.MilliSeconds(), 1);
}
- ui64 offset = writtenMessageInfo.GetOffset(0);
+ ui64 offset = writtenMessageInfo.GetOffset();
Y_VERIFY((ui64)result.GetOffset() == offset);
Y_VERIFY_S(EndOffset <= offset, MirrorerDescription()
<< "end offset more the written " << EndOffset << ">" << offset);
@@ -254,7 +254,7 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
<< "[STATE] has partition stream " << PartitionStream->GetTopicPath()
<< ":" << PartitionStream->GetPartitionId()
- << " with id " << PartitionStream->GetPartitionStreamId());
+ << " with id " << PartitionStream->GetPartitionSessionId());
} else {
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << "[STATE] hasn't partition stream");
}
@@ -435,7 +435,7 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " creating new read session");
if (!Queue.empty()) {
- OffsetToRead = Queue.front().GetOffset(0);
+ OffsetToRead = Queue.front().GetOffset();
while (!Queue.empty()) {
ui64 dataSize = Queue.back().GetData().size();
Y_VERIFY(BytesInFlight >= dataSize);
@@ -499,15 +499,14 @@ void TMirrorer::TryUpdateWriteTimetsamp(const TActorContext &ctx) {
req->SetTopic(TopicConverter->GetClientsideName());
req->SetPartition(Partition);
req->SetCookie(UPDATE_WRITE_TIMESTAMP);
- req->MutableCmdUpdateWriteTimestamp()->SetWriteTimeMS(StreamStatus->GetWriteWatermark().MilliSeconds());
+ req->MutableCmdUpdateWriteTimestamp()->SetWriteTimeMS(StreamStatus->GetWriteTimeHighWatermark().MilliSeconds());
ctx.Send(TabletActor, request.Release());
}
}
void TMirrorer::AddMessagesToQueue(TVector<TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage>&& messages) {
for (auto& msg : messages) {
- Y_VERIFY(msg.GetBlocksCount() == 1); // TODO support several compressed messages
- ui64 offset = msg.GetOffset(0);
+ ui64 offset = msg.GetOffset();
Y_VERIFY(OffsetToRead <= offset);
ui64 messageSize = msg.GetData().size();
@@ -586,7 +585,7 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
if (!WaitNextReaderEventInFlight) {
return;
}
- TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false);
+ TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false);
LOG_DEBUG_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " got next reader event: " << bool(event));
if (wakeup && !event) {
@@ -602,18 +601,18 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
if (auto* dataEvent = std::get_if<TPersQueueReadEvent::TDataReceivedEvent>(&event.GetRef())) {
AddMessagesToQueue(std::move(dataEvent->GetCompressedMessages()));
- } else if (auto* createStream = std::get_if<TPersQueueReadEvent::TCreatePartitionStreamEvent>(&event.GetRef())) {
+ } else if (auto* createStream = std::get_if<TPersQueueReadEvent::TStartPartitionSessionEvent>(&event.GetRef())) {
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
MirrorerDescription() << " got create stream event for '" << createStream->DebugString()
<< " and will set offset=" << OffsetToRead);
if (PartitionStream) {
- ProcessError(ctx, TStringBuilder() << " already has stream " << PartitionStream->GetPartitionStreamId()
- << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId());
+ ProcessError(ctx, TStringBuilder() << " already has stream " << PartitionStream->GetPartitionSessionId()
+ << ", new stream " << createStream->GetPartitionSession()->GetPartitionSessionId());
ScheduleConsumerCreation(ctx);
return;
}
- PartitionStream = createStream->GetPartitionStream();
+ PartitionStream = createStream->GetPartitionSession();
if (Partition != PartitionStream->GetPartitionId()) {
ProcessError(ctx, TStringBuilder() << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath()
<< " partition=" << PartitionStream->GetPartitionId());
@@ -628,38 +627,38 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
createStream->Confirm(OffsetToRead);
RequestSourcePartitionStatus();
- } else if (auto* destroyStream = std::get_if<TPersQueueReadEvent::TDestroyPartitionStreamEvent>(&event.GetRef())) {
+ } else if (auto* destroyStream = std::get_if<TPersQueueReadEvent::TStopPartitionSessionEvent>(&event.GetRef())) {
destroyStream->Confirm();
PartitionStream.Reset();
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
MirrorerDescription()
<< " got destroy stream event: " << destroyStream->DebugString());
- } else if (auto* streamClosed = std::get_if<TPersQueueReadEvent::TPartitionStreamClosedEvent>(&event.GetRef())) {
+ } else if (auto* streamClosed = std::get_if<TPersQueueReadEvent::TPartitionSessionClosedEvent>(&event.GetRef())) {
PartitionStream.Reset();
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
MirrorerDescription()
<< " got stream closed event for partition stream id: "
- << streamClosed->GetPartitionStream()->GetPartitionStreamId()
+ << streamClosed->GetPartitionSession()->GetPartitionSessionId()
<< " reason: " << streamClosed->GetReason());
ProcessError(ctx, TStringBuilder() << " read session stream closed event");
ScheduleConsumerCreation(ctx);
return;
- } else if (auto* streamStatus = std::get_if<TPersQueueReadEvent::TPartitionStreamStatusEvent >(&event.GetRef())) {
+ } else if (auto* streamStatus = std::get_if<TPersQueueReadEvent::TPartitionSessionStatusEvent >(&event.GetRef())) {
if (PartitionStream
- && PartitionStream->GetPartitionStreamId() == streamStatus->GetPartitionStream()->GetPartitionStreamId()
+ && PartitionStream->GetPartitionSessionId() == streamStatus->GetPartitionSession()->GetPartitionSessionId()
) {
- StreamStatus = MakeHolder<TPersQueueReadEvent::TPartitionStreamStatusEvent>(*streamStatus);
+ StreamStatus = MakeHolder<TPersQueueReadEvent::TPartitionSessionStatusEvent>(*streamStatus);
ctx.Schedule(TDuration::Seconds(1), new TEvPQ::TEvRequestPartitionStatus);
TryUpdateWriteTimetsamp(ctx);
}
- } else if (auto* commitAck = std::get_if<TPersQueueReadEvent::TCommitAcknowledgementEvent>(&event.GetRef())) {
+ } else if (auto* commitAck = std::get_if<TPersQueueReadEvent::TCommitOffsetAcknowledgementEvent>(&event.GetRef())) {
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
<< " got commit responce, commited offset: " << commitAck->GetCommittedOffset());
- } else if (auto* closeSessionEvent = std::get_if<NYdb::NPersQueue::TSessionClosedEvent>(&event.GetRef())) {
+ } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event.GetRef())) {
ProcessError(ctx, TStringBuilder() << " read session closed: " << closeSessionEvent->DebugString());
ScheduleConsumerCreation(ctx);
return;
diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h
index 8c247147fa7..277e3f53f0b 100644
--- a/ydb/core/persqueue/mirrorer.h
+++ b/ydb/core/persqueue/mirrorer.h
@@ -11,6 +11,7 @@
#include <ydb/public/lib/base/msgbus.h>
#include <ydb/core/persqueue/events/internal.h>
#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
@@ -96,7 +97,7 @@ private:
bool AddToWriteRequest(
NKikimrClient::TPersQueuePartitionRequest& request,
- NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message,
+ NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message,
bool& incorrectRequest
);
void ProcessError(const TActorContext& ctx, const TString& msg);
@@ -145,7 +146,7 @@ public:
void RequestSourcePartitionStatus();
void TryUpdateWriteTimetsamp(const TActorContext &ctx);
void AddMessagesToQueue(
- TVector<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>&& messages
+ TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>&& messages
);
void StartWaitNextReaderEvent(const TActorContext& ctx);
@@ -159,17 +160,17 @@ private:
ui64 OffsetToRead;
NKikimrPQ::TMirrorPartitionConfig Config;
- TDeque<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> Queue;
- TDeque<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> WriteInFlight;
+ TDeque<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> Queue;
+ TDeque<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> WriteInFlight;
ui64 BytesInFlight = 0;
std::optional<NKikimrClient::TPersQueuePartitionRequest> WriteRequestInFlight;
TDuration WriteRetryTimeout = WRITE_RETRY_TIMEOUT_START;
TInstant WriteRequestTimestamp;
NYdb::TCredentialsProviderFactoryPtr CredentialsProvider;
- std::shared_ptr<NYdb::NPersQueue::IReadSession> ReadSession;
+ std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
ui64 ReaderGeneration = 0;
- NYdb::NPersQueue::TPartitionStream::TPtr PartitionStream;
- THolder<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamStatusEvent> StreamStatus;
+ NYdb::NTopic::TPartitionSession::TPtr PartitionStream;
+ THolder<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent> StreamStatus;
TInstant LastInitStageTimestamp;
TDuration ConsumerInitTimeout = CONSUMER_INIT_TIMEOUT_START;
diff --git a/ydb/core/persqueue/ut/mirrorer_ut.cpp b/ydb/core/persqueue/ut/mirrorer_ut.cpp
index c802606ccfe..4da514cbe7e 100644
--- a/ydb/core/persqueue/ut/mirrorer_ut.cpp
+++ b/ydb/core/persqueue/ut/mirrorer_ut.cpp
@@ -143,7 +143,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
const auto& dstMeta = dstMessages[i].GetMeta(0)->Fields;
const auto& srcMeta = srcMessages[i].GetMeta(0)->Fields;
- UNIT_ASSERT_EQUAL(dstMeta.size(), srcMeta.size());
+ UNIT_ASSERT_VALUES_EQUAL(dstMeta.size(), srcMeta.size());
for (auto& item : srcMeta) {
UNIT_ASSERT(dstMeta.count(item.first));
UNIT_ASSERT_EQUAL(dstMeta.at(item.first), item.second);
diff --git a/ydb/core/persqueue/write_meta.cpp b/ydb/core/persqueue/write_meta.cpp
index 58bed2badfa..e82eb3ab25a 100644
--- a/ydb/core/persqueue/write_meta.cpp
+++ b/ydb/core/persqueue/write_meta.cpp
@@ -41,6 +41,27 @@ TString GetSerializedData(const NYdb::NPersQueue::TReadSessionEvent::TDataReceiv
return str;
}
+TString GetSerializedData(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message) {
+ NKikimrPQClient::TDataChunk proto;
+ for (const auto& item : message.GetMeta()->Fields) {
+ if (item.first == "_ip") {
+ proto.SetIp(item.second);
+ } else {
+ SetMetaField(proto, item.first, item.second);
+ }
+ }
+ proto.SetSeqNo(message.GetSeqNo());
+ proto.SetCreateTime(message.GetCreateTime().MilliSeconds());
+ auto codec = NPQ::FromTopicCodec(message.GetCodec());
+ proto.SetCodec(codec);
+ proto.SetData(message.GetData());
+
+ TString str;
+ bool res = proto.SerializeToString(&str);
+ Y_VERIFY(res);
+ return str;
+}
+
NKikimrPQClient::TDataChunk GetDeserializedData(const TString& string) {
NKikimrPQClient::TDataChunk proto;
bool res = proto.ParseFromString(string);
diff --git a/ydb/core/persqueue/write_meta.h b/ydb/core/persqueue/write_meta.h
index 0fba0866330..7e2b4961566 100644
--- a/ydb/core/persqueue/write_meta.h
+++ b/ydb/core/persqueue/write_meta.h
@@ -2,6 +2,7 @@
#include <ydb/core/protos/grpc_pq_old.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <util/string/vector.h>
@@ -66,6 +67,7 @@ TString GetSerializedData(const NKikimrPQClient::TDataChunk& init, TArgs&...args
}
TString GetSerializedData(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message);
+TString GetSerializedData(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message);
NKikimrPQClient::TDataChunk GetDeserializedData(const TString& string);