summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <[email protected]>2023-06-14 11:00:52 +0300
committerkomels <[email protected]>2023-06-14 11:00:52 +0300
commite4aa2c56ed1958de4ed580a1b9b772559cde1ef9 (patch)
treeffa34c0c1099bbea8f06c09427ce7134da96c555
parentf6e723596f2e176356bbd53c0a87ee9168e14f22 (diff)
No dedup mode in YDB Topic & message level metadata(server-side)
A verison of PR 3886257 without changes in SDK WriteSession api.
-rw-r--r--ydb/core/persqueue/partition_write.cpp2
-rw-r--r--ydb/core/persqueue/pq_impl.cpp2
-rw-r--r--ydb/core/persqueue/writer/writer.cpp11
-rw-r--r--ydb/core/persqueue/writer/writer.h2
-rw-r--r--ydb/core/protos/config.proto1
-rwxr-xr-xydb/core/protos/grpc_pq_old.proto2
-rw-r--r--ydb/public/api/protos/ydb_topic.proto4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h11
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp34
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp62
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h24
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h13
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.cpp3
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h11
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp311
-rw-r--r--ydb/services/persqueue_v1/first_class_src_ids_ut.cpp1
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp110
18 files changed, 488 insertions, 126 deletions
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 2c847ecdcbb..9d73e0ba85c 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -572,7 +572,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
}
for (const auto& msg: ev->Get()->Msgs) {
//this is checked in pq_impl when forming EvWrite request
- Y_VERIFY(!msg.SourceId.empty() || ev->Get()->IsDirectWrite);
+ Y_VERIFY(!msg.SourceId.empty() || ev->Get()->IsDirectWrite || msg.DisableDeduplication);
Y_VERIFY(!msg.Data.empty());
if (msg.SeqNo > (ui64)Max<i64>()) {
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 47ec65fe768..a556d295e87 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1754,7 +1754,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
errorStr = "no SeqNo";
} else if (!cmd.HasData() || cmd.GetData().empty()){
errorStr = "empty Data";
- } else if ((!cmd.HasSourceId() || cmd.GetSourceId().empty()) && !req.GetIsDirectWrite()) {
+ } else if ((!cmd.HasSourceId() || cmd.GetSourceId().empty()) && !req.GetIsDirectWrite() && !cmd.GetDisableDeduplication()) {
errorStr = "empty SourceId";
} else if (cmd.GetPartitionKey().size() > 256) {
errorStr = "too long partition key";
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index b0298fc4c38..df0f0925ee2 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -10,6 +10,7 @@
#include <ydb/public/lib/base/msgbus_status.h>
#include <util/generic/deque.h>
+#include <util/generic/guid.h>
#include <util/generic/map.h>
#include <util/string/builder.h>
@@ -179,7 +180,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
auto ev = MakeRequest(PartitionId, PipeClient);
auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership();
- cmd.SetOwner(SourceId);
+ if (Opts.UseDeduplication) {
+ cmd.SetOwner(SourceId);
+ } else {
+ cmd.SetOwner(CreateGuidAsString());
+ }
cmd.SetForce(true);
NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
@@ -382,6 +387,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
auto& request = *ev->Record.MutablePartitionRequest();
request.SetMessageNo(MessageNo++);
+ if (!Opts.UseDeduplication) {
+ request.SetPartition(PartitionId);
+ }
NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
PendingWrite.emplace_back(cookie);
@@ -479,7 +487,6 @@ public:
};
PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, config));
-
GetOwnership();
}
diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h
index 950758f3022..919cd3f22f3 100644
--- a/ydb/core/persqueue/writer/writer.h
+++ b/ydb/core/persqueue/writer/writer.h
@@ -113,9 +113,11 @@ struct TEvPartitionWriter {
struct TPartitionWriterOpts {
bool CheckState = false;
bool AutoRegister = false;
+ bool UseDeduplication = true;
TPartitionWriterOpts& WithCheckState(bool value) { CheckState = value; return *this; }
TPartitionWriterOpts& WithAutoRegister(bool value) { AutoRegister = value; return *this; }
+ TPartitionWriterOpts& WithDeduplication(bool value) { UseDeduplication = value; return *this; }
};
IActor* CreatePartitionWriter(const TActorId& client, ui64 tabletId, ui32 partitionId, const TString& sourceId,
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index e9c4a5efa36..fcd260195f0 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -811,6 +811,7 @@ message TFeatureFlags {
optional bool ForceColumnTablesCompositeMarks = 97 [default = false];
optional bool EnableSubscriptionsInDiscovery = 98 [default = false];
optional bool EnableGetNodeLabels = 99 [default = false];
+ optional bool EnableTopicMessageMeta = 100 [default = false];
}
message THttpProxyConfig {
diff --git a/ydb/core/protos/grpc_pq_old.proto b/ydb/core/protos/grpc_pq_old.proto
index ed273e14041..3f2d32bba75 100755
--- a/ydb/core/protos/grpc_pq_old.proto
+++ b/ydb/core/protos/grpc_pq_old.proto
@@ -37,4 +37,6 @@ message TDataChunk {
optional TMapType ExtraFields = 126;
optional bytes Data = 127; // ~ 64K
+
+ map<string, string> MessageMeta = 128;
}
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index 3ebab230c35..0cea5ae5277 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -151,6 +151,8 @@ message StreamWriteMessage {
// Explicit partition id to write to.
int64 partition_id = 6;
}
+ // Message metadata. Overall size is limited to 4096 symbols (all keys and values combined).
+ map<string, string> message_meta = 7;
}
}
@@ -358,6 +360,8 @@ message StreamReadMessage {
// Filled if message_group_id was set on message write.
string message_group_id = 7;
+ map<string, string> message_meta = 8;
+
}
// Representation of sequence of client messages from one write session.
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index c4acb6fcab6..8ca0811ff89 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -265,6 +265,13 @@ public:
return BatchesMeta[batchIndex];
}
+ template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
+ typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr GetMessageMeta(size_t batchIndex, size_t messageIndex) const {
+ Y_ASSERT(batchIndex < MessagesMeta.size());
+ Y_ASSERT(messageIndex < MessagesMeta[batchIndex].size());
+ return MessagesMeta[batchIndex][messageIndex];
+ }
+
bool HasMoreData() const {
return CurrentReadingMessage.first < static_cast<size_t>(GetServerMessage().batches_size());
}
@@ -322,7 +329,9 @@ private:
private:
TPartitionData<UseMigrationProtocol> ServerMessage;
- std::vector<typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr> BatchesMeta;
+ using TMetadataPtrVector = std::vector<typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr>;
+ TMetadataPtrVector BatchesMeta;
+ std::vector<TMetadataPtrVector> MessagesMeta;
std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session;
bool DoDecompress;
i64 ServerBytesSize = 0;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 53c0561f60c..3848d313b5d 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -2192,6 +2192,9 @@ TDataDecompressionInfo<UseMigrationProtocol>::~TDataDecompressionInfo()
template<bool UseMigrationProtocol>
void TDataDecompressionInfo<UseMigrationProtocol>::BuildBatchesMeta() {
BatchesMeta.reserve(ServerMessage.batches_size());
+ if constexpr (!UseMigrationProtocol) {
+ MessagesMeta.reserve(ServerMessage.batches_size());
+ }
for (const auto& batch : ServerMessage.batches()) {
// Extra fields.
typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr meta = MakeIntrusive<TAWriteSessionMeta<UseMigrationProtocol>>();
@@ -2206,6 +2209,16 @@ void TDataDecompressionInfo<UseMigrationProtocol>::BuildBatchesMeta() {
for (const auto& [key, value] : batch.write_session_meta()) {
meta->Fields.emplace(key, value);
}
+ MessagesMeta.emplace_back(TMetadataPtrVector{});
+ auto &currBatchMessagesMeta = MessagesMeta.back();
+ for (const auto &messageData: batch.message_data()) {
+ typename TAWriteSessionMeta<UseMigrationProtocol>::TPtr msgMeta = MakeIntrusive<TAWriteSessionMeta<UseMigrationProtocol>>();
+ msgMeta->Fields.reserve(messageData.message_meta_size());
+ for (const auto &[key, value]: messageData.message_meta()) {
+ msgMeta->Fields.emplace(key, value);
+ }
+ currBatchMessagesMeta.emplace_back(std::move(msgMeta));
+ }
}
BatchesMeta.emplace_back(std::move(meta));
@@ -2336,7 +2349,6 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart
i64 maxOffset = 0;
auto& batch = *msg.mutable_batches(Batch);
const auto& meta = Parent->GetBatchMeta(Batch);
-
const TInstant batchWriteTimestamp = [&batch](){
if constexpr (UseMigrationProtocol) {
return TInstant::MilliSeconds(batch.write_timestamp_ms());
@@ -2375,14 +2387,18 @@ void TDataDecompressionEvent<UseMigrationProtocol>::TakeData(TIntrusivePtr<TPart
messageData.explicit_hash());
}
} else {
- NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(messageData.offset(),
- batch.producer_id(),
- messageData.seq_no(),
- TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(messageData.created_at())),
- batchWriteTimestamp,
- meta,
- messageData.uncompressed_size(),
- messageData.message_group_id());
+ const auto &messageMeta = Parent->GetMessageMeta(Batch, Message);
+ NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation messageInfo(
+ messageData.offset(),
+ batch.producer_id(),
+ messageData.seq_no(),
+ TInstant::MilliSeconds(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(messageData.created_at())),
+ batchWriteTimestamp,
+ meta,
+ messageMeta,
+ messageData.uncompressed_size(),
+ messageData.message_group_id()
+ );
if (Parent->GetDoDecompress()) {
messages.emplace_back(messageData.data(),
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
index adb43a2c0d8..fbe8e1a75a6 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp
@@ -25,6 +25,7 @@ TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation(
TInstant createTime,
TInstant writeTime,
TWriteSessionMeta::TPtr meta,
+ TWriteSessionMeta::TPtr messageMeta,
ui64 uncompressedSize,
TString messageGroupId
)
@@ -34,6 +35,7 @@ TReadSessionEvent::TDataReceivedEvent::TMessageInformation::TMessageInformation(
, CreateTime(createTime)
, WriteTime(writeTime)
, Meta(meta)
+ , MessageMeta(messageMeta)
, UncompressedSize(uncompressedSize)
, MessageGroupId(messageGroupId)
{}
@@ -161,6 +163,10 @@ const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::
return Information.Meta;
}
+const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TMessage::GetMessageMeta() const {
+ return Information.MessageMeta;
+}
+
void TReadSessionEvent::TDataReceivedEvent::TMessage::Commit() {
static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())
->Commit(Information.Offset, Information.Offset + 1);
@@ -217,6 +223,10 @@ const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompresse
return Information.Meta;
}
+const TWriteSessionMeta::TPtr& TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetMessageMeta() const {
+ return Information.MessageMeta;
+}
+
ui64 TReadSessionEvent::TDataReceivedEvent::TCompressedMessage::GetUncompressedSize() const {
return Information.UncompressedSize;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index 8b6e898c36b..58393858cf3 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -10,6 +10,7 @@
#include <util/generic/store_policy.h>
#include <util/generic/utility.h>
#include <util/stream/buffer.h>
+#include <util/generic/guid.h>
namespace NYdb::NTopic {
@@ -125,8 +126,24 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
}
return result;
}
+TString GenerateProducerId() {
+ return CreateGuidAsString();
+}
void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race yet as well.
+ if (!Settings.DeduplicationEnabled_.Defined()) {
+ Settings.DeduplicationEnabled_ = !(Settings.ProducerId_.empty());
+ }
+ else if (Settings.DeduplicationEnabled_.GetRef()) {
+ if (Settings.ProducerId_.empty()) {
+ Settings.ProducerId(GenerateProducerId());
+ }
+ } else {
+ if (!Settings.ProducerId_.empty()) {
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "ProducerId is not empty when deduplication is switched off");
+ ThrowFatalError("Cannot disable deduplication when non-empty ProducerId is provided");
+ }
+ }
CompressionExecutor = Settings.CompressionExecutor_;
IExecutor::TPtr executor;
executor = CreateSyncExecutor();
@@ -139,6 +156,10 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race
}
// Client method
NThreading::TFuture<ui64> TWriteSessionImpl::GetInitSeqNo() {
+ if (!Settings.DeduplicationEnabled_.GetOrElse(true)) {
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "GetInitSeqNo called with deduplication disabled");
+ ThrowFatalError("Cannot call GetInitSeqNo when deduplication is disabled");
+ }
if (Settings.ValidateSeqNo_) {
if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) {
LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode");
@@ -177,6 +198,10 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
}
}
if (seqNo.Defined()) {
+ if (!Settings.DeduplicationEnabled_.GetOrElse(true)) {
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "SeqNo is provided on write when deduplication is disabled");
+ ThrowFatalError("Cannot provide SeqNo on Write() when deduplication is disabled");
+ }
if (*AutoSeqNoMode) {
LOG_LAZY(DbDriverState->Log,
TLOG_ERR,
@@ -226,7 +251,8 @@ void TWriteSessionImpl::WriteInternal(
bool readyToAccept = false;
size_t bufferSize = data.size();
with_lock(Lock) {
- CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize);
+ //ToDo[message-meta] - Pass message meta here.
+ CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize, {});
FlushWriteIfRequiredImpl();
readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk;
@@ -591,9 +617,13 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
ui64 newLastSeqNo = initResponse.last_seq_no();
// SeqNo increased, so there's a risk of loss, apply SeqNo shift.
// MinUnsentSeqNo must be > 0 if anything was ever sent yet
- if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) {
- SeqNoShift = newLastSeqNo - MinUnsentSeqNo;
- }
+ if (Settings.DeduplicationEnabled_.GetOrElse(true)) {
+ if(MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) {
+ SeqNoShift = newLastSeqNo - MinUnsentSeqNo;
+ }
+ } else {
+ newLastSeqNo = 1;
+ }
result.InitSeqNo = newLastSeqNo;
LastSeqNo = newLastSeqNo;
@@ -878,29 +908,35 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
for (size_t i = 0; i != CurrentBatch.Messages.size();) {
TBlock block{};
for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) {
- auto sequenceNumber = CurrentBatch.Messages[i].SeqNo;
- auto createTs = CurrentBatch.Messages[i].CreatedAt;
+ auto& currMessage = CurrentBatch.Messages[i];
+ auto sequenceNumber = currMessage.SeqNo;
+ auto createTs = currMessage.CreatedAt;
if (!block.MessageCount) {
block.Offset = sequenceNumber;
}
block.MessageCount += 1;
- const auto& datum = CurrentBatch.Messages[i].DataRef;
+ const auto& datum = currMessage.DataRef;
block.OriginalSize += datum.size();
block.OriginalMemoryUsage = CurrentBatch.Data.size();
block.OriginalDataRefs.emplace_back(datum);
if (CurrentBatch.Messages[i].Codec.Defined()) {
Y_VERIFY(CurrentBatch.Messages.size() == 1);
- block.CodecID = static_cast<ui32>(*CurrentBatch.Messages[i].Codec);
- block.OriginalSize = CurrentBatch.Messages[i].OriginalSize;
+ block.CodecID = static_cast<ui32>(*currMessage.Codec);
+ block.OriginalSize = currMessage.OriginalSize;
block.Compressed = false;
}
size += datum.size();
UpdateTimedCountersImpl();
(*Counters->BytesInflightUncompressed) += datum.size();
(*Counters->MessagesInflight)++;
- OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size());
+ if (!currMessage.MessageMeta.empty()) {
+ OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(),
+ std::move(currMessage.MessageMeta));
+ } else {
+ OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size());
+ }
}
block.Data = std::move(CurrentBatch.Data);
if (skipCompression) {
@@ -984,6 +1020,12 @@ void TWriteSessionImpl::SendImpl() {
msgData->set_seq_no(message.SeqNo + SeqNoShift);
*msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds());
+ if (!message.MessageMeta.empty()) {
+ auto* meta = msgData->mutable_message_meta();
+ for (auto& [k, v] : message.MessageMeta) {
+ (*meta)[k] = v;
+ }
+ }
SentOriginalMessages.emplace(std::move(message));
OriginalMessagesToSend.pop();
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index 9399fb22135..2f739d4064d 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -173,12 +173,15 @@ private:
TStringBuf DataRef;
TMaybe<ECodec> Codec;
ui32 OriginalSize; // only for coded messages
- TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0)
+ std::unordered_map<TString, TString> MessageMeta;
+ TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {},
+ ui32 originalSize = 0, const std::unordered_map<TString, TString>& messageMeta = {})
: SeqNo(seqNo)
, CreatedAt(createdAt)
, DataRef(data)
, Codec(codec)
, OriginalSize(originalSize)
+ , MessageMeta(messageMeta)
{}
};
@@ -189,11 +192,12 @@ private:
TInstant StartedAt = TInstant::Zero();
bool Acquired = false;
bool FlushRequested = false;
- void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) {
+ void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize,
+ const std::unordered_map<TString, TString>& messageMeta) {
if (StartedAt == TInstant::Zero())
StartedAt = TInstant::Now();
CurrentSize += codec ? originalSize : data.size();
- Messages.emplace_back(seqNo, createdAt, data, codec, originalSize);
+ Messages.emplace_back(seqNo, createdAt, data, codec, originalSize, messageMeta);
Acquired = false;
}
@@ -262,10 +266,18 @@ private:
ui64 SeqNo;
TInstant CreatedAt;
size_t Size;
+ std::unordered_map<TString, TString> MessageMeta;
TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size)
- : SeqNo(sequenceNumber)
- , CreatedAt(createdAt)
- , Size(size)
+ : SeqNo(sequenceNumber)
+ , CreatedAt(createdAt)
+ , Size(size)
+ {}
+ TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size,
+ std::unordered_map<TString, TString>&& messageMeta)
+ : SeqNo(sequenceNumber)
+ , CreatedAt(createdAt)
+ , Size(size)
+ , MessageMeta(std::move(messageMeta))
{}
};
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index 40ba536e48b..751ca9d7cff 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -686,6 +686,7 @@ struct TReadSessionEvent {
TInstant createTime,
TInstant writeTime,
TWriteSessionMeta::TPtr meta,
+ TWriteSessionMeta::TPtr messageMeta,
ui64 uncompressedSize,
TString messageGroupId);
ui64 Offset;
@@ -694,6 +695,7 @@ struct TReadSessionEvent {
TInstant CreateTime;
TInstant WriteTime;
TWriteSessionMeta::TPtr Meta;
+ TWriteSessionMeta::TPtr MessageMeta;
ui64 UncompressedSize;
TString MessageGroupId;
};
@@ -752,6 +754,9 @@ struct TReadSessionEvent {
//! Metainfo.
const TWriteSessionMeta::TPtr& GetMeta() const;
+ //! Message level meta info.
+ const TWriteSessionMeta::TPtr& GetMessageMeta() const;
+
//! Commits single message.
void Commit() override;
@@ -794,6 +799,9 @@ struct TReadSessionEvent {
//! Metainfo.
const TWriteSessionMeta::TPtr& GetMeta() const;
+ //! Message level meta info.
+ const TWriteSessionMeta::TPtr& GetMessageMeta() const;
+
//! Uncompressed size.
ui64 GetUncompressedSize() const;
@@ -1212,6 +1220,11 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {
//! MessageGroupId to use.
FLUENT_SETTING(TString, MessageGroupId);
+ //! Explicitly enables or disables deduplication for this write session.
+ //! If ProducerId option is defined deduplication will always be enabled.
+ //! If ProducerId option is empty, but deduplication is enable, a random ProducerId is generated.
+ FLUENT_SETTING_OPTIONAL(bool, DeduplicationEnabled);
+
//! Write to an exact partition. Generally server assigns partition automatically by message_group_id.
//! Using this option is not recommended unless you know for sure why you need it.
FLUENT_SETTING_OPTIONAL(ui32, PartitionId);
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp
index 997ce3b4023..94a9b1ba3b4 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp
@@ -387,7 +387,6 @@ bool FillBatchedData(
SetBatchExtraField(currentBatch, "logtype", header.GetLogType());
}
}
-
if (proto.HasExtraFields()) {
const auto& map = proto.GetExtraFields();
for (const auto& kv : map.GetItems()) {
@@ -424,6 +423,8 @@ bool FillBatchedData(
::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(r.GetCreateTimestampMS());
message->set_message_group_id(GetBatchSourceId(currentBatch));
+ auto* msgMeta = message->mutable_message_meta();
+ *msgMeta = (proto.GetMessageMeta());
}
hasData = true;
}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index c52a40da557..ba87c01c89c 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -116,6 +116,7 @@ private:
HFunc(NPQ::TEvPartitionWriter::TEvInitResult, Handle);
HFunc(NPQ::TEvPartitionWriter::TEvWriteAccepted, Handle);
HFunc(NPQ::TEvPartitionWriter::TEvWriteResponse, Handle);
+
HFunc(NPQ::TEvPartitionWriter::TEvDisconnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
@@ -164,8 +165,10 @@ private:
void StartSession(const NActors::TActorContext& ctx);
void SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx);
+ void UpdateOrProceedPartition(const NActors::TActorContext& ctx);
void UpdatePartition(const NActors::TActorContext& ctx);
void RequestNextPartition(const NActors::TActorContext& ctx);
+ void GetOrProcessPartition(const NActors::TActorContext& ctx);
void ProceedPartition(const ui32 partition, const NActors::TActorContext& ctx);
@@ -175,7 +178,10 @@ private:
//void InitCheckACL(const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx);
+ void MakeAndSentInitResponse(const TMaybe<ui64>& maxSeqNo, const TActorContext& ctx);
+
void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx);
+ void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx);
@@ -225,6 +231,7 @@ private:
bool PartitionFound = false;
// 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1
TString SourceId; // TODO: Replace with 'MessageGroupId' everywhere
+ bool UseDeduplication = true;
NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId;
TString OwnerCookie;
@@ -281,7 +288,9 @@ private:
TInstant LogSessionDeadline;
ui64 BalancerTabletId;
+ ui64 PartitionTabletId;
TActorId PipeToBalancer;
+ TActorId PipeToPartition;
// PQ tablet configuration that we get at the time of session initialization
NKikimrPQ::TPQTabletConfig InitialPQTabletConfig;
@@ -310,6 +319,8 @@ private:
TInitRequest InitRequest;
NPQ::ESourceIdTableGeneration SrcIdTableGeneration;
+ TDeque<ui64> SeqNoInflight;
+
};
}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 72f1f67c7d7..3054d7cb3b2 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -36,6 +36,8 @@ using namespace NPQ;
template <bool UseMigrationProtocol>
using ECodec = std::conditional_t<UseMigrationProtocol, Ydb::PersQueue::V1::Codec, i32>;
+static constexpr ui64 MAX_METADATA_SIZE_PER_MESSAGE = 4096;
+
template <bool UseMigrationProtocol>
ECodec<UseMigrationProtocol> CodecByName(const TString& codec) {
THashMap<TString, ECodec<UseMigrationProtocol>> codecsByName;
@@ -147,6 +149,8 @@ inline void FillChunkDataFromReq(
proto.SetCodec(writeRequest.codec() - 1);
}
proto.SetData(msg.data());
+ auto* msgMeta = proto.MutableMessageMeta();
+ *msgMeta = msg.message_meta();
}
namespace NGRpcProxy {
@@ -410,9 +414,11 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
// For now exactly two scenarios supported:
// 1. Non-empty producer_id == message_group_id
// 2. Non-empty producer_id && non-empty valid partition_id (explicit partitioning)
+ // 3. Empty producer id (no deduplication).
bool isScenarioSupported = (!InitRequest.producer_id().empty() && InitRequest.has_message_group_id() &&
InitRequest.message_group_id() == InitRequest.producer_id())
- || (!InitRequest.producer_id().empty() && InitRequest.has_partition_id());
+ || (!InitRequest.producer_id().empty() && InitRequest.has_partition_id()
+ || InitRequest.producer_id().empty());
if (!isScenarioSupported) {
CloseSession("unsupported producer_id / message_group_id / partition_id settings in init request",
@@ -436,11 +442,17 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
if constexpr (UseMigrationProtocol) {
return InitRequest.message_group_id();
} else {
+ if (InitRequest.producer_id().empty()) {
+ UseDeduplication = false;
+ }
return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id();
}
}();
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest << " from " << PeerName);
+ if (!UseDeduplication) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << ". Disable deduplication for empty producer id");
+ }
//TODO: get user agent from headers
UserAgent = "pqv1 server";
LogSession(ctx);
@@ -473,11 +485,18 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) {
- try {
- EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId, SrcIdTableGeneration);
- } catch (yexception& e) {
- CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ if (!SourceId.empty()) {
+ try {
+ EncodedSourceId = NSourceIdEncoding::EncodeSrcId(
+ FullConverter->GetTopicForSrcIdHash(), SourceId, SrcIdTableGeneration
+ );
+ } catch (yexception &e) {
+ CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(),
+ PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
+ } else {
+ Y_VERIFY(!UseDeduplication);
}
InitMeta = GetInitialDataChunk(InitRequest, FullConverter->GetClientsideName(), PeerName); // ToDo[migration] - check?
@@ -645,6 +664,9 @@ ui32 TWriteSessionActor<UseMigrationProtocol>::CalculateFirstClassPartition(cons
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::DiscoverPartition(const NActors::TActorContext& ctx) {
const auto &pqConfig = AppData(ctx)->PQConfig;
+ if (!UseDeduplication) {
+ return GetOrProcessPartition(ctx);
+ }
if (pqConfig.GetTopicsAreFirstClassCitizen()) {
if (pqConfig.GetUseSrcIdMetaMappingInFirstClass()) {
return SendCreateManagerRequest(ctx);
@@ -724,6 +746,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvCreateSes
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const TString& topic, const NActors::TActorContext& ctx) {
+ Y_VERIFY(UseDeduplication);
//read from DS
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
@@ -749,8 +772,18 @@ void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const
}
template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::UpdateOrProceedPartition(const TActorContext& ctx) {
+ if (UseDeduplication) {
+ UpdatePartition(ctx);
+ } else {
+ ProceedPartition(Partition, ctx);
+ }
+}
+
+template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::UpdatePartition(const TActorContext& ctx) {
Y_VERIFY(State == ES_WAIT_TABLE_REQUEST_1 || State == ES_WAIT_NEXT_PARTITION);
+ Y_VERIFY(UseDeduplication);
SendUpdateSrcIdsRequests(ctx);
State = ES_WAIT_TABLE_REQUEST_2;
}
@@ -779,13 +812,41 @@ template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) {
Y_VERIFY(State == ES_WAIT_NEXT_PARTITION);
Partition = ev->Get()->Record.GetPartitionId();
- UpdatePartition(ctx);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie
+ << ". Got next partition from server: " << Partition);
+ UpdateOrProceedPartition(ctx);
+}
+
+template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::GetOrProcessPartition(const TActorContext& ctx) {
+ if (!UseDeduplication) {
+ Y_VERIFY(!PartitionFound);
+ }
+ if (!PartitionFound) {
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+ auto partition = GetPartitionFromConfigOptions(
+ PreferedPartition, EncodedSourceId, PartitionToTablet.size(),
+ pqConfig.GetTopicsAreFirstClassCitizen(),
+ pqConfig.GetRoundRobinPartitionMapping() || SourceId.empty()
+ //Empty SrcId = No-dedup mode and no msg-group id provided, so always use RoundRobin;
+ );
+ if (partition.Defined()) {
+ PartitionFound = true;
+ Partition = *partition;
+ }
+ }
+ if (PartitionFound) {
+ UpdateOrProceedPartition(ctx);
+ } else {
+ State = ES_WAIT_TABLE_REQUEST_1;
+ RequestNextPartition(ctx);
+ }
+
}
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record.GetRef();
- const auto& pqConfig = AppData(ctx)->PQConfig;
if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) {
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId "
<< SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition race, retrying");
@@ -844,20 +905,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp
<< SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " partition " << Partition << " partitions "
<< PartitionToTablet.size() << "(" << EncodedSourceId.Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t);
- if (!PartitionFound) {
- auto partition = GetPartitionFromConfigOptions(PreferedPartition, EncodedSourceId, PartitionToTablet.size(),
- pqConfig.GetTopicsAreFirstClassCitizen(),
- pqConfig.GetRoundRobinPartitionMapping());
- if (partition.Defined()) {
- PartitionFound = true;
- Partition = *partition;
- }
- }
- if (PartitionFound) {
- UpdatePartition(ctx);
- } else {
- RequestNextPartition(ctx);
- }
+ GetOrProcessPartition(ctx);
return;
} else if (State == EState::ES_WAIT_TABLE_REQUEST_2) {
@@ -939,9 +987,11 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti
Partition = partition;
auto it = PartitionToTablet.find(Partition);
- ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0;
+ ui64 PartitionTabletId = it != PartitionToTablet.end() ? it->second : 0;
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie
+ << " partition: " << Partition);
- if (!tabletId) {
+ if (!PartitionTabletId) {
CloseSession(
Sprintf("no partition %u in topic '%s', Marker# PQ4", Partition,
DiscoveryConverter->GetPrintableString().c_str()),
@@ -950,7 +1000,10 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti
return;
}
- Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(ctx.SelfID, tabletId, Partition, SourceId));
+ Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(
+ ctx.SelfID, PartitionTabletId, Partition, SourceId,
+ TPartitionWriterOpts().WithDeduplication(UseDeduplication)
+ ));
State = ES_WAIT_WRITER_INIT;
ui32 border = AppData(ctx)->PQConfig.GetWriteInitLatencyBigMs();
@@ -1000,31 +1053,20 @@ void TWriteSessionActor<UseMigrationProtocol>::CloseSession(const TString& error
}
template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx) {
- if (State != ES_WAIT_WRITER_INIT) {
- return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
- }
-
- const auto& result = *ev->Get();
- if (!result.IsSuccess()) {
- const auto& error = result.GetError();
- if (error.Response.HasErrorCode()) {
- return CloseSession("status is not ok: " + error.Response.GetErrorReason(), ConvertOldCode(error.Response.GetErrorCode()), ctx);
- } else {
- return CloseSession("error at writer init: " + error.Reason, PersQueue::ErrorCode::ERROR, ctx);
- }
- }
-
- OwnerCookie = result.GetResult().OwnerCookie;
- const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
-
+void TWriteSessionActor<UseMigrationProtocol>::MakeAndSentInitResponse(
+ const TMaybe<ui64>& maxSeqNo, const TActorContext& ctx
+) {
TServerMessage response;
response.set_status(Ydb::StatusIds::SUCCESS);
auto init = response.mutable_init_response();
- if constexpr (UseMigrationProtocol) {
+ if (!OwnerCookie.empty()) {
init->set_session_id(EscapeC(OwnerCookie));
- init->set_last_sequence_number(maxSeqNo);
+ }
+ if constexpr (UseMigrationProtocol) {
+ if (maxSeqNo.Defined()) {
+ init->set_last_sequence_number(*maxSeqNo);
+ }
init->set_partition_id(Partition);
init->set_topic(FullConverter->GetFederationPath());
init->set_cluster(FullConverter->GetCluster());
@@ -1035,8 +1077,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
}
} else {
- init->set_session_id(EscapeC(OwnerCookie));
- init->set_last_seq_no(maxSeqNo);
+ //init->set_session_id(EscapeC(OwnerCookie));
+ if (maxSeqNo.Defined()) {
+ init->set_last_seq_no(*maxSeqNo);
+ }
init->set_partition_id(Partition);
if (InitialPQTabletConfig.HasCodecs()) {
for (const auto& codecName : InitialPQTabletConfig.GetCodecs().GetCodecs()) {
@@ -1046,7 +1090,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session inited cookie: " << Cookie << " partition: " << Partition
- << " MaxSeqNo: " << maxSeqNo << " sessionId: " << OwnerCookie);
+ << " MaxSeqNo: " << maxSeqNo << " sessionId: " << OwnerCookie);
if (!Request->GetStreamCtx()->Write(std::move(response))) {
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed");
@@ -1068,6 +1112,31 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx) {
+ if (State != ES_WAIT_WRITER_INIT) {
+ return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
+ }
+
+ const auto& result = *ev->Get();
+ if (!result.IsSuccess()) {
+ const auto& error = result.GetError();
+ if (error.Response.HasErrorCode()) {
+ return CloseSession("status is not ok: " + error.Response.GetErrorReason(), ConvertOldCode(error.Response.GetErrorCode()), ctx);
+ } else {
+ return CloseSession("error at writer init: " + error.Reason, PersQueue::ErrorCode::ERROR, ctx);
+ }
+ }
+
+ OwnerCookie = result.GetResult().OwnerCookie;
+ const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
+ if (!UseDeduplication) {
+ Y_VERIFY(maxSeqNo == 0);
+ }
+ MakeAndSentInitResponse(maxSeqNo, ctx);
+
+}
+
+template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx) {
if (State != ES_INITED) {
return CloseSession("got write permission but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
@@ -1108,56 +1177,42 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx) {
- if (State != ES_INITED) {
- return CloseSession("got write response but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
- }
-
- const auto& result = *ev->Get();
- if (!result.IsSuccess()) {
- const auto& record = result.Record;
- if (record.HasErrorCode()) {
- return CloseSession("status is not ok: " + record.GetErrorReason(), ConvertOldCode(record.GetErrorCode()), ctx);
- } else {
- return CloseSession("error at write: " + result.GetError().Reason, PersQueue::ErrorCode::ERROR, ctx);
- }
- }
-
- const auto& resp = result.Record.GetPartitionResponse();
-
- if (AcceptedRequests.empty()) {
- CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx);
- return;
- }
+void TWriteSessionActor<UseMigrationProtocol>::ProcessWriteResponse(
+ const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx
+) {
auto writeRequest = std::move(AcceptedRequests.front());
AcceptedRequests.pop_front();
- if (resp.GetCookie() != writeRequest->Cookie) {
- return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx);
- }
- auto addAckMigration = [](const TPersQueuePartitionResponse::TCmdWriteResult& res, StreamingWriteServerMessage::BatchWriteResponse* batchWriteResponse,
- StreamingWriteServerMessage::WriteStatistics* stat) {
+ auto addAckMigration = [this](
+ const TPersQueuePartitionResponse::TCmdWriteResult& res,
+ StreamingWriteServerMessage::BatchWriteResponse* batchWriteResponse,
+ StreamingWriteServerMessage::WriteStatistics* stat) {
+
batchWriteResponse->add_sequence_numbers(res.GetSeqNo());
batchWriteResponse->add_offsets(res.GetOffset());
+ if (!UseDeduplication) {
+ Y_VERIFY(!res.GetAlreadyWritten());
+ }
batchWriteResponse->add_already_written(res.GetAlreadyWritten());
-
stat->set_queued_in_partition_duration_ms(
- Max((i64)res.GetTotalTimeInPartitionQueueMs(), stat->queued_in_partition_duration_ms()));
+ Max((i64)res.GetTotalTimeInPartitionQueueMs(), stat->queued_in_partition_duration_ms()));
stat->set_throttled_on_partition_duration_ms(
- Max((i64)res.GetPartitionQuotedTimeMs(), stat->throttled_on_partition_duration_ms()));
+ Max((i64)res.GetPartitionQuotedTimeMs(), stat->throttled_on_partition_duration_ms()));
stat->set_throttled_on_topic_duration_ms(Max(static_cast<i64>(res.GetTopicQuotedTimeMs()), stat->throttled_on_topic_duration_ms()));
stat->set_persist_duration_ms(
- Max((i64)res.GetWriteTimeMs(), stat->persist_duration_ms()));
+ Max((i64)res.GetWriteTimeMs(), stat->persist_duration_ms()));
};
- auto addAck = [](const TPersQueuePartitionResponse::TCmdWriteResult& res, Topic::StreamWriteMessage::WriteResponse* writeResponse,
- Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) {
+ auto addAck = [this](const TPersQueuePartitionResponse::TCmdWriteResult& res,
+ Topic::StreamWriteMessage::WriteResponse* writeResponse,
+ Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) {
auto ack = writeResponse->add_acks();
// TODO (ildar-khisam@): validate res before filling ack fields
ack->set_seq_no(res.GetSeqNo());
if (res.GetAlreadyWritten()) {
+ Y_VERIFY(UseDeduplication);
ack->mutable_skipped()->set_reason(Topic::StreamWriteMessage::WriteResponse::WriteAck::Skipped::REASON_ALREADY_WRITTEN);
} else {
ack->mutable_written()->set_offset(res.GetOffset());
@@ -1170,8 +1225,8 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
*stat->mutable_persisting_time() = TimeUtil::MillisecondsToDuration(persisting_time_ms);
auto min_queue_wait_time_ms = (stat->min_queue_wait_time() == Duration())
- ? (i64)res.GetTotalTimeInPartitionQueueMs()
- : Min<i64>(res.GetTotalTimeInPartitionQueueMs(), TimeUtil::DurationToMilliseconds(stat->min_queue_wait_time()));
+ ? (i64)res.GetTotalTimeInPartitionQueueMs()
+ : Min<i64>(res.GetTotalTimeInPartitionQueueMs(), TimeUtil::DurationToMilliseconds(stat->min_queue_wait_time()));
*stat->mutable_min_queue_wait_time() = TimeUtil::MillisecondsToDuration(min_queue_wait_time_ms);
auto max_queue_wait_time_ms = Max<i64>(res.GetTotalTimeInPartitionQueueMs(), TimeUtil::DurationToMilliseconds(stat->max_queue_wait_time()));
@@ -1194,14 +1249,16 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
auto batchWriteResponse = result.mutable_batch_write_response();
batchWriteResponse->set_partition_id(Partition);
- for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().sequence_numbers_size(); messageIndex != endIndex; ++messageIndex) {
- if (partitionCmdWriteResultIndex == resp.CmdWriteResultSize()) {
+ for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().sequence_numbers_size();
+ messageIndex != endIndex; ++messageIndex) {
+
+ if (partitionCmdWriteResultIndex == response.CmdWriteResultSize()) {
CloseSession("too few responses from server", PersQueue::ErrorCode::ERROR, ctx);
return;
}
- const auto& partitionCmdWriteResult = resp.GetCmdWriteResult(partitionCmdWriteResultIndex);
+ const auto& partitionCmdWriteResult = response.GetCmdWriteResult(partitionCmdWriteResultIndex);
const auto writtenSequenceNumber = userWriteRequest->Request.write_request().sequence_numbers(messageIndex);
- if (partitionCmdWriteResult.GetSeqNo() != writtenSequenceNumber) {
+ if (UseDeduplication && partitionCmdWriteResult.GetSeqNo() != writtenSequenceNumber) {
CloseSession(TStringBuilder() << "Expected partition " << Partition << " write result for message with sequence number " << writtenSequenceNumber << " but got for " << partitionCmdWriteResult.GetSeqNo(), PersQueue::ErrorCode::ERROR, ctx);
return;
}
@@ -1214,15 +1271,20 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
auto batchWriteResponse = result.mutable_write_response();
batchWriteResponse->set_partition_id(Partition);
- for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().messages_size(); messageIndex != endIndex; ++messageIndex) {
- if (partitionCmdWriteResultIndex == resp.CmdWriteResultSize()) {
+ for (size_t messageIndex = 0, endIndex = userWriteRequest->Request.write_request().messages_size();
+ messageIndex != endIndex; ++messageIndex) {
+
+ if (partitionCmdWriteResultIndex == response.CmdWriteResultSize()) {
CloseSession("too few responses from server", PersQueue::ErrorCode::ERROR, ctx);
return;
}
- const auto& partitionCmdWriteResult = resp.GetCmdWriteResult(partitionCmdWriteResultIndex);
+ const auto& partitionCmdWriteResult = response.GetCmdWriteResult(partitionCmdWriteResultIndex);
const auto writtenSequenceNumber = userWriteRequest->Request.write_request().messages(messageIndex).seq_no();
- if (partitionCmdWriteResult.GetSeqNo() != writtenSequenceNumber) {
- CloseSession(TStringBuilder() << "Expected partition " << Partition << " write result for message with sequence number " << writtenSequenceNumber << " but got for " << partitionCmdWriteResult.GetSeqNo(), PersQueue::ErrorCode::ERROR, ctx);
+ if (UseDeduplication && partitionCmdWriteResult.GetSeqNo() != writtenSequenceNumber) {
+ CloseSession(TStringBuilder() << "Expected partition " << Partition
+ << " write result for message with sequence number "
+ << writtenSequenceNumber << " but got for "
+ << partitionCmdWriteResult.GetSeqNo(), PersQueue::ErrorCode::ERROR, ctx);
return;
}
@@ -1251,6 +1313,37 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx) {
+ if (State != ES_INITED) {
+ return CloseSession("got write response but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
+ }
+
+ const auto& result = *ev->Get();
+ if (!result.IsSuccess()) {
+ const auto& record = result.Record;
+ if (record.HasErrorCode()) {
+ return CloseSession("status is not ok: " + record.GetErrorReason(), ConvertOldCode(record.GetErrorCode()), ctx);
+ } else {
+ return CloseSession("error at write: " + result.GetError().Reason, PersQueue::ErrorCode::ERROR, ctx);
+ }
+ }
+
+
+ if (AcceptedRequests.empty()) {
+ CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx);
+ return;
+ }
+
+ const auto& writeRequest = AcceptedRequests.front();
+ const auto& resp = result.Record.GetPartitionResponse();
+
+ if (resp.GetCookie() != writeRequest->Cookie) {
+ return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx);
+ }
+ ProcessWriteResponse(resp, ctx);
+}
+
+template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr&, const TActorContext& ctx) {
CloseSession("pipe to partition's tablet is dead", PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, ctx);
}
@@ -1283,8 +1376,13 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
auto addDataMigration = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) {
auto w = request.MutablePartitionRequest()->AddCmdWrite();
w->SetData(GetSerializedData(InitMeta, writeRequest, messageIndex));
+ if (UseDeduplication) {
+ w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId));
+ }
w->SetSeqNo(writeRequest.sequence_numbers(messageIndex));
- w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId));
+ if (!UseDeduplication)
+ SeqNoInflight.push_back(w->GetSeqNo());
+
w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex));
w->SetUncompressedSize(writeRequest.blocks_uncompressed_sizes(messageIndex));
w->SetClientDC(ClientDC);
@@ -1292,16 +1390,28 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
payloadSize += w->GetData().size() + w->GetSourceId().size();
};
+ ui64 maxMessageMetadataSize = 0;
auto addData = [&](const Topic::StreamWriteMessage::WriteRequest& writeRequest, const i32 messageIndex) {
auto w = request.MutablePartitionRequest()->AddCmdWrite();
w->SetData(GetSerializedData(InitMeta, writeRequest, messageIndex));
+ if (UseDeduplication) {
+ w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId));
+ } else {
+ w->SetDisableDeduplication(true);
+ }
w->SetSeqNo(writeRequest.messages(messageIndex).seq_no());
- w->SetSourceId(NPQ::NSourceIdEncoding::EncodeSimple(SourceId));
+ SeqNoInflight.push_back(w->GetSeqNo());
w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(writeRequest.messages(messageIndex).created_at()));
w->SetUncompressedSize(writeRequest.messages(messageIndex).uncompressed_size());
w->SetClientDC(ClientDC);
w->SetIgnoreQuotaDeadline(true);
payloadSize += w->GetData().size() + w->GetSourceId().size();
+ const auto& msg = writeRequest.messages(messageIndex);
+ ui64 currMetadataSize = 0;
+ for (const auto& [k, v] : msg.message_meta()) {
+ currMetadataSize += k.size() + v.size();
+ }
+ maxMessageMetadataSize = std::max(maxMessageMetadataSize, currMetadataSize);
};
const auto& writeRequest = ev->Request.write_request();
@@ -1317,7 +1427,19 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
PendingRequest->UserWriteRequests.push_back(std::move(ev));
PendingRequest->ByteSize = request.ByteSize();
-
+ auto msgMetaEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicMessageMeta();
+ if (!msgMetaEnabled && maxMessageMetadataSize > 0) {
+ CloseSession("Message level metadata support is disabled on server size", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
+ if (maxMessageMetadataSize > MAX_METADATA_SIZE_PER_MESSAGE) {
+ CloseSession(
+ TStringBuilder() << "Message level metadata size is limited to " << MAX_METADATA_SIZE_PER_MESSAGE
+ << " per message",
+ PersQueue::ErrorCode::BAD_REQUEST, ctx
+ );
+ return;
+ }
if (const auto ru = CalcRuConsumption(payloadSize)) {
PendingRequest->RequiredQuota += ru;
if (MaybeRequestQuota(PendingRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)) {
@@ -1631,6 +1753,7 @@ void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& c
if ((!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass())
&& !SourceIdUpdatesInflight
&& ctx.Now() - LastSourceIdUpdate > SOURCEID_UPDATE_PERIOD
+ && UseDeduplication
) {
SendUpdateSrcIdsRequests(ctx);
}
diff --git a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
index d466a5cd307..d331a19832e 100644
--- a/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
+++ b/ydb/services/persqueue_v1/first_class_src_ids_ut.cpp
@@ -80,7 +80,6 @@ Y_UNIT_TEST_SUITE(TFstClassSrcIdPQTest) {
UNIT_ASSERT(res);
writer->Close();
};
-
alterAndCheck(2);
alterAndCheck(4);
alterAndCheck(12);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 1cbdd25bb5c..517c6f9a9f8 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -6191,6 +6191,116 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}
+ THolder<NYdb::TDriver> SetupTestAndGetDriver(
+ NPersQueue::TTestServer& server, const TString& topicName, ui64 partsCount = 1
+ ) {
+ NYdb::TDriverConfig driverCfg;
+ driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort);
+ auto driver = MakeHolder<NYdb::TDriver>(driverCfg);
+
+ server.EnableLogs({ NKikimrServices::PQ_READ_PROXY, NKikimrServices::PQ_WRITE_PROXY});
+ server.EnableLogs({ NKikimrServices::KQP_PROXY}, NActors::NLog::PRI_EMERG);
+
+ server.AnnoyingClient->CreateTopic(topicName, partsCount);
+ auto topicClient = NYdb::NTopic::TTopicClient(*driver);
+ auto alterSettings = NYdb::NTopic::TAlterTopicSettings();
+ alterSettings.BeginAddConsumer("debug");
+ auto alterRes = topicClient.AlterTopic(TString("/Root/PQ/") + topicName, alterSettings).GetValueSync();
+ UNIT_ASSERT(alterRes.IsSuccess());
+ return std::move(driver);
+ }
+
+ Y_UNIT_TEST(MessageMetadata) {
+ return; //No supported;
+ NPersQueue::TTestServer server;
+ server.CleverServer->GetRuntime()->GetAppData().FeatureFlags.SetEnableTopicMessageMeta(true);
+ TString topicFullName = "rt3.dc1--topic1";
+ auto driver = SetupTestAndGetDriver(server, topicFullName);
+
+ auto topicClient = NYdb::NTopic::TTopicClient(*driver);
+
+ NYdb::NTopic::TWriteSessionSettings wSettings {topicFullName, "srcId", "srcId"};
+ auto writer = topicClient.CreateSimpleBlockingWriteSession(wSettings);
+ std::unordered_map<TString, TString> metadata = {{"key1", "val1"}, {"key2", "val2"}};
+/* writer->Write("Somedata", Nothing(), Nothing(), TDuration::Max(), metadata);
+ metadata = {{"key3", "val3"}};
+ writer->Write("Somedata2", Nothing(), Nothing(), TDuration::Max(), metadata);
+ writer->Write("Somedata3");
+*/
+ writer->Close();
+ NYdb::NTopic::TReadSessionSettings rSettings;
+ rSettings.ConsumerName("debug").AppendTopics({topicFullName});
+ auto readSession = topicClient.CreateReadSession(rSettings);
+
+ auto ev = readSession->GetEvent(true);
+ UNIT_ASSERT(ev.Defined());
+ auto spsEv = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev);
+ UNIT_ASSERT(spsEv);
+ spsEv->Confirm();
+ ev = readSession->GetEvent(true);
+ auto dataEv = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev);
+ UNIT_ASSERT(dataEv);
+ const auto& messages = dataEv->GetMessages();
+ UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
+ auto metadataSizeExpected = 2;
+ for (const auto& msg : dataEv->GetMessages()) {
+ auto meta = msg.GetMessageMeta();
+ UNIT_ASSERT_VALUES_EQUAL(meta->Fields.size(), metadataSizeExpected);
+ metadataSizeExpected--;
+ }
+ }
+
+ Y_UNIT_TEST(DisableDeduplication) {
+ NPersQueue::TTestServer server;
+ TString topicFullName = "rt3.dc1--topic1";
+ auto driver = SetupTestAndGetDriver(server, topicFullName, 3);
+
+ auto topicClient = NYdb::NTopic::TTopicClient(*driver);
+ NYdb::NTopic::TWriteSessionSettings wSettings;
+ wSettings.Path(topicFullName).DeduplicationEnabled(false);
+
+ TVector<std::shared_ptr<NYdb::NTopic::ISimpleBlockingWriteSession>> writers;
+ for (auto i = 0u; i < 3; i++) {
+ auto writer = topicClient.CreateSimpleBlockingWriteSession(wSettings);
+ for (auto j = 0u; j < 3; j++) {
+ writer->Write(TString("MyData") + ToString(i) + ToString(j));
+ }
+ writers.push_back(writer);
+ }
+
+ for (auto& w : writers) {
+ w->Close();
+ }
+ NYdb::NTopic::TReadSessionSettings rSettings;
+ rSettings.ConsumerName("debug").AppendTopics({topicFullName});
+ auto readSession = topicClient.CreateReadSession(rSettings);
+
+ THashSet<ui64> partitions;
+ ui64 totalMessages = 0;
+ Cerr << "Start reads\n";
+ while(totalMessages < 9) {
+ auto ev = readSession->GetEvent(true);
+ UNIT_ASSERT(ev.Defined());
+
+ auto spsEv = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev);
+ if (spsEv) {
+ spsEv->Confirm();
+ Cerr << "Got start stream event\n";
+ continue;
+ }
+ auto dataEv = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*ev);
+ UNIT_ASSERT(dataEv);
+ const auto& messages = dataEv->GetMessages();
+ totalMessages += messages.size();
+ Cerr << "Got data event with total " << messages.size() << " messages, current total messages: " << totalMessages << Endl;
+ for (const auto& msg: dataEv->GetMessages()) {
+ auto session = msg.GetPartitionSession();
+ partitions.insert(session->GetPartitionId());
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 3);
+ }
+
Y_UNIT_TEST(LOGBROKER_7820) {
//
// 700 messages of 2000 characters are sent in the test