diff options
author | komels <komels@ydb.tech> | 2023-07-21 11:41:42 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2023-07-21 11:41:42 +0300 |
commit | 3b23aaacfaa9814c88de0113c7732cddaf99a448 (patch) | |
tree | 331ddc5a8d193aa521caa15f7e470b00e1b28117 | |
parent | af4641604be03d7cbd6c0b10e528e9f6bcd53fee (diff) | |
download | ydb-3b23aaacfaa9814c88de0113c7732cddaf99a448.tar.gz |
Refactor message meta
https://a.yandex-team.ru/arcadia/kikimr/docs/ru/overlay/rfc/107_topic_message_meta.md
16 files changed, 196 insertions, 71 deletions
diff --git a/ydb/core/persqueue/ut/mirrorer_ut.cpp b/ydb/core/persqueue/ut/mirrorer_ut.cpp index 559684ad3d..ed2818716a 100644 --- a/ydb/core/persqueue/ut/mirrorer_ut.cpp +++ b/ydb/core/persqueue/ut/mirrorer_ut.cpp @@ -91,10 +91,18 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) { msg->set_seq_no(1); msg->set_data("data"); msg->mutable_created_at()->set_seconds(1000); - auto* meta = msg->mutable_message_meta(); - (*meta)["meta-key"] = "meta-value"; - (*meta)["meta-key2"] = "meta-value2"; - if (!writeSession->Write(req)) { + { + auto* meta = msg->add_metadata_items(); + meta->set_key("meta-key"); + meta->set_value("meta-value"); + }; + { + auto* meta = msg->add_metadata_items(); + meta->set_key("meta-key2"); + meta->set_value("meta-value2"); + }; + + if (!writeSession->Write(req)) { UNIT_FAIL("Grpc write fail"); } UNIT_ASSERT(writeSession->Read(&resp)); diff --git a/ydb/core/persqueue/write_meta.cpp b/ydb/core/persqueue/write_meta.cpp index 2230d26429..043a1b2b92 100644 --- a/ydb/core/persqueue/write_meta.cpp +++ b/ydb/core/persqueue/write_meta.cpp @@ -52,9 +52,10 @@ TString GetSerializedData(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEv } auto& fields = message.GetMessageMeta()->Fields; if (!fields.empty()) { - auto& messageMeta = *proto.MutableMessageMeta(); for (const auto& item : fields) { - messageMeta[item.first] = item.second; + auto& metaItem = *proto.AddMessageMeta(); + metaItem.set_key(item.first); + metaItem.set_value(item.second); } } proto.SetSeqNo(message.GetSeqNo()); diff --git a/ydb/core/protos/grpc_pq_old.proto b/ydb/core/protos/grpc_pq_old.proto index 3f2d32bba7..4aa2a5e40b 100755 --- a/ydb/core/protos/grpc_pq_old.proto +++ b/ydb/core/protos/grpc_pq_old.proto @@ -1,3 +1,4 @@ +import "ydb/public/api/protos/ydb_topic.proto"; package NKikimrPQClient; message TKeyValue { @@ -38,5 +39,5 @@ message TDataChunk { optional bytes Data = 127; // ~ 64K - map<string, string> MessageMeta = 128; + repeated Ydb.Topic.MetadataItem MessageMeta = 128; } diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index 0ded0094f3..9621b32d98 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -55,6 +55,11 @@ message PartitionWithGeneration { // Partition generation. int64 generation = 2; +}; + +message MetadataItem { + string key = 1; + bytes value = 2; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -165,7 +170,7 @@ message StreamWriteMessage { PartitionWithGeneration partition_with_generation = 8; } // Message metadata. Overall size is limited to 4096 symbols (all keys and values combined). - map<string, string> message_meta = 7; + repeated MetadataItem metadata_items = 7 [(Ydb.size).le = 1000]; } } @@ -373,7 +378,7 @@ message StreamReadMessage { // Filled if message_group_id was set on message write. string message_group_id = 7; - map<string, string> message_meta = 8; + repeated MetadataItem metadata_items = 8; } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 8ca0811ff8..2586ed9798 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -50,6 +50,11 @@ using TAWriteSessionMeta = std::conditional_t<UseMigrationProtocol, NYdb::NTopic::TWriteSessionMeta>; template <bool UseMigrationProtocol> +using TAMessageMeta = std::conditional_t<UseMigrationProtocol, + NYdb::NPersQueue::TMessageMeta, + NYdb::NTopic::TMessageMeta>; + +template <bool UseMigrationProtocol> using TASessionClosedEvent = std::conditional_t<UseMigrationProtocol, NYdb::NPersQueue::TSessionClosedEvent, NYdb::NTopic::TSessionClosedEvent>; @@ -266,7 +271,7 @@ public: } template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>> - typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr GetMessageMeta(size_t batchIndex, size_t messageIndex) const { + typename TAMessageMeta<UseMigrationProtocol>::TPtr GetMessageMeta(size_t batchIndex, size_t messageIndex) const { Y_ASSERT(batchIndex < MessagesMeta.size()); Y_ASSERT(messageIndex < MessagesMeta[batchIndex].size()); return MessagesMeta[batchIndex][messageIndex]; @@ -330,8 +335,9 @@ private: private: TPartitionData<UseMigrationProtocol> ServerMessage; using TMetadataPtrVector = std::vector<typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr>; + using TMessageMetaPtrVector = std::vector<typename TAMessageMeta<UseMigrationProtocol>::TPtr>; TMetadataPtrVector BatchesMeta; - std::vector<TMetadataPtrVector> MessagesMeta; + std::vector<TMessageMetaPtrVector> MessagesMeta; std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; bool DoDecompress; i64 ServerBytesSize = 0; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 15eb81f091..c1aa7c4378 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -2217,13 +2217,13 @@ void TDataDecompressionInfo<UseMigrationProtocol>::BuildBatchesMeta() { for (const auto& [key, value] : batch.write_session_meta()) { meta->Fields.emplace(key, value); } - MessagesMeta.emplace_back(TMetadataPtrVector{}); - auto &currBatchMessagesMeta = MessagesMeta.back(); - for (const auto &messageData: batch.message_data()) { - typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr msgMeta = MakeIntrusive<TAWriteSessionMeta<UseMigrationProtocol>>(); - msgMeta->Fields.reserve(messageData.message_meta_size()); - for (const auto &[key, value]: messageData.message_meta()) { - msgMeta->Fields.emplace(key, value); + MessagesMeta.emplace_back(TMessageMetaPtrVector{}); + auto& currBatchMessagesMeta = MessagesMeta.back(); + for (const auto& messageData: batch.message_data()) { + typename TAMessageMeta<UseMigrationProtocol>::TPtr msgMeta = MakeIntrusive<TAMessageMeta<UseMigrationProtocol>>(); + msgMeta->Fields.reserve(messageData.metadata_items_size()); + for (const auto& metaPair: messageData.metadata_items()) { + msgMeta->Fields.emplace_back(std::make_pair(metaPair.key(), metaPair.value())); } currBatchMessagesMeta.emplace_back(std::move(msgMeta)); } @@ -2395,7 +2395,7 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart messageData.explicit_hash()); } } else { - const auto &messageMeta = Parent->GetMessageMeta(Batch, Message); + const auto& messageMeta = Parent->GetMessageMeta(Batch, Message); NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo( messageData.offset(), batch.producer_id(), diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index 60b348040c..d6459c2974 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -319,6 +319,14 @@ struct TWriteSessionMeta : public TThrRefBase { THashMap<TString, TString> Fields; }; +//! Message levelmetainformation. +struct TMessageMeta : public TThrRefBase { + using TPtr = TIntrusivePtr<TWriteSessionMeta>; + + //! User defined fields. + TVector<std::pair<TString, TString>> Fields; +}; + //! Event that is sent to client during session destruction. struct TSessionClosedEvent : public TStatus { using TStatus::TStatus; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp index fbe8e1a75a..3bf8719fde 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp @@ -25,7 +25,7 @@ TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation( TInstant createTime, TInstant writeTime, TWriteSessionMeta::TPtr meta, - TWriteSessionMeta::TPtr messageMeta, + TMessageMeta::TPtr messageMeta, ui64 uncompressedSize, TString messageGroupId ) @@ -163,7 +163,7 @@ const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage:: return Information.Meta; } -const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageMeta() const { +const TMessageMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageMeta() const { return Information.MessageMeta; } @@ -223,7 +223,7 @@ const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompresse return Information.Meta; } -const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageMeta() const { +const TMessageMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageMeta() const { return Information.MessageMeta; } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp index e42d98b95b..0cfdf77356 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp @@ -46,12 +46,26 @@ NThreading::TFuture<void> TWriteSession::WaitEvent() { void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { - Impl->WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); + auto message = TWriteMessage::CompressedMessage(data, codec, originalSize); + if (seqNo.Defined()) + message.SeqNo(*seqNo); + if (createTimestamp.Defined()) + message.CreateTimestamp(*createTimestamp); + Impl->WriteInternal(std::move(token), std::move(message)); } void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { - Impl->WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); + TWriteMessage message{data}; + if (seqNo.Defined()) + message.SeqNo(*seqNo); + if (createTimestamp.Defined()) + message.CreateTimestamp(*createTimestamp); + Impl->WriteInternal(std::move(token), std::move(message)); +} + +void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message) { + Impl->WriteInternal(std::move(token), std::move(message)); } bool TWriteSession::Close(TDuration closeTimeout) { @@ -102,6 +116,20 @@ bool TSimpleBlockingWriteSession::Write( return false; } +bool TSimpleBlockingWriteSession::Write( + TWriteMessage&& message, const TDuration& blockTimeout +) { + if (!IsAlive()) + return false; + + auto continuationToken = WaitForToken(blockTimeout); + if (continuationToken.Defined()) { + Writer->Write(std::move(*continuationToken), std::move(message)); + return true; + } + return false; +} + TMaybe<TContinuationToken> TSimpleBlockingWriteSession::WaitForToken(const TDuration& timeout) { auto startTime = TInstant::Now(); TDuration remainingTime = timeout; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h index 7e480c883e..a12c61b4b9 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h @@ -38,6 +38,7 @@ public: void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override; NThreading::TFuture<void> WaitEvent() override; @@ -67,6 +68,8 @@ public: bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(), const TDuration& blockTimeout = TDuration::Max()) override; + bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) override; + ui64 GetInitSeqNo() override; bool Close(TDuration closeTimeout = TDuration::Max()) override; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index 58393858cf..3fb36f63da 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -244,15 +244,15 @@ NThreading::TFuture<void> TWriteSessionImpl::WaitEvent() { } // Client method. -void TWriteSessionImpl::WriteInternal( - TContinuationToken&&, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp - ) { - TInstant createdAtValue = createTimestamp.Defined() ? *createTimestamp : TInstant::Now(); +void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& message) { + TInstant createdAtValue = message.CreateTimestamp_.Defined() ? *message.CreateTimestamp_ : TInstant::Now(); bool readyToAccept = false; - size_t bufferSize = data.size(); + size_t bufferSize = message.Data.size(); with_lock(Lock) { - //ToDo[message-meta] - Pass message meta here. - CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize, {}); + CurrentBatch.Add( + GetNextSeqNoImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize, + message.MessageMeta_ + ); FlushWriteIfRequiredImpl(); readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; @@ -263,16 +263,8 @@ void TWriteSessionImpl::WriteInternal( } // Client method. -void TWriteSessionImpl::WriteEncoded( - TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp - ) { - WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); -} - -void TWriteSessionImpl::Write( - TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp - ) { - WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); +void TWriteSessionImpl::Write(TContinuationToken&& token, TWriteMessage&& message) { + WriteInternal(std::move(token), std::move(message)); } @@ -1021,9 +1013,10 @@ void TWriteSessionImpl::SendImpl() { *msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds()); if (!message.MessageMeta.empty()) { - auto* meta = msgData->mutable_message_meta(); for (auto& [k, v] : message.MessageMeta) { - (*meta)[k] = v; + auto* pair = msgData->add_metadata_items(); + pair->set_key(k); + pair->set_value(v); } } SentOriginalMessages.emplace(std::move(message)); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index 2f739d4064..6e218de737 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -173,9 +173,9 @@ private: TStringBuf DataRef; TMaybe<ECodec> Codec; ui32 OriginalSize; // only for coded messages - std::unordered_map<TString, TString> MessageMeta; + TVector<std::pair<TString, TString>> MessageMeta; TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, - ui32 originalSize = 0, const std::unordered_map<TString, TString>& messageMeta = {}) + ui32 originalSize = 0, const TVector<std::pair<TString, TString>>& messageMeta = {}) : SeqNo(seqNo) , CreatedAt(createdAt) , DataRef(data) @@ -193,7 +193,7 @@ private: bool Acquired = false; bool FlushRequested = false; void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, - const std::unordered_map<TString, TString>& messageMeta) { + const TVector<std::pair<TString, TString>>& messageMeta) { if (StartedAt == TInstant::Zero()) StartedAt = TInstant::Now(); CurrentSize += codec ? originalSize : data.size(); @@ -266,14 +266,14 @@ private: ui64 SeqNo; TInstant CreatedAt; size_t Size; - std::unordered_map<TString, TString> MessageMeta; + TVector<std::pair<TString, TString>> MessageMeta; TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size) : SeqNo(sequenceNumber) , CreatedAt(createdAt) , Size(size) {} TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size, - std::unordered_map<TString, TString>&& messageMeta) + TVector<std::pair<TString, TString>>&& messageMeta) : SeqNo(sequenceNumber) , CreatedAt(createdAt) , Size(size) @@ -315,11 +315,21 @@ public: TMaybe<size_t> maxEventsCount = Nothing()) override; NThreading::TFuture<ui64> GetInitSeqNo() override; - void Write(TContinuationToken&& continuationToken, TStringBuf data, - TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override; - void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, - TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + void Write(TContinuationToken&&, TStringBuf, TMaybe<ui64> seqNo = Nothing(), + TMaybe<TInstant> createTimestamp = Nothing()) override { + Y_UNUSED(seqNo); + Y_UNUSED(createTimestamp); + Y_FAIL("Do not use this method"); + }; + + void WriteEncoded(TContinuationToken&&, TStringBuf, ECodec, ui32, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override { + Y_UNUSED(seqNo); + Y_UNUSED(createTimestamp); + Y_FAIL("Do not use this method"); + } NThreading::TFuture<void> WaitEvent() override; @@ -337,8 +347,7 @@ private: void UpdateTokenIfNeededImpl(); - void WriteInternal(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, - TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()); + void WriteInternal(TContinuationToken&& continuationToken, TWriteMessage&& message); void FlushWriteIfRequiredImpl(); size_t WriteBatchImpl(); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index b548b70e85..cc49bcf126 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -556,6 +556,13 @@ struct TWriteSessionMeta: public TThrRefBase { THashMap<TString, TString> Fields; }; +struct TMessageMeta: public TThrRefBase { + using TPtr = TIntrusivePtr<TMessageMeta>; + + //! User defined fields. + TVector<std::pair<TString, TString>> Fields; +}; + //! Event that is sent to client during session destruction. struct TSessionClosedEvent: public TStatus { using TStatus::TStatus; @@ -690,7 +697,7 @@ struct TReadSessionEvent { TInstant createTime, TInstant writeTime, TWriteSessionMeta::TPtr meta, - TWriteSessionMeta::TPtr messageMeta, + TMessageMeta::TPtr messageMeta, ui64 uncompressedSize, TString messageGroupId); ui64 Offset; @@ -699,7 +706,7 @@ struct TReadSessionEvent { TInstant CreateTime; TInstant WriteTime; TWriteSessionMeta::TPtr Meta; - TWriteSessionMeta::TPtr MessageMeta; + TMessageMeta::TPtr MessageMeta; ui64 UncompressedSize; TString MessageGroupId; }; @@ -759,7 +766,7 @@ struct TReadSessionEvent { const TWriteSessionMeta::TPtr& GetMeta() const; //! Message level meta info. - const TWriteSessionMeta::TPtr& GetMessageMeta() const; + const TMessageMeta::TPtr& GetMessageMeta() const; //! Commits single message. void Commit() override; @@ -804,7 +811,7 @@ struct TReadSessionEvent { const TWriteSessionMeta::TPtr& GetMeta() const; //! Message level meta info. - const TWriteSessionMeta::TPtr& GetMessageMeta() const; + const TMessageMeta::TPtr& GetMessageMeta() const; //! Uncompressed size. ui64 GetUncompressedSize() const; @@ -1472,12 +1479,60 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> { FLUENT_SETTING_OPTIONAL(TLog, Log); }; +//! Contains the message to write and all the options. +struct TWriteMessage { + using TSelf = TWriteMessage; + using TMessageMeta = TVector<std::pair<TString, TString>>; +public: + TWriteMessage() = delete; + TWriteMessage(TStringBuf data) + : Data(data) + {} + + //! A message that is already compressed by codec. Codec from WriteSessionSettings does not apply to this message. + //! Compression will not be performed in SDK for such messages. + static TWriteMessage CompressedMessage(const TStringBuf& data, ECodec codec, ui32 originalSize) { + TWriteMessage result{data}; + result.Codec = codec; + result.OriginalSize = originalSize; + return result; + } + + bool Compressed() const { + return Codec.Defined(); + } + + //! Message body. + const TStringBuf Data; + + //! Codec and original size for compressed message. + //! Do not specify or change these options directly, use CompressedMessage() + //! method to create an object for compressed message. + TMaybe<ECodec> Codec; + ui32 OriginalSize = 0; + + //! Message SeqNo, optional. If not provided SDK core will calculate SeqNo automatically. + //! NOTICE: Either all messages within one write session must have SeqNo provided or none of them. + FLUENT_SETTING_OPTIONAL(ui64, SeqNo); + + //! Message creation timestamp. If not provided, Now() will be used. + FLUENT_SETTING_OPTIONAL(TInstant, CreateTimestamp); + + //! Message metadata. Limited to 4096 characters overall (all keys and values combined). + FLUENT_SETTING(TMessageMeta, MessageMeta); + +}; + //! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks. class ISimpleBlockingWriteSession : public TThrRefBase { public: //! Write single message. Blocks for up to blockTimeout if inflight is full or memoryUsage is exceeded; //! return - true if write succeeded, false if message was not enqueued for write within blockTimeout. //! no Ack is provided. + virtual bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) = 0; + + + //! Write single message. Deprecated method with only basic message options. virtual bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(), const TDuration& blockTimeout = TDuration::Max()) = 0; @@ -1519,12 +1574,15 @@ public: //! Write single message. //! continuationToken - a token earlier provided to client with ReadyToAccept event. - virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0; + virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) = 0; - //! Write single message that is already coded by codec. Codec from settings does not apply to this message. - //! continuationToken - a token earlier provided to client with ReadyToAccept event. - //! originalSize - size of unpacked message - virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0; + //! Write single message. Old method with only basic message options. + virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(), + TMaybe<TInstant> createTimestamp = Nothing()) = 0; + + //! Write single message that is already compressed by codec. Old method with only basic message options. + virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0; //! Wait for all writes to complete (no more that closeTimeout()), than close. Empty maybe - means infinite timeout. diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp index 94a9b1ba3b..9e01d199a6 100644 --- a/ydb/services/persqueue_v1/actors/partition_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp @@ -423,7 +423,7 @@ bool FillBatchedData( ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(r.GetCreateTimestampMS()); message->set_message_group_id(GetBatchSourceId(currentBatch)); - auto* msgMeta = message->mutable_message_meta(); + auto* msgMeta = message->mutable_metadata_items(); *msgMeta = (proto.GetMessageMeta()); } hasData = true; diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index df28db4e2d..8618219d08 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -151,7 +151,7 @@ inline void FillChunkDataFromReq( } proto.SetData(msg.data()); auto* msgMeta = proto.MutableMessageMeta(); - *msgMeta = msg.message_meta(); + *msgMeta = msg.metadata_items(); } namespace NGRpcProxy { @@ -1433,8 +1433,8 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& payloadSize += w->GetData().size() + w->GetSourceId().size(); const auto& msg = writeRequest.messages(messageIndex); ui64 currMetadataSize = 0; - for (const auto& [k, v] : msg.message_meta()) { - currMetadataSize += k.size() + v.size(); + for (const auto& metaItem : msg.metadata_items()) { + currMetadataSize += metaItem.key().size() + metaItem.value().size(); } maxMessageMetadataSize = std::max(maxMessageMetadataSize, currMetadataSize); }; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 79884fc97e..d32271e93a 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -6153,12 +6153,17 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { NYdb::NTopic::TWriteSessionSettings wSettings {topicFullName, "srcId", "srcId"}; auto writer = topicClient.CreateSimpleBlockingWriteSession(wSettings); - std::unordered_map<TString, TString> metadata = {{"key1", "val1"}, {"key2", "val2"}}; -/* writer->Write("Somedata", Nothing(), Nothing(), TDuration::Max(), metadata); + TVector<std::pair<TString, TString>> metadata = {{"key1", "val1"}, {"key2", "val2"}}; + { + auto message = NYdb::NTopic::TWriteMessage{"Somedata"}.MessageMeta(metadata); + writer->Write(std::move(message)); + } metadata = {{"key3", "val3"}}; - writer->Write("Somedata2", Nothing(), Nothing(), TDuration::Max(), metadata); + { + auto message = NYdb::NTopic::TWriteMessage{"Somedata2"}.MessageMeta(metadata); + writer->Write(std::move(message)); + } writer->Write("Somedata3"); -*/ writer->Close(); NYdb::NTopic::TReadSessionSettings rSettings; rSettings.ConsumerName("debug").AppendTopics({topicFullName}); |