diff options
| author | komels <[email protected]> | 2023-06-14 16:49:06 +0300 |
|---|---|---|
| committer | komels <[email protected]> | 2023-06-14 16:49:06 +0300 |
| commit | 604a7cf104dc4e6198d3a3a335ba69e94bffee3a (patch) | |
| tree | 9681a6a08b2f5b880ebb688ca890959579346504 | |
| parent | 9a1b36222ca0420e4554eb5e1b564f7d507f81c6 (diff) | |
Refactor mirrorer to use TopicAPI
| -rw-r--r-- | ydb/core/persqueue/actor_persqueue_client_iface.h | 24 | ||||
| -rw-r--r-- | ydb/core/persqueue/codecs/pqv1.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/persqueue/codecs/pqv1.h | 3 | ||||
| -rw-r--r-- | ydb/core/persqueue/mirrorer.cpp | 57 | ||||
| -rw-r--r-- | ydb/core/persqueue/mirrorer.h | 15 | ||||
| -rw-r--r-- | ydb/core/persqueue/ut/mirrorer_ut.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/persqueue/write_meta.cpp | 21 | ||||
| -rw-r--r-- | ydb/core/persqueue/write_meta.h | 2 |
8 files changed, 78 insertions, 50 deletions
diff --git a/ydb/core/persqueue/actor_persqueue_client_iface.h b/ydb/core/persqueue/actor_persqueue_client_iface.h index 3f6edbe357c..97bf686640c 100644 --- a/ydb/core/persqueue/actor_persqueue_client_iface.h +++ b/ydb/core/persqueue/actor_persqueue_client_iface.h @@ -3,7 +3,7 @@ #include <ydb/core/protos/pqconfig.pb.h> #include <ydb/library/logger/actor.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/logger/log.h> @@ -44,7 +44,7 @@ public: } } - virtual std::shared_ptr<NYdb::NPersQueue::IReadSession> GetReadSession( + virtual std::shared_ptr<NYdb::NTopic::IReadSession> GetReadSession( const NKikimrPQ::TMirrorPartitionConfig& config, ui32 partition, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, @@ -85,14 +85,14 @@ protected: } public: - std::shared_ptr<NYdb::NPersQueue::IReadSession> GetReadSession( + std::shared_ptr<NYdb::NTopic::IReadSession> GetReadSession( const NKikimrPQ::TMirrorPartitionConfig& config, ui32 partition, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, ui64 maxMemoryUsageBytes, TMaybe<TLog> logger = Nothing() ) const override { - NYdb::NPersQueue::TPersQueueClientSettings clientSettings = NYdb::NPersQueue::TPersQueueClientSettings() + NYdb::NTopic::TTopicClientSettings clientSettings = NYdb::NTopic::TTopicClientSettings() .DiscoveryEndpoint(TStringBuilder() << config.GetEndpoint() << ":" << config.GetEndpointPort()) .DiscoveryMode(NYdb::EDiscoveryMode::Async) .CredentialsProviderFactory(credentialsProviderFactory) @@ -101,25 +101,23 @@ public: clientSettings.Database(config.GetDatabase()); } - NYdb::NPersQueue::TReadSessionSettings settings = NYdb::NPersQueue::TReadSessionSettings() + NYdb::NTopic::TReadSessionSettings settings = NYdb::NTopic::TReadSessionSettings() .ConsumerName(config.GetConsumer()) .MaxMemoryUsageBytes(maxMemoryUsageBytes) .Decompress(false) - .DisableClusterDiscovery(true) - .ReadOnlyOriginal(true) - .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy()); + .RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy()); if (logger) { settings.Log(logger.GetRef()); } if (config.HasReadFromTimestampsMs()) { - settings.StartingMessageTimestamp(TInstant::MilliSeconds(config.GetReadFromTimestampsMs())); + settings.ReadFromTimestamp(TInstant::MilliSeconds(config.GetReadFromTimestampsMs())); } - NYdb::NPersQueue::TTopicReadSettings topicSettings(config.GetTopic()); - topicSettings.AppendPartitionGroupIds({partition + 1}); + NYdb::NTopic::TTopicReadSettings topicSettings(config.GetTopic()); + topicSettings.AppendPartitionIds({partition}); settings.AppendTopics(topicSettings); - NYdb::NPersQueue::TPersQueueClient persQueueClient(*Driver, clientSettings); - return persQueueClient.CreateReadSession(settings); + NYdb::NTopic::TTopicClient topicClient(*Driver, clientSettings); + return topicClient.CreateReadSession(settings); } }; diff --git a/ydb/core/persqueue/codecs/pqv1.cpp b/ydb/core/persqueue/codecs/pqv1.cpp index 8037b0ac03e..b1e70b4bb6b 100644 --- a/ydb/core/persqueue/codecs/pqv1.cpp +++ b/ydb/core/persqueue/codecs/pqv1.cpp @@ -32,4 +32,8 @@ std::optional<NPersQueueCommon::ECodec> FromV1Codec(const NYdb::NPersQueue::ECod } } +i32 FromTopicCodec(const NYdb::NTopic::ECodec codec) { + return (ui32)(codec) - 1; +} + } // NKikimr::NPQ diff --git a/ydb/core/persqueue/codecs/pqv1.h b/ydb/core/persqueue/codecs/pqv1.h index 30024b9dc5e..29fe4e8cfda 100644 --- a/ydb/core/persqueue/codecs/pqv1.h +++ b/ydb/core/persqueue/codecs/pqv1.h @@ -2,10 +2,13 @@ #include <ydb/public/api/protos/draft/persqueue_common.pb.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> namespace NKikimr::NPQ { Ydb::PersQueue::V1::Codec ToV1Codec(const NPersQueueCommon::ECodec codec); std::optional<NPersQueueCommon::ECodec> FromV1Codec(const NYdb::NPersQueue::ECodec codec); +i32 FromTopicCodec(const NYdb::NTopic::ECodec codec); + } // NKikimr::NPQ diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp index 739cfdd161a..fa4c8d34064 100644 --- a/ydb/core/persqueue/mirrorer.cpp +++ b/ydb/core/persqueue/mirrorer.cpp @@ -19,7 +19,7 @@ using namespace NPersQueue; namespace NKikimr { namespace NPQ { -using TPersQueueReadEvent = NYdb::NPersQueue::TReadSessionEvent; +using TPersQueueReadEvent = NYdb::NTopic::TReadSessionEvent; constexpr NKikimrServices::TActivity::EType TMirrorer::ActorActivityType() { return NKikimrServices::TActivity::PERSQUEUE_MIRRORER; @@ -107,22 +107,22 @@ bool TMirrorer::AddToWriteRequest( incorrectRequest = true; return false; } - request.SetCmdWriteOffset(message.GetOffset(0)); + request.SetCmdWriteOffset(message.GetOffset()); } - if (request.GetCmdWriteOffset() + request.CmdWriteSize() != message.GetOffset(0)) { + if (request.GetCmdWriteOffset() + request.CmdWriteSize() != message.GetOffset()) { return false; } auto write = request.AddCmdWrite(); write->SetData(GetSerializedData(message)); - write->SetSourceId(NSourceIdEncoding::EncodeSimple(message.GetMessageGroupId(0))); - write->SetSeqNo(message.GetSeqNo(0)); - write->SetCreateTimeMS(message.GetCreateTime(0).MilliSeconds()); + write->SetSourceId(NSourceIdEncoding::EncodeSimple(message.GetProducerId())); + write->SetSeqNo(message.GetSeqNo()); + write->SetCreateTimeMS(message.GetCreateTime().MilliSeconds()); if (Config.GetSyncWriteTime()) { - write->SetWriteTimeMS(message.GetWriteTime(0).MilliSeconds()); + write->SetWriteTimeMS(message.GetWriteTime().MilliSeconds()); } write->SetDisableDeduplication(true); - write->SetUncompressedSize(message.GetUncompressedSize(0)); + write->SetUncompressedSize(message.GetUncompressedSize()); return true; } @@ -173,7 +173,7 @@ void TMirrorer::ProcessWriteResponse( ) { Y_VERIFY_S(response.CmdWriteResultSize() == WriteInFlight.size(), MirrorerDescription() << "CmdWriteResultSize=" << response.CmdWriteResultSize() << ", but expected=" << WriteInFlight.size() - << ". First expected offset= " << (WriteInFlight.empty() ? -1 : WriteInFlight.front().GetOffset(0)) + << ". First expected offset= " << (WriteInFlight.empty() ? -1 : WriteInFlight.front().GetOffset()) << " response: " << response); for (auto& result : response.GetCmdWriteResult()) { @@ -189,10 +189,10 @@ void TMirrorer::ProcessWriteResponse( } auto& writtenMessageInfo = WriteInFlight.front(); if (MirrorerTimeLags) { - TDuration lag = TInstant::MilliSeconds(result.GetWriteTimestampMS()) - writtenMessageInfo.GetWriteTime(0); + TDuration lag = TInstant::MilliSeconds(result.GetWriteTimestampMS()) - writtenMessageInfo.GetWriteTime(); MirrorerTimeLags->IncFor(lag.MilliSeconds(), 1); } - ui64 offset = writtenMessageInfo.GetOffset(0); + ui64 offset = writtenMessageInfo.GetOffset(); Y_VERIFY((ui64)result.GetOffset() == offset); Y_VERIFY_S(EndOffset <= offset, MirrorerDescription() << "end offset more the written " << EndOffset << ">" << offset); @@ -254,7 +254,7 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << "[STATE] has partition stream " << PartitionStream->GetTopicPath() << ":" << PartitionStream->GetPartitionId() - << " with id " << PartitionStream->GetPartitionStreamId()); + << " with id " << PartitionStream->GetPartitionSessionId()); } else { LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << "[STATE] hasn't partition stream"); } @@ -435,7 +435,7 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " creating new read session"); if (!Queue.empty()) { - OffsetToRead = Queue.front().GetOffset(0); + OffsetToRead = Queue.front().GetOffset(); while (!Queue.empty()) { ui64 dataSize = Queue.back().GetData().size(); Y_VERIFY(BytesInFlight >= dataSize); @@ -499,15 +499,14 @@ void TMirrorer::TryUpdateWriteTimetsamp(const TActorContext &ctx) { req->SetTopic(TopicConverter->GetClientsideName()); req->SetPartition(Partition); req->SetCookie(UPDATE_WRITE_TIMESTAMP); - req->MutableCmdUpdateWriteTimestamp()->SetWriteTimeMS(StreamStatus->GetWriteWatermark().MilliSeconds()); + req->MutableCmdUpdateWriteTimestamp()->SetWriteTimeMS(StreamStatus->GetWriteTimeHighWatermark().MilliSeconds()); ctx.Send(TabletActor, request.Release()); } } void TMirrorer::AddMessagesToQueue(TVector<TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage>&& messages) { for (auto& msg : messages) { - Y_VERIFY(msg.GetBlocksCount() == 1); // TODO support several compressed messages - ui64 offset = msg.GetOffset(0); + ui64 offset = msg.GetOffset(); Y_VERIFY(OffsetToRead <= offset); ui64 messageSize = msg.GetData().size(); @@ -586,7 +585,7 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup) if (!WaitNextReaderEventInFlight) { return; } - TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false); + TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false); LOG_DEBUG_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " got next reader event: " << bool(event)); if (wakeup && !event) { @@ -602,18 +601,18 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup) if (auto* dataEvent = std::get_if<TPersQueueReadEvent::TDataReceivedEvent>(&event.GetRef())) { AddMessagesToQueue(std::move(dataEvent->GetCompressedMessages())); - } else if (auto* createStream = std::get_if<TPersQueueReadEvent::TCreatePartitionStreamEvent>(&event.GetRef())) { + } else if (auto* createStream = std::get_if<TPersQueueReadEvent::TStartPartitionSessionEvent>(&event.GetRef())) { LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " got create stream event for '" << createStream->DebugString() << " and will set offset=" << OffsetToRead); if (PartitionStream) { - ProcessError(ctx, TStringBuilder() << " already has stream " << PartitionStream->GetPartitionStreamId() - << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId()); + ProcessError(ctx, TStringBuilder() << " already has stream " << PartitionStream->GetPartitionSessionId() + << ", new stream " << createStream->GetPartitionSession()->GetPartitionSessionId()); ScheduleConsumerCreation(ctx); return; } - PartitionStream = createStream->GetPartitionStream(); + PartitionStream = createStream->GetPartitionSession(); if (Partition != PartitionStream->GetPartitionId()) { ProcessError(ctx, TStringBuilder() << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath() << " partition=" << PartitionStream->GetPartitionId()); @@ -628,38 +627,38 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup) createStream->Confirm(OffsetToRead); RequestSourcePartitionStatus(); - } else if (auto* destroyStream = std::get_if<TPersQueueReadEvent::TDestroyPartitionStreamEvent>(&event.GetRef())) { + } else if (auto* destroyStream = std::get_if<TPersQueueReadEvent::TStopPartitionSessionEvent>(&event.GetRef())) { destroyStream->Confirm(); PartitionStream.Reset(); LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " got destroy stream event: " << destroyStream->DebugString()); - } else if (auto* streamClosed = std::get_if<TPersQueueReadEvent::TPartitionStreamClosedEvent>(&event.GetRef())) { + } else if (auto* streamClosed = std::get_if<TPersQueueReadEvent::TPartitionSessionClosedEvent>(&event.GetRef())) { PartitionStream.Reset(); LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " got stream closed event for partition stream id: " - << streamClosed->GetPartitionStream()->GetPartitionStreamId() + << streamClosed->GetPartitionSession()->GetPartitionSessionId() << " reason: " << streamClosed->GetReason()); ProcessError(ctx, TStringBuilder() << " read session stream closed event"); ScheduleConsumerCreation(ctx); return; - } else if (auto* streamStatus = std::get_if<TPersQueueReadEvent::TPartitionStreamStatusEvent >(&event.GetRef())) { + } else if (auto* streamStatus = std::get_if<TPersQueueReadEvent::TPartitionSessionStatusEvent >(&event.GetRef())) { if (PartitionStream - && PartitionStream->GetPartitionStreamId() == streamStatus->GetPartitionStream()->GetPartitionStreamId() + && PartitionStream->GetPartitionSessionId() == streamStatus->GetPartitionSession()->GetPartitionSessionId() ) { - StreamStatus = MakeHolder<TPersQueueReadEvent::TPartitionStreamStatusEvent>(*streamStatus); + StreamStatus = MakeHolder<TPersQueueReadEvent::TPartitionSessionStatusEvent>(*streamStatus); ctx.Schedule(TDuration::Seconds(1), new TEvPQ::TEvRequestPartitionStatus); TryUpdateWriteTimetsamp(ctx); } - } else if (auto* commitAck = std::get_if<TPersQueueReadEvent::TCommitAcknowledgementEvent>(&event.GetRef())) { + } else if (auto* commitAck = std::get_if<TPersQueueReadEvent::TCommitOffsetAcknowledgementEvent>(&event.GetRef())) { LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " got commit responce, commited offset: " << commitAck->GetCommittedOffset()); - } else if (auto* closeSessionEvent = std::get_if<NYdb::NPersQueue::TSessionClosedEvent>(&event.GetRef())) { + } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event.GetRef())) { ProcessError(ctx, TStringBuilder() << " read session closed: " << closeSessionEvent->DebugString()); ScheduleConsumerCreation(ctx); return; diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h index 8c247147fa7..277e3f53f0b 100644 --- a/ydb/core/persqueue/mirrorer.h +++ b/ydb/core/persqueue/mirrorer.h @@ -11,6 +11,7 @@ #include <ydb/public/lib/base/msgbus.h> #include <ydb/core/persqueue/events/internal.h> #include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> @@ -96,7 +97,7 @@ private: bool AddToWriteRequest( NKikimrClient::TPersQueuePartitionRequest& request, - NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message, + NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message, bool& incorrectRequest ); void ProcessError(const TActorContext& ctx, const TString& msg); @@ -145,7 +146,7 @@ public: void RequestSourcePartitionStatus(); void TryUpdateWriteTimetsamp(const TActorContext &ctx); void AddMessagesToQueue( - TVector<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>&& messages + TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>&& messages ); void StartWaitNextReaderEvent(const TActorContext& ctx); @@ -159,17 +160,17 @@ private: ui64 OffsetToRead; NKikimrPQ::TMirrorPartitionConfig Config; - TDeque<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> Queue; - TDeque<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> WriteInFlight; + TDeque<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> Queue; + TDeque<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> WriteInFlight; ui64 BytesInFlight = 0; std::optional<NKikimrClient::TPersQueuePartitionRequest> WriteRequestInFlight; TDuration WriteRetryTimeout = WRITE_RETRY_TIMEOUT_START; TInstant WriteRequestTimestamp; NYdb::TCredentialsProviderFactoryPtr CredentialsProvider; - std::shared_ptr<NYdb::NPersQueue::IReadSession> ReadSession; + std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession; ui64 ReaderGeneration = 0; - NYdb::NPersQueue::TPartitionStream::TPtr PartitionStream; - THolder<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamStatusEvent> StreamStatus; + NYdb::NTopic::TPartitionSession::TPtr PartitionStream; + THolder<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent> StreamStatus; TInstant LastInitStageTimestamp; TDuration ConsumerInitTimeout = CONSUMER_INIT_TIMEOUT_START; diff --git a/ydb/core/persqueue/ut/mirrorer_ut.cpp b/ydb/core/persqueue/ut/mirrorer_ut.cpp index c802606ccfe..4da514cbe7e 100644 --- a/ydb/core/persqueue/ut/mirrorer_ut.cpp +++ b/ydb/core/persqueue/ut/mirrorer_ut.cpp @@ -143,7 +143,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) { const auto& dstMeta = dstMessages[i].GetMeta(0)->Fields; const auto& srcMeta = srcMessages[i].GetMeta(0)->Fields; - UNIT_ASSERT_EQUAL(dstMeta.size(), srcMeta.size()); + UNIT_ASSERT_VALUES_EQUAL(dstMeta.size(), srcMeta.size()); for (auto& item : srcMeta) { UNIT_ASSERT(dstMeta.count(item.first)); UNIT_ASSERT_EQUAL(dstMeta.at(item.first), item.second); diff --git a/ydb/core/persqueue/write_meta.cpp b/ydb/core/persqueue/write_meta.cpp index 58bed2badfa..e82eb3ab25a 100644 --- a/ydb/core/persqueue/write_meta.cpp +++ b/ydb/core/persqueue/write_meta.cpp @@ -41,6 +41,27 @@ TString GetSerializedData(const NYdb::NPersQueue::TReadSessionEvent::TDataReceiv return str; } +TString GetSerializedData(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message) { + NKikimrPQClient::TDataChunk proto; + for (const auto& item : message.GetMeta()->Fields) { + if (item.first == "_ip") { + proto.SetIp(item.second); + } else { + SetMetaField(proto, item.first, item.second); + } + } + proto.SetSeqNo(message.GetSeqNo()); + proto.SetCreateTime(message.GetCreateTime().MilliSeconds()); + auto codec = NPQ::FromTopicCodec(message.GetCodec()); + proto.SetCodec(codec); + proto.SetData(message.GetData()); + + TString str; + bool res = proto.SerializeToString(&str); + Y_VERIFY(res); + return str; +} + NKikimrPQClient::TDataChunk GetDeserializedData(const TString& string) { NKikimrPQClient::TDataChunk proto; bool res = proto.ParseFromString(string); diff --git a/ydb/core/persqueue/write_meta.h b/ydb/core/persqueue/write_meta.h index 0fba0866330..7e2b4961566 100644 --- a/ydb/core/persqueue/write_meta.h +++ b/ydb/core/persqueue/write_meta.h @@ -2,6 +2,7 @@ #include <ydb/core/protos/grpc_pq_old.pb.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <util/string/vector.h> @@ -66,6 +67,7 @@ TString GetSerializedData(const NKikimrPQClient::TDataChunk& init, TArgs&...args } TString GetSerializedData(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message); +TString GetSerializedData(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message); NKikimrPQClient::TDataChunk GetDeserializedData(const TString& string); |
