diff options
author | komels <[email protected]> | 2023-06-14 11:00:52 +0300 |
---|---|---|
committer | komels <[email protected]> | 2023-06-14 11:00:52 +0300 |
commit | e4aa2c56ed1958de4ed580a1b9b772559cde1ef9 (patch) | |
tree | ffa34c0c1099bbea8f06c09427ce7134da96c555 | |
parent | f6e723596f2e176356bbd53c0a87ee9168e14f22 (diff) |
No dedup mode in YDB Topic & message level metadata(server-side)
A verison of PR 3886257 without changes in SDK WriteSession api.
18 files changed, 488 insertions, 126 deletions
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 2c847ecdcbb..9d73e0ba85c 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -572,7 +572,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c } for (const auto& msg: ev->Get()->Msgs) { //this is checked in pq_impl when forming EvWrite request - Y_VERIFY(!msg.SourceId.empty() || ev->Get()->IsDirectWrite); + Y_VERIFY(!msg.SourceId.empty() || ev->Get()->IsDirectWrite || msg.DisableDeduplication); Y_VERIFY(!msg.Data.empty()); if (msg.SeqNo > (ui64)Max<i64>()) { diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 47ec65fe768..a556d295e87 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1754,7 +1754,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p errorStr = "no SeqNo"; } else if (!cmd.HasData() || cmd.GetData().empty()){ errorStr = "empty Data"; - } else if ((!cmd.HasSourceId() || cmd.GetSourceId().empty()) && !req.GetIsDirectWrite()) { + } else if ((!cmd.HasSourceId() || cmd.GetSourceId().empty()) && !req.GetIsDirectWrite() && !cmd.GetDisableDeduplication()) { errorStr = "empty SourceId"; } else if (cmd.GetPartitionKey().size() > 256) { errorStr = "too long partition key"; diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index b0298fc4c38..df0f0925ee2 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -10,6 +10,7 @@ #include <ydb/public/lib/base/msgbus_status.h> #include <util/generic/deque.h> +#include <util/generic/guid.h> #include <util/generic/map.h> #include <util/string/builder.h> @@ -179,7 +180,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { auto ev = MakeRequest(PartitionId, PipeClient); auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership(); - cmd.SetOwner(SourceId); + if (Opts.UseDeduplication) { + cmd.SetOwner(SourceId); + } else { + cmd.SetOwner(CreateGuidAsString()); + } cmd.SetForce(true); NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); @@ -382,6 +387,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { auto& request = *ev->Record.MutablePartitionRequest(); request.SetMessageNo(MessageNo++); + if (!Opts.UseDeduplication) { + request.SetPartition(PartitionId); + } NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); PendingWrite.emplace_back(cookie); @@ -479,7 +487,6 @@ public: }; PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, config)); - GetOwnership(); } diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index 950758f3022..919cd3f22f3 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -113,9 +113,11 @@ struct TEvPartitionWriter { struct TPartitionWriterOpts { bool CheckState = false; bool AutoRegister = false; + bool UseDeduplication = true; TPartitionWriterOpts& WithCheckState(bool value) { CheckState = value; return *this; } TPartitionWriterOpts& WithAutoRegister(bool value) { AutoRegister = value; return *this; } + TPartitionWriterOpts& WithDeduplication(bool value) { UseDeduplication = value; return *this; } }; IActor* CreatePartitionWriter(const TActorId& client, ui64 tabletId, ui32 partitionId, const TString& sourceId, diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index e9c4a5efa36..fcd260195f0 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -811,6 +811,7 @@ message TFeatureFlags { optional bool ForceColumnTablesCompositeMarks = 97 [default = false]; optional bool EnableSubscriptionsInDiscovery = 98 [default = false]; optional bool EnableGetNodeLabels = 99 [default = false]; + optional bool EnableTopicMessageMeta = 100 [default = false]; } message THttpProxyConfig { diff --git a/ydb/core/protos/grpc_pq_old.proto b/ydb/core/protos/grpc_pq_old.proto index ed273e14041..3f2d32bba75 100755 --- a/ydb/core/protos/grpc_pq_old.proto +++ b/ydb/core/protos/grpc_pq_old.proto @@ -37,4 +37,6 @@ message TDataChunk { optional TMapType ExtraFields = 126; optional bytes Data = 127; // ~ 64K + + map<string, string> MessageMeta = 128; } diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index 3ebab230c35..0cea5ae5277 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -151,6 +151,8 @@ message StreamWriteMessage { // Explicit partition id to write to. int64 partition_id = 6; } + // Message metadata. Overall size is limited to 4096 symbols (all keys and values combined). + map<string, string> message_meta = 7; } } @@ -358,6 +360,8 @@ message StreamReadMessage { // Filled if message_group_id was set on message write. string message_group_id = 7; + map<string, string> message_meta = 8; + } // Representation of sequence of client messages from one write session. diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index c4acb6fcab6..8ca0811ff89 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -265,6 +265,13 @@ public: return BatchesMeta[batchIndex]; } + template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>> + typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr GetMessageMeta(size_t batchIndex, size_t messageIndex) const { + Y_ASSERT(batchIndex < MessagesMeta.size()); + Y_ASSERT(messageIndex < MessagesMeta[batchIndex].size()); + return MessagesMeta[batchIndex][messageIndex]; + } + bool HasMoreData() const { return CurrentReadingMessage.first < static_cast<size_t>(GetServerMessage().batches_size()); } @@ -322,7 +329,9 @@ private: private: TPartitionData<UseMigrationProtocol> ServerMessage; - std::vector<typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr> BatchesMeta; + using TMetadataPtrVector = std::vector<typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr>; + TMetadataPtrVector BatchesMeta; + std::vector<TMetadataPtrVector> MessagesMeta; std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; bool DoDecompress; i64 ServerBytesSize = 0; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 53c0561f60c..3848d313b5d 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -2192,6 +2192,9 @@ TDataDecompressionInfo<UseMigrationProtocol>::~TDataDecompressionInfo() template<bool UseMigrationProtocol> void TDataDecompressionInfo<UseMigrationProtocol>::BuildBatchesMeta() { BatchesMeta.reserve(ServerMessage.batches_size()); + if constexpr (!UseMigrationProtocol) { + MessagesMeta.reserve(ServerMessage.batches_size()); + } for (const auto& batch : ServerMessage.batches()) { // Extra fields. typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr meta = MakeIntrusive<TAWriteSessionMeta<UseMigrationProtocol>>(); @@ -2206,6 +2209,16 @@ void TDataDecompressionInfo<UseMigrationProtocol>::BuildBatchesMeta() { for (const auto& [key, value] : batch.write_session_meta()) { meta->Fields.emplace(key, value); } + MessagesMeta.emplace_back(TMetadataPtrVector{}); + auto &currBatchMessagesMeta = MessagesMeta.back(); + for (const auto &messageData: batch.message_data()) { + typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr msgMeta = MakeIntrusive<TAWriteSessionMeta<UseMigrationProtocol>>(); + msgMeta->Fields.reserve(messageData.message_meta_size()); + for (const auto &[key, value]: messageData.message_meta()) { + msgMeta->Fields.emplace(key, value); + } + currBatchMessagesMeta.emplace_back(std::move(msgMeta)); + } } BatchesMeta.emplace_back(std::move(meta)); @@ -2336,7 +2349,6 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart i64 maxOffset = 0; auto& batch = *msg.mutable_batches(Batch); const auto& meta = Parent->GetBatchMeta(Batch); - const TInstant batchWriteTimestamp = [&batch](){ if constexpr (UseMigrationProtocol) { return TInstant::MilliSeconds(batch.write_timestamp_ms()); @@ -2375,14 +2387,18 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart messageData.explicit_hash()); } } else { - NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(messageData.offset(), - batch.producer_id(), - messageData.seq_no(), - TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(messageData.created_at())), - batchWriteTimestamp, - meta, - messageData.uncompressed_size(), - messageData.message_group_id()); + const auto &messageMeta = Parent->GetMessageMeta(Batch, Message); + NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo( + messageData.offset(), + batch.producer_id(), + messageData.seq_no(), + TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(messageData.created_at())), + batchWriteTimestamp, + meta, + messageMeta, + messageData.uncompressed_size(), + messageData.message_group_id() + ); if (Parent->GetDoDecompress()) { messages.emplace_back(messageData.data(), diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp index adb43a2c0d8..fbe8e1a75a6 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp @@ -25,6 +25,7 @@ TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation( TInstant createTime, TInstant writeTime, TWriteSessionMeta::TPtr meta, + TWriteSessionMeta::TPtr messageMeta, ui64 uncompressedSize, TString messageGroupId ) @@ -34,6 +35,7 @@ TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation( , CreateTime(createTime) , WriteTime(writeTime) , Meta(meta) + , MessageMeta(messageMeta) , UncompressedSize(uncompressedSize) , MessageGroupId(messageGroupId) {} @@ -161,6 +163,10 @@ const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage:: return Information.Meta; } +const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageMeta() const { + return Information.MessageMeta; +} + void TReadSessionEvent::TDataReceivedEvent::TMessage::Commit() { static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get()) ->Commit(Information.Offset, Information.Offset + 1); @@ -217,6 +223,10 @@ const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompresse return Information.Meta; } +const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageMeta() const { + return Information.MessageMeta; +} + ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetUncompressedSize() const { return Information.UncompressedSize; } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index 8b6e898c36b..58393858cf3 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -10,6 +10,7 @@ #include <util/generic/store_policy.h> #include <util/generic/utility.h> #include <util/stream/buffer.h> +#include <util/generic/guid.h> namespace NYdb::NTopic { @@ -125,8 +126,24 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat } return result; } +TString GenerateProducerId() { + return CreateGuidAsString(); +} void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race yet as well. + if (!Settings.DeduplicationEnabled_.Defined()) { + Settings.DeduplicationEnabled_ = !(Settings.ProducerId_.empty()); + } + else if (Settings.DeduplicationEnabled_.GetRef()) { + if (Settings.ProducerId_.empty()) { + Settings.ProducerId(GenerateProducerId()); + } + } else { + if (!Settings.ProducerId_.empty()) { + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "ProducerId is not empty when deduplication is switched off"); + ThrowFatalError("Cannot disable deduplication when non-empty ProducerId is provided"); + } + } CompressionExecutor = Settings.CompressionExecutor_; IExecutor::TPtr executor; executor = CreateSyncExecutor(); @@ -139,6 +156,10 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race } // Client method NThreading::TFuture<ui64> TWriteSessionImpl::GetInitSeqNo() { + if (!Settings.DeduplicationEnabled_.GetOrElse(true)) { + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "GetInitSeqNo called with deduplication disabled"); + ThrowFatalError("Cannot call GetInitSeqNo when deduplication is disabled"); + } if (Settings.ValidateSeqNo_) { if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) { LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); @@ -177,6 +198,10 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { } } if (seqNo.Defined()) { + if (!Settings.DeduplicationEnabled_.GetOrElse(true)) { + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "SeqNo is provided on write when deduplication is disabled"); + ThrowFatalError("Cannot provide SeqNo on Write() when deduplication is disabled"); + } if (*AutoSeqNoMode) { LOG_LAZY(DbDriverState->Log, TLOG_ERR, @@ -226,7 +251,8 @@ void TWriteSessionImpl::WriteInternal( bool readyToAccept = false; size_t bufferSize = data.size(); with_lock(Lock) { - CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize); + //ToDo[message-meta] - Pass message meta here. + CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize, {}); FlushWriteIfRequiredImpl(); readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; @@ -591,9 +617,13 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess ui64 newLastSeqNo = initResponse.last_seq_no(); // SeqNo increased, so there's a risk of loss, apply SeqNo shift. // MinUnsentSeqNo must be > 0 if anything was ever sent yet - if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { - SeqNoShift = newLastSeqNo - MinUnsentSeqNo; - } + if (Settings.DeduplicationEnabled_.GetOrElse(true)) { + if(MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { + SeqNoShift = newLastSeqNo - MinUnsentSeqNo; + } + } else { + newLastSeqNo = 1; + } result.InitSeqNo = newLastSeqNo; LastSeqNo = newLastSeqNo; @@ -878,29 +908,35 @@ size_t TWriteSessionImpl::WriteBatchImpl() { for (size_t i = 0; i != CurrentBatch.Messages.size();) { TBlock block{}; for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) { - auto sequenceNumber = CurrentBatch.Messages[i].SeqNo; - auto createTs = CurrentBatch.Messages[i].CreatedAt; + auto& currMessage = CurrentBatch.Messages[i]; + auto sequenceNumber = currMessage.SeqNo; + auto createTs = currMessage.CreatedAt; if (!block.MessageCount) { block.Offset = sequenceNumber; } block.MessageCount += 1; - const auto& datum = CurrentBatch.Messages[i].DataRef; + const auto& datum = currMessage.DataRef; block.OriginalSize += datum.size(); block.OriginalMemoryUsage = CurrentBatch.Data.size(); block.OriginalDataRefs.emplace_back(datum); if (CurrentBatch.Messages[i].Codec.Defined()) { Y_VERIFY(CurrentBatch.Messages.size() == 1); - block.CodecID = static_cast<ui32>(*CurrentBatch.Messages[i].Codec); - block.OriginalSize = CurrentBatch.Messages[i].OriginalSize; + block.CodecID = static_cast<ui32>(*currMessage.Codec); + block.OriginalSize = currMessage.OriginalSize; block.Compressed = false; } size += datum.size(); UpdateTimedCountersImpl(); (*Counters->BytesInflightUncompressed) += datum.size(); (*Counters->MessagesInflight)++; - OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size()); + if (!currMessage.MessageMeta.empty()) { + OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(), + std::move(currMessage.MessageMeta)); + } else { + OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size()); + } } block.Data = std::move(CurrentBatch.Data); if (skipCompression) { @@ -984,6 +1020,12 @@ void TWriteSessionImpl::SendImpl() { msgData->set_seq_no(message.SeqNo + SeqNoShift); *msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds()); + if (!message.MessageMeta.empty()) { + auto* meta = msgData->mutable_message_meta(); + for (auto& [k, v] : message.MessageMeta) { + (*meta)[k] = v; + } + } SentOriginalMessages.emplace(std::move(message)); OriginalMessagesToSend.pop(); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index 9399fb22135..2f739d4064d 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -173,12 +173,15 @@ private: TStringBuf DataRef; TMaybe<ECodec> Codec; ui32 OriginalSize; // only for coded messages - TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0) + std::unordered_map<TString, TString> MessageMeta; + TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, + ui32 originalSize = 0, const std::unordered_map<TString, TString>& messageMeta = {}) : SeqNo(seqNo) , CreatedAt(createdAt) , DataRef(data) , Codec(codec) , OriginalSize(originalSize) + , MessageMeta(messageMeta) {} }; @@ -189,11 +192,12 @@ private: TInstant StartedAt = TInstant::Zero(); bool Acquired = false; bool FlushRequested = false; - void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) { + void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, + const std::unordered_map<TString, TString>& messageMeta) { if (StartedAt == TInstant::Zero()) StartedAt = TInstant::Now(); CurrentSize += codec ? originalSize : data.size(); - Messages.emplace_back(seqNo, createdAt, data, codec, originalSize); + Messages.emplace_back(seqNo, createdAt, data, codec, originalSize, messageMeta); Acquired = false; } @@ -262,10 +266,18 @@ private: ui64 SeqNo; TInstant CreatedAt; size_t Size; + std::unordered_map<TString, TString> MessageMeta; TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size) - : SeqNo(sequenceNumber) - , CreatedAt(createdAt) - , Size(size) + : SeqNo(sequenceNumber) + , CreatedAt(createdAt) + , Size(size) + {} + TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size, + std::unordered_map<TString, TString>&& messageMeta) + : SeqNo(sequenceNumber) + , CreatedAt(createdAt) + , Size(size) + , MessageMeta(std::move(messageMeta)) {} }; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index 40ba536e48b..751ca9d7cff 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -686,6 +686,7 @@ struct TReadSessionEvent { TInstant createTime, TInstant writeTime, TWriteSessionMeta::TPtr meta, + TWriteSessionMeta::TPtr messageMeta, ui64 uncompressedSize, TString messageGroupId); ui64 Offset; @@ -694,6 +695,7 @@ struct TReadSessionEvent { TInstant CreateTime; TInstant WriteTime; TWriteSessionMeta::TPtr Meta; + TWriteSessionMeta::TPtr MessageMeta; ui64 UncompressedSize; TString MessageGroupId; }; @@ -752,6 +754,9 @@ struct TReadSessionEvent { //! Metainfo. const TWriteSessionMeta::TPtr& GetMeta() const; + //! Message level meta info. + const TWriteSessionMeta::TPtr& GetMessageMeta() const; + //! Commits single message. void Commit() override; @@ -794,6 +799,9 @@ struct TReadSessionEvent { //! Metainfo. const TWriteSessionMeta::TPtr& GetMeta() const; + //! Message level meta info. + const TWriteSessionMeta::TPtr& GetMessageMeta() const; + //! Uncompressed size. ui64 GetUncompressedSize() const; @@ -1212,6 +1220,11 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> { //! MessageGroupId to use. FLUENT_SETTING(TString, MessageGroupId); + //! Explicitly enables or disables deduplication for this write session. + //! If ProducerId option is defined deduplication will always be enabled. + //! If ProducerId option is empty, but deduplication is enable, a random ProducerId is generated. + FLUENT_SETTING_OPTIONAL(bool, DeduplicationEnabled); + //! Write to an exact partition. Generally server assigns partition automatically by message_group_id. //! Using this option is not recommended unless you know for sure why you need it. FLUENT_SETTING_OPTIONAL(ui32, PartitionId); diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp index 997ce3b4023..94a9b1ba3b4 100644 --- a/ydb/services/persqueue_v1/actors/partition_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp @@ -387,7 +387,6 @@ bool FillBatchedData( SetBatchExtraField(currentBatch, "logtype", header.GetLogType()); } } - if (proto.HasExtraFields()) { const auto& map = proto.GetExtraFields(); for (const auto& kv : map.GetItems()) { @@ -424,6 +423,8 @@ bool FillBatchedData( ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(r.GetCreateTimestampMS()); message->set_message_group_id(GetBatchSourceId(currentBatch)); + auto* msgMeta = message->mutable_message_meta(); + *msgMeta = (proto.GetMessageMeta()); } hasData = true; } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index c52a40da557..ba87c01c89c 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -116,6 +116,7 @@ private: HFunc(NPQ::TEvPartitionWriter::TEvInitResult, Handle); HFunc(NPQ::TEvPartitionWriter::TEvWriteAccepted, Handle); HFunc(NPQ::TEvPartitionWriter::TEvWriteResponse, Handle); + HFunc(NPQ::TEvPartitionWriter::TEvDisconnected, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); HFunc(TEvTabletPipe::TEvClientConnected, Handle); @@ -164,8 +165,10 @@ private: void StartSession(const NActors::TActorContext& ctx); void SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx); + void UpdateOrProceedPartition(const NActors::TActorContext& ctx); void UpdatePartition(const NActors::TActorContext& ctx); void RequestNextPartition(const NActors::TActorContext& ctx); + void GetOrProcessPartition(const NActors::TActorContext& ctx); void ProceedPartition(const ui32 partition, const NActors::TActorContext& ctx); @@ -175,7 +178,10 @@ private: //void InitCheckACL(const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx); + void MakeAndSentInitResponse(const TMaybe<ui64>& maxSeqNo, const TActorContext& ctx); + void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx); + void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx); @@ -225,6 +231,7 @@ private: bool PartitionFound = false; // 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1 TString SourceId; // TODO: Replace with 'MessageGroupId' everywhere + bool UseDeduplication = true; NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId; TString OwnerCookie; @@ -281,7 +288,9 @@ private: TInstant LogSessionDeadline; ui64 BalancerTabletId; + ui64 PartitionTabletId; TActorId PipeToBalancer; + TActorId PipeToPartition; // PQ tablet configuration that we get at the time of session initialization NKikimrPQ::TPQTabletConfig InitialPQTabletConfig; @@ -310,6 +319,8 @@ private: TInitRequest InitRequest; NPQ::ESourceIdTableGeneration SrcIdTableGeneration; + TDeque<ui64> SeqNoInflight; + }; } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 72f1f67c7d7..3054d7cb3b2 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -36,6 +36,8 @@ using namespace NPQ; template <bool UseMigrationProtocol> using ECodec = std::conditional_t<UseMigrationProtocol, Ydb::PersQueue::V1::Codec, i32>; +static constexpr ui64 MAX_METADATA_SIZE_PER_MESSAGE = 4096; + template <bool UseMigrationProtocol> ECodec<UseMigrationProtocol> CodecByName(const TString& codec) { THashMap<TString, ECodec<UseMigrationProtocol>> codecsByName; @@ -147,6 +149,8 @@ inline void FillChunkDataFromReq( proto.SetCodec(writeRequest.codec() - 1); } proto.SetData(msg.data()); + auto* msgMeta = proto.MutableMessageMeta(); + *msgMeta = msg.message_meta(); } namespace NGRpcProxy { @@ -410,9 +414,11 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt // For now exactly two scenarios supported: // 1. Non-empty producer_id == message_group_id // 2. Non-empty producer_id && non-empty valid partition_id (explicit partitioning) + // 3. Empty producer id (no deduplication). bool isScenarioSupported = (!InitRequest.producer_id().empty() && InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id()) - || (!InitRequest.producer_id().empty() && InitRequest.has_partition_id()); + || (!InitRequest.producer_id().empty() && InitRequest.has_partition_id() + || InitRequest.producer_id().empty()); if (!isScenarioSupported) { CloseSession("unsupported producer_id / message_group_id / partition_id settings in init request", @@ -436,11 +442,17 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt if constexpr (UseMigrationProtocol) { return InitRequest.message_group_id(); } else { + if (InitRequest.producer_id().empty()) { + UseDeduplication = false; + } return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id(); } }(); LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest << " from " << PeerName); + if (!UseDeduplication) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << ". Disable deduplication for empty producer id"); + } //TODO: get user agent from headers UserAgent = "pqv1 server"; LogSession(ctx); @@ -473,11 +485,18 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) { - try { - EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId, SrcIdTableGeneration); - } catch (yexception& e) { - CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + if (!SourceId.empty()) { + try { + EncodedSourceId = NSourceIdEncoding::EncodeSrcId( + FullConverter->GetTopicForSrcIdHash(), SourceId, SrcIdTableGeneration + ); + } catch (yexception &e) { + CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), + PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } + } else { + Y_VERIFY(!UseDeduplication); } InitMeta = GetInitialDataChunk(InitRequest, FullConverter->GetClientsideName(), PeerName); // ToDo[migration] - check? @@ -645,6 +664,9 @@ ui32 TWriteSessionActor<UseMigrationProtocol>::CalculateFirstClassPartition(cons template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::DiscoverPartition(const NActors::TActorContext& ctx) { const auto &pqConfig = AppData(ctx)->PQConfig; + if (!UseDeduplication) { + return GetOrProcessPartition(ctx); + } if (pqConfig.GetTopicsAreFirstClassCitizen()) { if (pqConfig.GetUseSrcIdMetaMappingInFirstClass()) { return SendCreateManagerRequest(ctx); @@ -724,6 +746,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvCreateSes template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx) { + Y_VERIFY(UseDeduplication); //read from DS auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); @@ -749,8 +772,18 @@ void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const } template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::UpdateOrProceedPartition(const TActorContext& ctx) { + if (UseDeduplication) { + UpdatePartition(ctx); + } else { + ProceedPartition(Partition, ctx); + } +} + +template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::UpdatePartition(const TActorContext& ctx) { Y_VERIFY(State == ES_WAIT_TABLE_REQUEST_1 || State == ES_WAIT_NEXT_PARTITION); + Y_VERIFY(UseDeduplication); SendUpdateSrcIdsRequests(ctx); State = ES_WAIT_TABLE_REQUEST_2; } @@ -779,13 +812,41 @@ template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) { Y_VERIFY(State == ES_WAIT_NEXT_PARTITION); Partition = ev->Get()->Record.GetPartitionId(); - UpdatePartition(ctx); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie + << ". Got next partition from server: " << Partition); + UpdateOrProceedPartition(ctx); +} + +template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::GetOrProcessPartition(const TActorContext& ctx) { + if (!UseDeduplication) { + Y_VERIFY(!PartitionFound); + } + if (!PartitionFound) { + const auto& pqConfig = AppData(ctx)->PQConfig; + auto partition = GetPartitionFromConfigOptions( + PreferedPartition, EncodedSourceId, PartitionToTablet.size(), + pqConfig.GetTopicsAreFirstClassCitizen(), + pqConfig.GetRoundRobinPartitionMapping() || SourceId.empty() + //Empty SrcId = No-dedup mode and no msg-group id provided, so always use RoundRobin; + ); + if (partition.Defined()) { + PartitionFound = true; + Partition = *partition; + } + } + if (PartitionFound) { + UpdateOrProceedPartition(ctx); + } else { + State = ES_WAIT_TABLE_REQUEST_1; + RequestNextPartition(ctx); + } + } template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx) { auto& record = ev->Get()->Record.GetRef(); - const auto& pqConfig = AppData(ctx)->PQConfig; if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId " << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition race, retrying"); @@ -844,20 +905,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " partition " << Partition << " partitions " << PartitionToTablet.size() << "(" << EncodedSourceId.Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t); - if (!PartitionFound) { - auto partition = GetPartitionFromConfigOptions(PreferedPartition, EncodedSourceId, PartitionToTablet.size(), - pqConfig.GetTopicsAreFirstClassCitizen(), - pqConfig.GetRoundRobinPartitionMapping()); - if (partition.Defined()) { - PartitionFound = true; - Partition = *partition; - } - } - if (PartitionFound) { - UpdatePartition(ctx); - } else { - RequestNextPartition(ctx); - } + GetOrProcessPartition(ctx); return; } else if (State == EState::ES_WAIT_TABLE_REQUEST_2) { @@ -939,9 +987,11 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti Partition = partition; auto it = PartitionToTablet.find(Partition); - ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0; + ui64 PartitionTabletId = it != PartitionToTablet.end() ? it->second : 0; + LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie + << " partition: " << Partition); - if (!tabletId) { + if (!PartitionTabletId) { CloseSession( Sprintf("no partition %u in topic '%s', Marker# PQ4", Partition, DiscoveryConverter->GetPrintableString().c_str()), @@ -950,7 +1000,10 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti return; } - Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(ctx.SelfID, tabletId, Partition, SourceId)); + Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter( + ctx.SelfID, PartitionTabletId, Partition, SourceId, + TPartitionWriterOpts().WithDeduplication(UseDeduplication) + )); State = ES_WAIT_WRITER_INIT; ui32 border = AppData(ctx)->PQConfig.GetWriteInitLatencyBigMs(); @@ -1000,31 +1053,20 @@ void TWriteSessionActor<UseMigrationProtocol>::CloseSession(const TString& error } template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx) { - if (State != ES_WAIT_WRITER_INIT) { - return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx); - } - - const auto& result = *ev->Get(); - if (!result.IsSuccess()) { - const auto& error = result.GetError(); - if (error.Response.HasErrorCode()) { - return CloseSession("status is not ok: " + error.Response.GetErrorReason(), ConvertOldCode(error.Response.GetErrorCode()), ctx); - } else { - return CloseSession("error at writer init: " + error.Reason, PersQueue::ErrorCode::ERROR, ctx); - } - } - - OwnerCookie = result.GetResult().OwnerCookie; - const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo(); - +void TWriteSessionActor<UseMigrationProtocol>::MakeAndSentInitResponse( + const TMaybe<ui64>& maxSeqNo, const TActorContext& ctx +) { TServerMessage response; response.set_status(Ydb::StatusIds::SUCCESS); auto init = response.mutable_init_response(); - if constexpr (UseMigrationProtocol) { + if (!OwnerCookie.empty()) { init->set_session_id(EscapeC(OwnerCookie)); - init->set_last_sequence_number(maxSeqNo); + } + if constexpr (UseMigrationProtocol) { + if (maxSeqNo.Defined()) { + init->set_last_sequence_number(*maxSeqNo); + } init->set_partition_id(Partition); init->set_topic(FullConverter->GetFederationPath()); init->set_cluster(FullConverter->GetCluster()); @@ -1035,8 +1077,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } } } else { - init->set_session_id(EscapeC(OwnerCookie)); - init->set_last_seq_no(maxSeqNo); + //init->set_session_id(EscapeC(OwnerCookie)); + if (maxSeqNo.Defined()) { + init->set_last_seq_no(*maxSeqNo); + } init->set_partition_id(Partition); if (InitialPQTabletConfig.HasCodecs()) { for (const auto& codecName : InitialPQTabletConfig.GetCodecs().GetCodecs()) { @@ -1046,7 +1090,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session inited cookie: " << Cookie << " partition: " << Partition - << " MaxSeqNo: " << maxSeqNo << " sessionId: " << OwnerCookie); + << " MaxSeqNo: " << maxSeqNo << " sessionId: " << OwnerCookie); if (!Request->GetStreamCtx()->Write(std::move(response))) { LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed"); @@ -1068,6 +1112,31 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx) { + if (State != ES_WAIT_WRITER_INIT) { + return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx); + } + + const auto& result = *ev->Get(); + if (!result.IsSuccess()) { + const auto& error = result.GetError(); + if (error.Response.HasErrorCode()) { + return CloseSession("status is not ok: " + error.Response.GetErrorReason(), ConvertOldCode(error.Response.GetErrorCode()), ctx); + } else { + return CloseSession("error at writer init: " + error.Reason, PersQueue::ErrorCode::ERROR, ctx); + } + } + + OwnerCookie = result.GetResult().OwnerCookie; + const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo(); + if (!UseDeduplication) { + Y_VERIFY(maxSeqNo == 0); + } + MakeAndSentInitResponse(maxSeqNo, ctx); + +} + +template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx) { if (State != ES_INITED) { return CloseSession("got write permission but not wait for it", PersQueue::ErrorCode::ERROR, ctx); @@ -1108,56 +1177,42 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx) { - if (State != ES_INITED) { - return CloseSession("got write response but not wait for it", PersQueue::ErrorCode::ERROR, ctx); - } - - const auto& result = *ev->Get(); - if (!result.IsSuccess()) { - const auto& record = result.Record; - if (record.HasErrorCode()) { - return CloseSession("status is not ok: " + record.GetErrorReason(), ConvertOldCode(record.GetErrorCode()), ctx); - } else { - return CloseSession("error at write: " + result.GetError().Reason, PersQueue::ErrorCode::ERROR, ctx); - } - } - - const auto& resp = result.Record.GetPartitionResponse(); - - if (AcceptedRequests.empty()) { - CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx); - return; - } +void TWriteSessionActor<UseMigrationProtocol>::ProcessWriteResponse( + const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx +) { auto writeRequest = std::move(AcceptedRequests.front()); AcceptedRequests.pop_front(); - if (resp.GetCookie() != writeRequest->Cookie) { - return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx); - } - auto addAckMigration = [](const TPersQueuePartitionResponse::TCmdWriteResult& res, StreamingWriteServerMessage::BatchWriteResponse* batchWriteResponse, - StreamingWriteServerMessage::WriteStatistics* stat) { + auto addAckMigration = [this]( + const TPersQueuePartitionResponse::TCmdWriteResult& res, + StreamingWriteServerMessage::BatchWriteResponse* batchWriteResponse, + StreamingWriteServerMessage::WriteStatistics* stat) { + batchWriteResponse->add_sequence_numbers(res.GetSeqNo()); batchWriteResponse->add_offsets(res.GetOffset()); + if (!UseDeduplication) { + Y_VERIFY(!res.GetAlreadyWritten()); + } batchWriteResponse->add_already_written(res.GetAlreadyWritten()); - stat->set_queued_in_partition_duration_ms( - Max((i64)res.GetTotalTimeInPartitionQueueMs(), stat->queued_in_partition_duration_ms())); + Max((i64)res.GetTotalTimeInPartitionQueueMs(), stat->queued_in_partition_duration_ms())); stat->set_throttled_on_partition_duration_ms( - Max((i64)res.GetPartitionQuotedTimeMs(), stat->throttled_on_partition_duration_ms())); + Max((i64)res.GetPartitionQuotedTimeMs(), stat->throttled_on_partition_duration_ms())); stat->set_throttled_on_topic_duration_ms(Max(static_cast<i64>(res.GetTopicQuotedTimeMs()), stat->throttled_on_topic_duration_ms())); stat->set_persist_duration_ms( - Max((i64)res.GetWriteTimeMs(), stat->persist_duration_ms())); + Max((i64)res.GetWriteTimeMs(), stat->persist_duration_ms())); }; - auto addAck = [](const TPersQueuePartitionResponse::TCmdWriteResult& res, Topic::StreamWriteMessage::WriteResponse* writeResponse, - Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) { + auto addAck = [this](const TPersQueuePartitionResponse::TCmdWriteResult& res, + Topic::StreamWriteMessage::WriteResponse* writeResponse, + Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) { auto ack = writeResponse->add_acks(); // TODO (ildar-khisam@): validate res before filling ack fields ack->set_seq_no(res.GetSeqNo()); if (res.GetAlreadyWritten()) { + Y_VERIFY(UseDeduplication); ack->mutable_skipped()->set_reason(Topic::StreamWriteMessage::WriteResponse::WriteAck::Skipped::REASON_ALREADY_WRITTEN); } else { ack->mutable_written()->set_offset(res.GetOffset()); @@ -1170,8 +1225,8 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T *stat->mutable_persisting_time() = TimeUtil::MillisecondsToDuration(persisting_time_ms); auto min_queue_wait_time_ms = (stat->min_queue_wait_time() == Duration()) - ? (i64)res.GetTotalTimeInPartitionQueueMs() - : Min<i64>(res.GetTotalTimeInPartitionQueueMs(), TimeUtil::DurationToMilliseconds(stat->min_queue_wait_time())); + ? (i64)res.GetTotalTimeInPartitionQueueMs() + : Min<i64>(res.GetTotalTimeInPartitionQueueMs(), TimeUtil::DurationToMilliseconds(stat->min_queue_wait_time())); *stat->mutable_min_queue_wait_time() = TimeUtil::MillisecondsToDuration(min_queue_wait_time_ms); auto max_queue_wait_time_ms = Max<i64>(res.GetTotalTimeInPartitionQueueMs(), TimeUtil::DurationToMilliseconds(stat->max_queue_wait_time())); @@ -1194,14 +1249,16 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T auto batchWriteResponse = result.mutable_batch_write_response(); batchWriteResponse->set_partition_id(Partition); - for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().sequence_numbers_size(); messageIndex != endIndex; ++messageIndex) { - if (partitionCmdWriteResultIndex == resp.CmdWriteResultSize()) { + for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().sequence_numbers_size(); + messageIndex != endIndex; ++messageIndex) { + + if (partitionCmdWriteResultIndex == response.CmdWriteResultSize()) { CloseSession("too few responses from server", PersQueue::ErrorCode::ERROR, ctx); return; } - const auto& partitionCmdWriteResult = resp.GetCmdWriteResult(partitionCmdWriteResultIndex); + const auto& partitionCmdWriteResult = response.GetCmdWriteResult(partitionCmdWriteResultIndex); const auto writtenSequenceNumber = userWriteRequest->Request.write_request().sequence_numbers(messageIndex); - if (partitionCmdWriteResult.GetSeqNo() != writtenSequenceNumber) { + if (UseDeduplication && partitionCmdWriteResult.GetSeqNo() != writtenSequenceNumber) { CloseSession(TStringBuilder() << "Expected partition " << Partition << " write result for message with sequence number " << writtenSequenceNumber << " but got for " << partitionCmdWriteResult.GetSeqNo(), PersQueue::ErrorCode::ERROR, ctx); return; } @@ -1214,15 +1271,20 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T auto batchWriteResponse = result.mutable_write_response(); batchWriteResponse->set_partition_id(Partition); - for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().messages_size(); messageIndex != endIndex; ++messageIndex) { - if (partitionCmdWriteResultIndex == resp.CmdWriteResultSize()) { + for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().messages_size(); + messageIndex != endIndex; ++messageIndex) { + + if (partitionCmdWriteResultIndex == response.CmdWriteResultSize()) { CloseSession("too few responses from server", PersQueue::ErrorCode::ERROR, ctx); return; } - const auto& partitionCmdWriteResult = resp.GetCmdWriteResult(partitionCmdWriteResultIndex); + const auto& partitionCmdWriteResult = response.GetCmdWriteResult(partitionCmdWriteResultIndex); const auto writtenSequenceNumber = userWriteRequest->Request.write_request().messages(messageIndex).seq_no(); - if (partitionCmdWriteResult.GetSeqNo() != writtenSequenceNumber) { - CloseSession(TStringBuilder() << "Expected partition " << Partition << " write result for message with sequence number " << writtenSequenceNumber << " but got for " << partitionCmdWriteResult.GetSeqNo(), PersQueue::ErrorCode::ERROR, ctx); + if (UseDeduplication && partitionCmdWriteResult.GetSeqNo() != writtenSequenceNumber) { + CloseSession(TStringBuilder() << "Expected partition " << Partition + << " write result for message with sequence number " + << writtenSequenceNumber << " but got for " + << partitionCmdWriteResult.GetSeqNo(), PersQueue::ErrorCode::ERROR, ctx); return; } @@ -1251,6 +1313,37 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx) { + if (State != ES_INITED) { + return CloseSession("got write response but not wait for it", PersQueue::ErrorCode::ERROR, ctx); + } + + const auto& result = *ev->Get(); + if (!result.IsSuccess()) { + const auto& record = result.Record; + if (record.HasErrorCode()) { + return CloseSession("status is not ok: " + record.GetErrorReason(), ConvertOldCode(record.GetErrorCode()), ctx); + } else { + return CloseSession("error at write: " + result.GetError().Reason, PersQueue::ErrorCode::ERROR, ctx); + } + } + + + if (AcceptedRequests.empty()) { + CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx); + return; + } + + const auto& writeRequest = AcceptedRequests.front(); + const auto& resp = result.Record.GetPartitionResponse(); + + if (resp.GetCookie() != writeRequest->Cookie) { + return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx); + } + ProcessWriteResponse(resp, ctx); +} + +template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr&, const TActorContext& ctx) { CloseSession("pipe to partition's tablet is dead", PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, ctx); } @@ -1283,8 +1376,13 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& auto addDataMigration = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) { auto w = request.MutablePartitionRequest()->AddCmdWrite(); w->SetData(GetSerializedData(InitMeta, writeRequest, messageIndex)); + if (UseDeduplication) { + w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId)); + } w->SetSeqNo(writeRequest.sequence_numbers(messageIndex)); - w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId)); + if (!UseDeduplication) + SeqNoInflight.push_back(w->GetSeqNo()); + w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex)); w->SetUncompressedSize(writeRequest.blocks_uncompressed_sizes(messageIndex)); w->SetClientDC(ClientDC); @@ -1292,16 +1390,28 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& payloadSize += w->GetData().size() + w->GetSourceId().size(); }; + ui64 maxMessageMetadataSize = 0; auto addData = [&](const Topic::StreamWriteMessage::WriteRequest& writeRequest, const i32 messageIndex) { auto w = request.MutablePartitionRequest()->AddCmdWrite(); w->SetData(GetSerializedData(InitMeta, writeRequest, messageIndex)); + if (UseDeduplication) { + w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId)); + } else { + w->SetDisableDeduplication(true); + } w->SetSeqNo(writeRequest.messages(messageIndex).seq_no()); - w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId)); + SeqNoInflight.push_back(w->GetSeqNo()); w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(writeRequest.messages(messageIndex).created_at())); w->SetUncompressedSize(writeRequest.messages(messageIndex).uncompressed_size()); w->SetClientDC(ClientDC); w->SetIgnoreQuotaDeadline(true); payloadSize += w->GetData().size() + w->GetSourceId().size(); + const auto& msg = writeRequest.messages(messageIndex); + ui64 currMetadataSize = 0; + for (const auto& [k, v] : msg.message_meta()) { + currMetadataSize += k.size() + v.size(); + } + maxMessageMetadataSize = std::max(maxMessageMetadataSize, currMetadataSize); }; const auto& writeRequest = ev->Request.write_request(); @@ -1317,7 +1427,19 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& PendingRequest->UserWriteRequests.push_back(std::move(ev)); PendingRequest->ByteSize = request.ByteSize(); - + auto msgMetaEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicMessageMeta(); + if (!msgMetaEnabled && maxMessageMetadataSize > 0) { + CloseSession("Message level metadata support is disabled on server size", PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } + if (maxMessageMetadataSize > MAX_METADATA_SIZE_PER_MESSAGE) { + CloseSession( + TStringBuilder() << "Message level metadata size is limited to " << MAX_METADATA_SIZE_PER_MESSAGE + << " per message", + PersQueue::ErrorCode::BAD_REQUEST, ctx + ); + return; + } if (const auto ru = CalcRuConsumption(payloadSize)) { PendingRequest->RequiredQuota += ru; if (MaybeRequestQuota(PendingRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)) { @@ -1631,6 +1753,7 @@ void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& c if ((!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass()) && !SourceIdUpdatesInflight && ctx.Now() - LastSourceIdUpdate > SOURCEID_UPDATE_PERIOD + && UseDeduplication ) { SendUpdateSrcIdsRequests(ctx); } diff --git a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp index d466a5cd307..d331a19832e 100644 --- a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp +++ b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp @@ -80,7 +80,6 @@ Y_UNIT_TEST_SUITE(TFstClassSrcIdPQTest) { UNIT_ASSERT(res); writer->Close(); }; - alterAndCheck(2); alterAndCheck(4); alterAndCheck(12); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 1cbdd25bb5c..517c6f9a9f8 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -6191,6 +6191,116 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } } + THolder<NYdb::TDriver> SetupTestAndGetDriver( + NPersQueue::TTestServer& server, const TString& topicName, ui64 partsCount = 1 + ) { + NYdb::TDriverConfig driverCfg; + driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort); + auto driver = MakeHolder<NYdb::TDriver>(driverCfg); + + server.EnableLogs({ NKikimrServices::PQ_READ_PROXY, NKikimrServices::PQ_WRITE_PROXY}); + server.EnableLogs({ NKikimrServices::KQP_PROXY}, NActors::NLog::PRI_EMERG); + + server.AnnoyingClient->CreateTopic(topicName, partsCount); + auto topicClient = NYdb::NTopic::TTopicClient(*driver); + auto alterSettings = NYdb::NTopic::TAlterTopicSettings(); + alterSettings.BeginAddConsumer("debug"); + auto alterRes = topicClient.AlterTopic(TString("/Root/PQ/") + topicName, alterSettings).GetValueSync(); + UNIT_ASSERT(alterRes.IsSuccess()); + return std::move(driver); + } + + Y_UNIT_TEST(MessageMetadata) { + return; //No supported; + NPersQueue::TTestServer server; + server.CleverServer->GetRuntime()->GetAppData().FeatureFlags.SetEnableTopicMessageMeta(true); + TString topicFullName = "rt3.dc1--topic1"; + auto driver = SetupTestAndGetDriver(server, topicFullName); + + auto topicClient = NYdb::NTopic::TTopicClient(*driver); + + NYdb::NTopic::TWriteSessionSettings wSettings {topicFullName, "srcId", "srcId"}; + auto writer = topicClient.CreateSimpleBlockingWriteSession(wSettings); + std::unordered_map<TString, TString> metadata = {{"key1", "val1"}, {"key2", "val2"}}; +/* writer->Write("Somedata", Nothing(), Nothing(), TDuration::Max(), metadata); + metadata = {{"key3", "val3"}}; + writer->Write("Somedata2", Nothing(), Nothing(), TDuration::Max(), metadata); + writer->Write("Somedata3"); +*/ + writer->Close(); + NYdb::NTopic::TReadSessionSettings rSettings; + rSettings.ConsumerName("debug").AppendTopics({topicFullName}); + auto readSession = topicClient.CreateReadSession(rSettings); + + auto ev = readSession->GetEvent(true); + UNIT_ASSERT(ev.Defined()); + auto spsEv = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev); + UNIT_ASSERT(spsEv); + spsEv->Confirm(); + ev = readSession->GetEvent(true); + auto dataEv = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev); + UNIT_ASSERT(dataEv); + const auto& messages = dataEv->GetMessages(); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); + auto metadataSizeExpected = 2; + for (const auto& msg : dataEv->GetMessages()) { + auto meta = msg.GetMessageMeta(); + UNIT_ASSERT_VALUES_EQUAL(meta->Fields.size(), metadataSizeExpected); + metadataSizeExpected--; + } + } + + Y_UNIT_TEST(DisableDeduplication) { + NPersQueue::TTestServer server; + TString topicFullName = "rt3.dc1--topic1"; + auto driver = SetupTestAndGetDriver(server, topicFullName, 3); + + auto topicClient = NYdb::NTopic::TTopicClient(*driver); + NYdb::NTopic::TWriteSessionSettings wSettings; + wSettings.Path(topicFullName).DeduplicationEnabled(false); + + TVector<std::shared_ptr<NYdb::NTopic::ISimpleBlockingWriteSession>> writers; + for (auto i = 0u; i < 3; i++) { + auto writer = topicClient.CreateSimpleBlockingWriteSession(wSettings); + for (auto j = 0u; j < 3; j++) { + writer->Write(TString("MyData") + ToString(i) + ToString(j)); + } + writers.push_back(writer); + } + + for (auto& w : writers) { + w->Close(); + } + NYdb::NTopic::TReadSessionSettings rSettings; + rSettings.ConsumerName("debug").AppendTopics({topicFullName}); + auto readSession = topicClient.CreateReadSession(rSettings); + + THashSet<ui64> partitions; + ui64 totalMessages = 0; + Cerr << "Start reads\n"; + while(totalMessages < 9) { + auto ev = readSession->GetEvent(true); + UNIT_ASSERT(ev.Defined()); + + auto spsEv = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev); + if (spsEv) { + spsEv->Confirm(); + Cerr << "Got start stream event\n"; + continue; + } + auto dataEv = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev); + UNIT_ASSERT(dataEv); + const auto& messages = dataEv->GetMessages(); + totalMessages += messages.size(); + Cerr << "Got data event with total " << messages.size() << " messages, current total messages: " << totalMessages << Endl; + for (const auto& msg: dataEv->GetMessages()) { + auto session = msg.GetPartitionSession(); + partitions.insert(session->GetPartitionId()); + } + } + UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 3); + } + Y_UNIT_TEST(LOGBROKER_7820) { // // 700 messages of 2000 characters are sent in the test |