aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2023-07-21 11:41:42 +0300
committerkomels <komels@ydb.tech>2023-07-21 11:41:42 +0300
commit3b23aaacfaa9814c88de0113c7732cddaf99a448 (patch)
tree331ddc5a8d193aa521caa15f7e470b00e1b28117
parentaf4641604be03d7cbd6c0b10e528e9f6bcd53fee (diff)
downloadydb-3b23aaacfaa9814c88de0113c7732cddaf99a448.tar.gz
Refactor message meta
https://a.yandex-team.ru/arcadia/kikimr/docs/ru/overlay/rfc/107_topic_message_meta.md
-rw-r--r--ydb/core/persqueue/ut/mirrorer_ut.cpp16
-rw-r--r--ydb/core/persqueue/write_meta.cpp5
-rwxr-xr-xydb/core/protos/grpc_pq_old.proto3
-rw-r--r--ydb/public/api/protos/ydb_topic.proto9
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp16
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp32
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp31
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h31
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h76
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.cpp2
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp6
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp13
16 files changed, 196 insertions, 71 deletions
diff --git a/ydb/core/persqueue/ut/mirrorer_ut.cpp b/ydb/core/persqueue/ut/mirrorer_ut.cpp
index 559684ad3d..ed2818716a 100644
--- a/ydb/core/persqueue/ut/mirrorer_ut.cpp
+++ b/ydb/core/persqueue/ut/mirrorer_ut.cpp
@@ -91,10 +91,18 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
msg->set_seq_no(1);
msg->set_data("data");
msg->mutable_created_at()->set_seconds(1000);
- auto* meta = msg->mutable_message_meta();
- (*meta)["meta-key"] = "meta-value";
- (*meta)["meta-key2"] = "meta-value2";
- if (!writeSession->Write(req)) {
+ {
+ auto* meta = msg->add_metadata_items();
+ meta->set_key("meta-key");
+ meta->set_value("meta-value");
+ };
+ {
+ auto* meta = msg->add_metadata_items();
+ meta->set_key("meta-key2");
+ meta->set_value("meta-value2");
+ };
+
+ if (!writeSession->Write(req)) {
UNIT_FAIL("Grpc write fail");
}
UNIT_ASSERT(writeSession->Read(&resp));
diff --git a/ydb/core/persqueue/write_meta.cpp b/ydb/core/persqueue/write_meta.cpp
index 2230d26429..043a1b2b92 100644
--- a/ydb/core/persqueue/write_meta.cpp
+++ b/ydb/core/persqueue/write_meta.cpp
@@ -52,9 +52,10 @@ TString GetSerializedData(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEv
}
auto& fields = message.GetMessageMeta()->Fields;
if (!fields.empty()) {
- auto& messageMeta = *proto.MutableMessageMeta();
for (const auto& item : fields) {
- messageMeta[item.first] = item.second;
+ auto& metaItem = *proto.AddMessageMeta();
+ metaItem.set_key(item.first);
+ metaItem.set_value(item.second);
}
}
proto.SetSeqNo(message.GetSeqNo());
diff --git a/ydb/core/protos/grpc_pq_old.proto b/ydb/core/protos/grpc_pq_old.proto
index 3f2d32bba7..4aa2a5e40b 100755
--- a/ydb/core/protos/grpc_pq_old.proto
+++ b/ydb/core/protos/grpc_pq_old.proto
@@ -1,3 +1,4 @@
+import "ydb/public/api/protos/ydb_topic.proto";
package NKikimrPQClient;
message TKeyValue {
@@ -38,5 +39,5 @@ message TDataChunk {
optional bytes Data = 127; // ~ 64K
- map<string, string> MessageMeta = 128;
+ repeated Ydb.Topic.MetadataItem MessageMeta = 128;
}
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index 0ded0094f3..9621b32d98 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -55,6 +55,11 @@ message PartitionWithGeneration {
// Partition generation.
int64 generation = 2;
+};
+
+message MetadataItem {
+ string key = 1;
+ bytes value = 2;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -165,7 +170,7 @@ message StreamWriteMessage {
PartitionWithGeneration partition_with_generation = 8;
}
// Message metadata. Overall size is limited to 4096 symbols (all keys and values combined).
- map<string, string> message_meta = 7;
+ repeated MetadataItem metadata_items = 7 [(Ydb.size).le = 1000];
}
}
@@ -373,7 +378,7 @@ message StreamReadMessage {
// Filled if message_group_id was set on message write.
string message_group_id = 7;
- map<string, string> message_meta = 8;
+ repeated MetadataItem metadata_items = 8;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index 8ca0811ff8..2586ed9798 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -50,6 +50,11 @@ using TAWriteSessionMeta = std::conditional_t<UseMigrationProtocol,
NYdb::NTopic::TWriteSessionMeta>;
template <bool UseMigrationProtocol>
+using TAMessageMeta = std::conditional_t<UseMigrationProtocol,
+ NYdb::NPersQueue::TMessageMeta,
+ NYdb::NTopic::TMessageMeta>;
+
+template <bool UseMigrationProtocol>
using TASessionClosedEvent = std::conditional_t<UseMigrationProtocol,
NYdb::NPersQueue::TSessionClosedEvent,
NYdb::NTopic::TSessionClosedEvent>;
@@ -266,7 +271,7 @@ public:
}
template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
- typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr GetMessageMeta(size_t batchIndex, size_t messageIndex) const {
+ typename TAMessageMeta<UseMigrationProtocol>::TPtr GetMessageMeta(size_t batchIndex, size_t messageIndex) const {
Y_ASSERT(batchIndex < MessagesMeta.size());
Y_ASSERT(messageIndex < MessagesMeta[batchIndex].size());
return MessagesMeta[batchIndex][messageIndex];
@@ -330,8 +335,9 @@ private:
private:
TPartitionData<UseMigrationProtocol> ServerMessage;
using TMetadataPtrVector = std::vector<typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr>;
+ using TMessageMetaPtrVector = std::vector<typename TAMessageMeta<UseMigrationProtocol>::TPtr>;
TMetadataPtrVector BatchesMeta;
- std::vector<TMetadataPtrVector> MessagesMeta;
+ std::vector<TMessageMetaPtrVector> MessagesMeta;
std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session;
bool DoDecompress;
i64 ServerBytesSize = 0;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 15eb81f091..c1aa7c4378 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -2217,13 +2217,13 @@ void TDataDecompressionInfo<UseMigrationProtocol>::BuildBatchesMeta() {
for (const auto& [key, value] : batch.write_session_meta()) {
meta->Fields.emplace(key, value);
}
- MessagesMeta.emplace_back(TMetadataPtrVector{});
- auto &currBatchMessagesMeta = MessagesMeta.back();
- for (const auto &messageData: batch.message_data()) {
- typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr msgMeta = MakeIntrusive<TAWriteSessionMeta<UseMigrationProtocol>>();
- msgMeta->Fields.reserve(messageData.message_meta_size());
- for (const auto &[key, value]: messageData.message_meta()) {
- msgMeta->Fields.emplace(key, value);
+ MessagesMeta.emplace_back(TMessageMetaPtrVector{});
+ auto& currBatchMessagesMeta = MessagesMeta.back();
+ for (const auto& messageData: batch.message_data()) {
+ typename TAMessageMeta<UseMigrationProtocol>::TPtr msgMeta = MakeIntrusive<TAMessageMeta<UseMigrationProtocol>>();
+ msgMeta->Fields.reserve(messageData.metadata_items_size());
+ for (const auto& metaPair: messageData.metadata_items()) {
+ msgMeta->Fields.emplace_back(std::make_pair(metaPair.key(), metaPair.value()));
}
currBatchMessagesMeta.emplace_back(std::move(msgMeta));
}
@@ -2395,7 +2395,7 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart
messageData.explicit_hash());
}
} else {
- const auto &messageMeta = Parent->GetMessageMeta(Batch, Message);
+ const auto& messageMeta = Parent->GetMessageMeta(Batch, Message);
NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(
messageData.offset(),
batch.producer_id(),
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
index 60b348040c..d6459c2974 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
@@ -319,6 +319,14 @@ struct TWriteSessionMeta : public TThrRefBase {
THashMap<TString, TString> Fields;
};
+//! Message levelmetainformation.
+struct TMessageMeta : public TThrRefBase {
+ using TPtr = TIntrusivePtr<TWriteSessionMeta>;
+
+ //! User defined fields.
+ TVector<std::pair<TString, TString>> Fields;
+};
+
//! Event that is sent to client during session destruction.
struct TSessionClosedEvent : public TStatus {
using TStatus::TStatus;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
index fbe8e1a75a..3bf8719fde 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
@@ -25,7 +25,7 @@ TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation(
TInstant createTime,
TInstant writeTime,
TWriteSessionMeta::TPtr meta,
- TWriteSessionMeta::TPtr messageMeta,
+ TMessageMeta::TPtr messageMeta,
ui64 uncompressedSize,
TString messageGroupId
)
@@ -163,7 +163,7 @@ const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::
return Information.Meta;
}
-const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageMeta() const {
+const TMessageMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageMeta() const {
return Information.MessageMeta;
}
@@ -223,7 +223,7 @@ const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompresse
return Information.Meta;
}
-const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageMeta() const {
+const TMessageMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageMeta() const {
return Information.MessageMeta;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
index e42d98b95b..0cfdf77356 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
@@ -46,12 +46,26 @@ NThreading::TFuture<void> TWriteSession::WaitEvent() {
void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize,
TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) {
- Impl->WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp);
+ auto message = TWriteMessage::CompressedMessage(data, codec, originalSize);
+ if (seqNo.Defined())
+ message.SeqNo(*seqNo);
+ if (createTimestamp.Defined())
+ message.CreateTimestamp(*createTimestamp);
+ Impl->WriteInternal(std::move(token), std::move(message));
}
void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
TMaybe<TInstant> createTimestamp) {
- Impl->WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp);
+ TWriteMessage message{data};
+ if (seqNo.Defined())
+ message.SeqNo(*seqNo);
+ if (createTimestamp.Defined())
+ message.CreateTimestamp(*createTimestamp);
+ Impl->WriteInternal(std::move(token), std::move(message));
+}
+
+void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message) {
+ Impl->WriteInternal(std::move(token), std::move(message));
}
bool TWriteSession::Close(TDuration closeTimeout) {
@@ -102,6 +116,20 @@ bool TSimpleBlockingWriteSession::Write(
return false;
}
+bool TSimpleBlockingWriteSession::Write(
+ TWriteMessage&& message, const TDuration& blockTimeout
+) {
+ if (!IsAlive())
+ return false;
+
+ auto continuationToken = WaitForToken(blockTimeout);
+ if (continuationToken.Defined()) {
+ Writer->Write(std::move(*continuationToken), std::move(message));
+ return true;
+ }
+ return false;
+}
+
TMaybe<TContinuationToken> TSimpleBlockingWriteSession::WaitForToken(const TDuration& timeout) {
auto startTime = TInstant::Now();
TDuration remainingTime = timeout;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
index 7e480c883e..a12c61b4b9 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
@@ -38,6 +38,7 @@ public:
void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize,
TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override;
+ void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override;
NThreading::TFuture<void> WaitEvent() override;
@@ -67,6 +68,8 @@ public:
bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(),
const TDuration& blockTimeout = TDuration::Max()) override;
+ bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) override;
+
ui64 GetInitSeqNo() override;
bool Close(TDuration closeTimeout = TDuration::Max()) override;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index 58393858cf..3fb36f63da 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -244,15 +244,15 @@ NThreading::TFuture<void> TWriteSessionImpl::WaitEvent() {
}
// Client method.
-void TWriteSessionImpl::WriteInternal(
- TContinuationToken&&, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp
- ) {
- TInstant createdAtValue = createTimestamp.Defined() ? *createTimestamp : TInstant::Now();
+void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& message) {
+ TInstant createdAtValue = message.CreateTimestamp_.Defined() ? *message.CreateTimestamp_ : TInstant::Now();
bool readyToAccept = false;
- size_t bufferSize = data.size();
+ size_t bufferSize = message.Data.size();
with_lock(Lock) {
- //ToDo[message-meta] - Pass message meta here.
- CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize, {});
+ CurrentBatch.Add(
+ GetNextSeqNoImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize,
+ message.MessageMeta_
+ );
FlushWriteIfRequiredImpl();
readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk;
@@ -263,16 +263,8 @@ void TWriteSessionImpl::WriteInternal(
}
// Client method.
-void TWriteSessionImpl::WriteEncoded(
- TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp
- ) {
- WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp);
-}
-
-void TWriteSessionImpl::Write(
- TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp
- ) {
- WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp);
+void TWriteSessionImpl::Write(TContinuationToken&& token, TWriteMessage&& message) {
+ WriteInternal(std::move(token), std::move(message));
}
@@ -1021,9 +1013,10 @@ void TWriteSessionImpl::SendImpl() {
*msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds());
if (!message.MessageMeta.empty()) {
- auto* meta = msgData->mutable_message_meta();
for (auto& [k, v] : message.MessageMeta) {
- (*meta)[k] = v;
+ auto* pair = msgData->add_metadata_items();
+ pair->set_key(k);
+ pair->set_value(v);
}
}
SentOriginalMessages.emplace(std::move(message));
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index 2f739d4064..6e218de737 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -173,9 +173,9 @@ private:
TStringBuf DataRef;
TMaybe<ECodec> Codec;
ui32 OriginalSize; // only for coded messages
- std::unordered_map<TString, TString> MessageMeta;
+ TVector<std::pair<TString, TString>> MessageMeta;
TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {},
- ui32 originalSize = 0, const std::unordered_map<TString, TString>& messageMeta = {})
+ ui32 originalSize = 0, const TVector<std::pair<TString, TString>>& messageMeta = {})
: SeqNo(seqNo)
, CreatedAt(createdAt)
, DataRef(data)
@@ -193,7 +193,7 @@ private:
bool Acquired = false;
bool FlushRequested = false;
void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize,
- const std::unordered_map<TString, TString>& messageMeta) {
+ const TVector<std::pair<TString, TString>>& messageMeta) {
if (StartedAt == TInstant::Zero())
StartedAt = TInstant::Now();
CurrentSize += codec ? originalSize : data.size();
@@ -266,14 +266,14 @@ private:
ui64 SeqNo;
TInstant CreatedAt;
size_t Size;
- std::unordered_map<TString, TString> MessageMeta;
+ TVector<std::pair<TString, TString>> MessageMeta;
TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size)
: SeqNo(sequenceNumber)
, CreatedAt(createdAt)
, Size(size)
{}
TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size,
- std::unordered_map<TString, TString>&& messageMeta)
+ TVector<std::pair<TString, TString>>&& messageMeta)
: SeqNo(sequenceNumber)
, CreatedAt(createdAt)
, Size(size)
@@ -315,11 +315,21 @@ public:
TMaybe<size_t> maxEventsCount = Nothing()) override;
NThreading::TFuture<ui64> GetInitSeqNo() override;
- void Write(TContinuationToken&& continuationToken, TStringBuf data,
- TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override;
+ void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override;
- void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize,
- TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override;
+ void Write(TContinuationToken&&, TStringBuf, TMaybe<ui64> seqNo = Nothing(),
+ TMaybe<TInstant> createTimestamp = Nothing()) override {
+ Y_UNUSED(seqNo);
+ Y_UNUSED(createTimestamp);
+ Y_FAIL("Do not use this method");
+ };
+
+ void WriteEncoded(TContinuationToken&&, TStringBuf, ECodec, ui32,
+ TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override {
+ Y_UNUSED(seqNo);
+ Y_UNUSED(createTimestamp);
+ Y_FAIL("Do not use this method");
+ }
NThreading::TFuture<void> WaitEvent() override;
@@ -337,8 +347,7 @@ private:
void UpdateTokenIfNeededImpl();
- void WriteInternal(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize,
- TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing());
+ void WriteInternal(TContinuationToken&& continuationToken, TWriteMessage&& message);
void FlushWriteIfRequiredImpl();
size_t WriteBatchImpl();
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index b548b70e85..cc49bcf126 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -556,6 +556,13 @@ struct TWriteSessionMeta: public TThrRefBase {
THashMap<TString, TString> Fields;
};
+struct TMessageMeta: public TThrRefBase {
+ using TPtr = TIntrusivePtr<TMessageMeta>;
+
+ //! User defined fields.
+ TVector<std::pair<TString, TString>> Fields;
+};
+
//! Event that is sent to client during session destruction.
struct TSessionClosedEvent: public TStatus {
using TStatus::TStatus;
@@ -690,7 +697,7 @@ struct TReadSessionEvent {
TInstant createTime,
TInstant writeTime,
TWriteSessionMeta::TPtr meta,
- TWriteSessionMeta::TPtr messageMeta,
+ TMessageMeta::TPtr messageMeta,
ui64 uncompressedSize,
TString messageGroupId);
ui64 Offset;
@@ -699,7 +706,7 @@ struct TReadSessionEvent {
TInstant CreateTime;
TInstant WriteTime;
TWriteSessionMeta::TPtr Meta;
- TWriteSessionMeta::TPtr MessageMeta;
+ TMessageMeta::TPtr MessageMeta;
ui64 UncompressedSize;
TString MessageGroupId;
};
@@ -759,7 +766,7 @@ struct TReadSessionEvent {
const TWriteSessionMeta::TPtr& GetMeta() const;
//! Message level meta info.
- const TWriteSessionMeta::TPtr& GetMessageMeta() const;
+ const TMessageMeta::TPtr& GetMessageMeta() const;
//! Commits single message.
void Commit() override;
@@ -804,7 +811,7 @@ struct TReadSessionEvent {
const TWriteSessionMeta::TPtr& GetMeta() const;
//! Message level meta info.
- const TWriteSessionMeta::TPtr& GetMessageMeta() const;
+ const TMessageMeta::TPtr& GetMessageMeta() const;
//! Uncompressed size.
ui64 GetUncompressedSize() const;
@@ -1472,12 +1479,60 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
FLUENT_SETTING_OPTIONAL(TLog, Log);
};
+//! Contains the message to write and all the options.
+struct TWriteMessage {
+ using TSelf = TWriteMessage;
+ using TMessageMeta = TVector<std::pair<TString, TString>>;
+public:
+ TWriteMessage() = delete;
+ TWriteMessage(TStringBuf data)
+ : Data(data)
+ {}
+
+ //! A message that is already compressed by codec. Codec from WriteSessionSettings does not apply to this message.
+ //! Compression will not be performed in SDK for such messages.
+ static TWriteMessage CompressedMessage(const TStringBuf& data, ECodec codec, ui32 originalSize) {
+ TWriteMessage result{data};
+ result.Codec = codec;
+ result.OriginalSize = originalSize;
+ return result;
+ }
+
+ bool Compressed() const {
+ return Codec.Defined();
+ }
+
+ //! Message body.
+ const TStringBuf Data;
+
+ //! Codec and original size for compressed message.
+ //! Do not specify or change these options directly, use CompressedMessage()
+ //! method to create an object for compressed message.
+ TMaybe<ECodec> Codec;
+ ui32 OriginalSize = 0;
+
+ //! Message SeqNo, optional. If not provided SDK core will calculate SeqNo automatically.
+ //! NOTICE: Either all messages within one write session must have SeqNo provided or none of them.
+ FLUENT_SETTING_OPTIONAL(ui64, SeqNo);
+
+ //! Message creation timestamp. If not provided, Now() will be used.
+ FLUENT_SETTING_OPTIONAL(TInstant, CreateTimestamp);
+
+ //! Message metadata. Limited to 4096 characters overall (all keys and values combined).
+ FLUENT_SETTING(TMessageMeta, MessageMeta);
+
+};
+
//! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks.
class ISimpleBlockingWriteSession : public TThrRefBase {
public:
//! Write single message. Blocks for up to blockTimeout if inflight is full or memoryUsage is exceeded;
//! return - true if write succeeded, false if message was not enqueued for write within blockTimeout.
//! no Ack is provided.
+ virtual bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) = 0;
+
+
+ //! Write single message. Deprecated method with only basic message options.
virtual bool Write(TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing(),
const TDuration& blockTimeout = TDuration::Max()) = 0;
@@ -1519,12 +1574,15 @@ public:
//! Write single message.
//! continuationToken - a token earlier provided to client with ReadyToAccept event.
- virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0;
+ virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) = 0;
- //! Write single message that is already coded by codec. Codec from settings does not apply to this message.
- //! continuationToken - a token earlier provided to client with ReadyToAccept event.
- //! originalSize - size of unpacked message
- virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0;
+ //! Write single message. Old method with only basic message options.
+ virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(),
+ TMaybe<TInstant> createTimestamp = Nothing()) = 0;
+
+ //! Write single message that is already compressed by codec. Old method with only basic message options.
+ virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize,
+ TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0;
//! Wait for all writes to complete (no more that closeTimeout()), than close. Empty maybe - means infinite timeout.
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp
index 94a9b1ba3b..9e01d199a6 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp
@@ -423,7 +423,7 @@ bool FillBatchedData(
::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(r.GetCreateTimestampMS());
message->set_message_group_id(GetBatchSourceId(currentBatch));
- auto* msgMeta = message->mutable_message_meta();
+ auto* msgMeta = message->mutable_metadata_items();
*msgMeta = (proto.GetMessageMeta());
}
hasData = true;
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index df28db4e2d..8618219d08 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -151,7 +151,7 @@ inline void FillChunkDataFromReq(
}
proto.SetData(msg.data());
auto* msgMeta = proto.MutableMessageMeta();
- *msgMeta = msg.message_meta();
+ *msgMeta = msg.metadata_items();
}
namespace NGRpcProxy {
@@ -1433,8 +1433,8 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
payloadSize += w->GetData().size() + w->GetSourceId().size();
const auto& msg = writeRequest.messages(messageIndex);
ui64 currMetadataSize = 0;
- for (const auto& [k, v] : msg.message_meta()) {
- currMetadataSize += k.size() + v.size();
+ for (const auto& metaItem : msg.metadata_items()) {
+ currMetadataSize += metaItem.key().size() + metaItem.value().size();
}
maxMessageMetadataSize = std::max(maxMessageMetadataSize, currMetadataSize);
};
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 79884fc97e..d32271e93a 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -6153,12 +6153,17 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
NYdb::NTopic::TWriteSessionSettings wSettings {topicFullName, "srcId", "srcId"};
auto writer = topicClient.CreateSimpleBlockingWriteSession(wSettings);
- std::unordered_map<TString, TString> metadata = {{"key1", "val1"}, {"key2", "val2"}};
-/* writer->Write("Somedata", Nothing(), Nothing(), TDuration::Max(), metadata);
+ TVector<std::pair<TString, TString>> metadata = {{"key1", "val1"}, {"key2", "val2"}};
+ {
+ auto message = NYdb::NTopic::TWriteMessage{"Somedata"}.MessageMeta(metadata);
+ writer->Write(std::move(message));
+ }
metadata = {{"key3", "val3"}};
- writer->Write("Somedata2", Nothing(), Nothing(), TDuration::Max(), metadata);
+ {
+ auto message = NYdb::NTopic::TWriteMessage{"Somedata2"}.MessageMeta(metadata);
+ writer->Write(std::move(message));
+ }
writer->Write("Somedata3");
-*/
writer->Close();
NYdb::NTopic::TReadSessionSettings rSettings;
rSettings.ConsumerName("debug").AppendTopics({topicFullName});