diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-02-24 18:37:49 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-24 13:37:49 +0000 |
commit | f1dcfdb2034c068ba69256784aebdc63b765ad01 (patch) | |
tree | b564021d68eb4ce9a1ed74c5e3b3d1956fe6aeef | |
parent | cb5d0e3b009025201c8fbc9f90e6960765da9bc5 (diff) | |
download | ydb-f1dcfdb2034c068ba69256784aebdc63b765ad01.tar.gz |
Expand the list of message fields available for transferring from a topic to a table (#14941)
-rw-r--r-- | ydb/core/backup/impl/local_partition_reader.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/purecalc/purecalc.cpp | 52 | ||||
-rw-r--r-- | ydb/core/persqueue/purecalc/purecalc.h | 15 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/base_table_writer.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/common_ut.h | 13 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/table_writer_ut.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/transfer_writer.cpp | 37 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/transfer_writer_ut.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/worker.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/worker.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.h | 9 | ||||
-rw-r--r-- | ydb/tests/functional/transfer/main.cpp | 265 |
13 files changed, 382 insertions, 90 deletions
diff --git a/ydb/core/backup/impl/local_partition_reader.cpp b/ydb/core/backup/impl/local_partition_reader.cpp index b7a476693a..2a43bb8741 100644 --- a/ydb/core/backup/impl/local_partition_reader.cpp +++ b/ydb/core/backup/impl/local_partition_reader.cpp @@ -135,11 +135,11 @@ private: for (auto& result : readResult.GetResult()) { gotOffset = std::max(gotOffset, result.GetOffset()); - records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData()); + records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo()); } SentOffset = gotOffset + 1; - Send(Worker, new TEvWorker::TEvData(ToString(Partition), std::move(records))); + Send(Worker, new TEvWorker::TEvData(Partition, ToString(Partition), std::move(records))); } void Leave(TEvWorker::TEvGone::EStatus status) { diff --git a/ydb/core/persqueue/purecalc/purecalc.cpp b/ydb/core/persqueue/purecalc/purecalc.cpp index 018483c9e4..be27c6d34f 100644 --- a/ydb/core/persqueue/purecalc/purecalc.cpp +++ b/ydb/core/persqueue/purecalc/purecalc.cpp @@ -13,14 +13,13 @@ using namespace NYql::NUdf; using namespace NKikimr::NMiniKQL; constexpr const char* DataFieldName = "_data"; +constexpr const char* MessageGroupIdFieldName = "_message_group_id"; constexpr const char* OffsetFieldName = "_offset"; +constexpr const char* PartitionFieldName = "_partition"; +constexpr const char* ProducerIdFieldName = "_producer_id"; +constexpr const char* SeqNoFieldName = "_seq_no"; -constexpr const size_t FieldCount = 2; // Change it when change fields - -struct FieldPositions { - ui64 Data = 0; - ui64 Offset = 0; -}; +constexpr const size_t FieldCount = 6; // Change it when change fields NYT::TNode CreateTypeNode(const TString& fieldType) { @@ -41,7 +40,11 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy NYT::TNode CreateMessageScheme() { auto 
structMembers = NYT::TNode::CreateList(); AddField(structMembers, DataFieldName, "String"); + AddField(structMembers, MessageGroupIdFieldName, "String"); AddField(structMembers, OffsetFieldName, "Uint64"); + AddField(structMembers, PartitionFieldName, "Uint32"); + AddField(structMembers, ProducerIdFieldName, "String"); + AddField(structMembers, SeqNoFieldName, "Uint64"); return NYT::TNode::CreateList() .Add("StructType") @@ -57,32 +60,36 @@ struct TMessageWrapper { return NKikimr::NMiniKQL::MakeString(Message.Data); } + NYql::NUdf::TUnboxedValuePod GetMessageGroupId() const { + return NKikimr::NMiniKQL::MakeString(Message.MessageGroupId); + } + NYql::NUdf::TUnboxedValuePod GetOffset() const { return NYql::NUdf::TUnboxedValuePod(Message.Offset); } + + NYql::NUdf::TUnboxedValuePod GetPartition() const { + return NYql::NUdf::TUnboxedValuePod(Message.Partition); + } + + NYql::NUdf::TUnboxedValuePod GetProducerId() const { + return NKikimr::NMiniKQL::MakeString(Message.ProducerId); + } + + NYql::NUdf::TUnboxedValuePod GetSeqNo() const { + return NYql::NUdf::TUnboxedValuePod(Message.SeqNo); + } }; class TInputConverter { protected: IWorker* Worker_; TPlainContainerCache Cache_; - FieldPositions Position; public: explicit TInputConverter(IWorker* worker) : Worker_(worker) { - const TStructType* structType = worker->GetInputType(); - const ui64 count = structType->GetMembersCount(); - - for (ui64 i = 0; i < count; ++i) { - const auto name = structType->GetMemberName(i); - if (name == DataFieldName) { - Position.Data = i; - } else if (name == OffsetFieldName) { - Position.Offset = i; - } - } } public: @@ -92,8 +99,13 @@ public: result = Cache_.NewArray(holderFactory, static_cast<ui32>(FieldCount), items); TMessageWrapper wrap {*message}; - items[Position.Data] = wrap.GetData(); - items[Position.Offset] = wrap.GetOffset(); + // lex order by field name + items[0] = wrap.GetData(); + items[1] = wrap.GetMessageGroupId(); + items[2] = wrap.GetOffset(); + items[3] = 
wrap.GetPartition(); + items[4] = wrap.GetProducerId(); + items[5] = wrap.GetSeqNo(); } void ClearCache() { diff --git a/ydb/core/persqueue/purecalc/purecalc.h b/ydb/core/persqueue/purecalc/purecalc.h index 90fe1a21e7..57fdff5e33 100644 --- a/ydb/core/persqueue/purecalc/purecalc.h +++ b/ydb/core/persqueue/purecalc/purecalc.h @@ -7,17 +7,12 @@ namespace NYdb::NTopic::NPurecalc { using namespace NYql::NPureCalc; struct TMessage { - TMessage(const TString& data) - : Data(data) { - } - - TMessage& WithOffset(ui64 offset) { - Offset = offset; - return *this; - } - - const TString& Data; + TString Data; + TString MessageGroupId; ui64 Offset = 0; + ui32 Partition = 0; + TString ProducerId; + ui64 SeqNo = 0; }; class TMessageInputSpec: public TInputSpecBase { diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp index 7c6c57dfc6..b2bcd0ef5a 100644 --- a/ydb/core/tx/replication/service/base_table_writer.cpp +++ b/ydb/core/tx/replication/service/base_table_writer.cpp @@ -433,7 +433,10 @@ class TLocalTableWriter TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records(::Reserve(ev->Get()->Records.size())); TSet<TRowVersion> versionsWithoutTxId; - for (auto& [offset, data, _] : ev->Get()->Records) { + for (auto& r : ev->Get()->Records) { + auto offset = r.Offset; + auto& data = r.Data; + auto record = Parser->Parse(ev->Get()->Source, offset, std::move(data)); if (Mode == EWriteMode::Consistent) { diff --git a/ydb/core/tx/replication/service/common_ut.h b/ydb/core/tx/replication/service/common_ut.h new file mode 100644 index 0000000000..6d833760f6 --- /dev/null +++ b/ydb/core/tx/replication/service/common_ut.h @@ -0,0 +1,13 @@ +#pragma once + +#include "worker.h" + +namespace NKikimr::NReplication::NService { + +struct TRecord: public TEvWorker::TEvData::TRecord { + explicit TRecord(ui64 offset, const TString& data) + : TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), 
"MessageGroupId", "ProducerId", 42) + {} +}; + +} diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index ffac955bbe..362972f738 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -1,6 +1,6 @@ #include "service.h" #include "table_writer.h" -#include "worker.h" +#include "common_ut.h" #include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h> #include <ydb/core/tx/replication/ut_helpers/test_env.h> @@ -16,7 +16,6 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(LocalTableWriter) { using namespace NTestHelpers; - using TRecord = TEvWorker::TEvData::TRecord; Y_UNIT_TEST(WriteTable) { TEnv env; @@ -35,7 +34,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"))); env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake()); - env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}})"), TRecord(2, R"({"key":[2], "update":{"value":"20"}})"), TRecord(3, R"({"key":[3], "update":{"value":"30"}})"), @@ -92,7 +91,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"))); env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake()); - env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"), TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"), TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"), @@ -143,7 +142,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto writer = 
env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"))); env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake()); - env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"), TRecord(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"), TRecord(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"), @@ -184,7 +183,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { ui64 order = 1; { - auto ev = env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", { + auto ev = env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0,"TestSource", { TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), TRecord(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"), @@ -203,14 +202,14 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { })); } { - auto ev = env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData("TestSource", { + auto ev = env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(order++, R"({"resolved":[10,0]})"), })); UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0)); env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender()); } - env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"), TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"), TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"), @@ -222,12 +221,12 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { {TRowVersion(30, 0), 3}, })); - 
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"), TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"), })); - env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(order++, R"({"resolved":[30,0]})"), })); env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender()); @@ -294,7 +293,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto worker = env.GetRuntime().Register(new TMockWorker(writer, env.GetSender())); env.Send<TEvWorker::TEvHandshake>(worker, new TEvWorker::TEvHandshake()); - env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), TRecord(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"), })); @@ -377,7 +376,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto worker = env.GetRuntime().Register(new TMockWorker(writer, env.GetSender())); env.Send<TEvWorker::TEvHandshake>(worker, new TEvWorker::TEvHandshake()); - env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), TRecord(2, R"({"resolved":[10,0]})"), })); @@ -407,14 +406,14 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"), EWriteMode::Consistent)); env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake()); - env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvService::TEvGetTxId>(writer, new 
TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), })); env.Send<TEvWorker::TEvPoll>(writer, MakeTxIdResult({ {TRowVersion(10, 0), 1}, })); - env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"), TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), TRecord(4, R"({"resolved":[20,0]})"), diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index 594c1033c6..0e14f9324d 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -58,10 +58,10 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> { for (auto& msg : result.Messages) { Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW); - records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime()); + records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime(), std::move(msg.GetMessageGroupId()), std::move(msg.GetProducerId()), msg.GetSeqNo()); } - Send(Worker, new TEvWorker::TEvData(ToString(result.PartitionId), std::move(records))); + Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records))); } void Handle(TEvYdbProxy::TEvTopicEndPartition::TPtr& ev) { diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp index bb2b193e4f..c6c97c42a4 100644 --- a/ydb/core/tx/replication/service/transfer_writer.cpp +++ b/ydb/core/tx/replication/service/transfer_writer.cpp @@ -551,7 +551,7 @@ private: } if (PendingRecords) { - ProcessData(*PendingRecords); + ProcessData(PendingPartitionId, *PendingRecords); PendingRecords.reset(); } } @@ -584,15 +584,16 @@ private: void 
HoldHandle(TEvWorker::TEvData::TPtr& ev) { Y_ABORT_UNLESS(!PendingRecords); + PendingPartitionId = ev->Get()->PartitionId; PendingRecords = std::move(ev->Get()->Records); } void Handle(TEvWorker::TEvData::TPtr& ev) { LOG_D("Handle TEvData " << ev->Get()->ToString()); - ProcessData(ev->Get()->Records); + ProcessData(ev->Get()->PartitionId, ev->Get()->Records); } - void ProcessData(const TVector<TEvWorker::TEvData::TRecord>& records) { + void ProcessData(const ui32 partitionId, const TVector<TEvWorker::TEvData::TRecord>& records) { if (!records) { Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::DONE)); return; @@ -601,17 +602,29 @@ private: TableState->EnshureDataBatch(); for (auto& message : records) { - NYdb::NTopic::NPurecalc::TMessage input(message.Data); - input.WithOffset(message.Offset); - - auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input})); - while (auto* m = result->Fetch()) { - TableState->AddData(m->Data); + NYdb::NTopic::NPurecalc::TMessage input; + input.Data = std::move(message.Data); + input.MessageGroupId = std::move(message.MessageGroupId); + input.Partition = partitionId; + input.ProducerId = std::move(message.ProducerId); + input.Offset = message.Offset; + input.SeqNo = message.SeqNo; + + try { + auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input})); + while (auto* m = result->Fetch()) { + TableState->AddData(m->Data); + } + } catch (const yexception& e) { + ProcessingError = TStringBuilder() << "Error transform message: '" << message.Data << "': " << e.what(); + break; } } if (TableState->Flush()) { Become(&TThis::StateWrite); + } else if (ProcessingError) { + LogCritAndLeave(*ProcessingError); } } @@ -636,6 +649,10 @@ private: return LogCritAndLeave(error); } + if (ProcessingError) { + return LogCritAndLeave(*ProcessingError); + } + Send(Worker, new TEvWorker::TEvPoll()); return StartWork(); } @@ -709,8 +726,10 @@ private: TProgramHolder::TPtr 
ProgramHolder; mutable TMaybe<TString> LogPrefix; + mutable TMaybe<TString> ProcessingError; std::optional<TActorId> PendingWorker; + ui32 PendingPartitionId = 0; std::optional<TVector<TEvWorker::TEvData::TRecord>> PendingRecords; ui32 Attempt = 0; diff --git a/ydb/core/tx/replication/service/transfer_writer_ut.cpp b/ydb/core/tx/replication/service/transfer_writer_ut.cpp index d617bdc3d0..0085181952 100644 --- a/ydb/core/tx/replication/service/transfer_writer_ut.cpp +++ b/ydb/core/tx/replication/service/transfer_writer_ut.cpp @@ -1,6 +1,6 @@ #include "service.h" #include "transfer_writer.h" -#include "worker.h" +#include "common_ut.h" #include <ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h> #include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h> @@ -17,7 +17,6 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(TransferWriter) { using namespace NTestHelpers; - using TRecord = TEvWorker::TEvData::TRecord; Y_UNIT_TEST(Write_ColumnTable) { TEnv env; @@ -51,7 +50,7 @@ Y_UNIT_TEST_SUITE(TransferWriter) { auto writer = env.GetRuntime().Register(CreateTransferWriter(lambda, tablePathId, compiler)); env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake()); - env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", { + env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}})"), TRecord(2, R"({"key":[2], "update":{"value":"20"}})"), TRecord(3, R"({"key":[3], "update":{"value":"30"}})"), diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index 66ff8d1f83..a156b748f6 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -13,28 +13,36 @@ namespace NKikimr::NReplication::NService { -TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime) +TEvWorker::TEvData::TRecord::TRecord(ui64 offset, 
const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo) : Offset(offset) , Data(data) , CreateTime(createTime) + , MessageGroupId(messageGroupId) + , ProducerId(producerId) + , SeqNo(seqNo) { } -TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime) +TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo) : Offset(offset) , Data(std::move(data)) , CreateTime(createTime) + , MessageGroupId(std::move(messageGroupId)) + , ProducerId(std::move(producerId)) + , SeqNo(seqNo) { } -TEvWorker::TEvData::TEvData(const TString& source, const TVector<TRecord>& records) - : Source(source) +TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records) + : PartitionId(partitionId) + , Source(source) , Records(records) { } -TEvWorker::TEvData::TEvData(const TString& source, TVector<TRecord>&& records) - : Source(source) +TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records) + : PartitionId(partitionId) + , Source(source) , Records(std::move(records)) { } @@ -160,7 +168,7 @@ class TWorker: public TActorBootstrapped<TWorker> { Writer.Registered(); if (InFlightData) { - Send(Writer, new TEvWorker::TEvData(InFlightData->Source, InFlightData->Records)); + Send(Writer, new TEvWorker::TEvData(InFlightData->PartitionId, InFlightData->Source, InFlightData->Records)); } } else { LOG_W("Handshake from unknown actor" @@ -205,7 +213,7 @@ class TWorker: public TActorBootstrapped<TWorker> { } Y_ABORT_UNLESS(!InFlightData); - InFlightData = MakeHolder<TEvWorker::TEvData>(ev->Get()->Source, ev->Get()->Records); + InFlightData = MakeHolder<TEvWorker::TEvData>(ev->Get()->PartitionId, ev->Get()->Source, ev->Get()->Records); if (Writer) { Send(ev->Forward(Writer)); diff --git a/ydb/core/tx/replication/service/worker.h 
b/ydb/core/tx/replication/service/worker.h index 6246eb1099..e03f3b24dc 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -34,17 +34,21 @@ struct TEvWorker { ui64 Offset; TString Data; TInstant CreateTime; + TString MessageGroupId; + TString ProducerId; + ui64 SeqNo; - explicit TRecord(ui64 offset, const TString& data, TInstant createTime = TInstant::Zero()); - explicit TRecord(ui64 offset, TString&& data, TInstant createTime = TInstant::Zero()); + explicit TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo); + explicit TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo); void Out(IOutputStream& out) const; }; + ui32 PartitionId; TString Source; TVector<TRecord> Records; - explicit TEvData(const TString& source, const TVector<TRecord>& records); - explicit TEvData(const TString& source, TVector<TRecord>&& records); + explicit TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records); + explicit TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records); TString ToString() const override; }; diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index 6a5801003c..ec0d8bca62 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -167,6 +167,9 @@ struct TEvYdbProxy { , Data(msg.GetData()) , CreateTime(msg.GetCreateTime()) , Codec(codec) + , MessageGroupId(msg.GetMessageGroupId()) + , ProducerId(msg.GetProducerId()) + , SeqNo(msg.GetSeqNo()) { } @@ -186,6 +189,9 @@ struct TEvYdbProxy { TString& GetData() { return Data; } TInstant GetCreateTime() const { return CreateTime; } ECodec GetCodec() const { return Codec; } + TString& GetMessageGroupId() { return MessageGroupId; } + TString& GetProducerId() { return ProducerId; } 
+ ui64 GetSeqNo() { return SeqNo; } void Out(IOutputStream& out) const; private: @@ -193,6 +199,9 @@ struct TEvYdbProxy { TString Data; TInstant CreateTime; ECodec Codec; + TString MessageGroupId; + TString ProducerId; + ui64 SeqNo; }; explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) { diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp index 9ea3b36bb7..6a96711a1a 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/main.cpp @@ -43,6 +43,11 @@ bool Checker<bool>::Get(const ::Ydb::Value& value) { } template<> +ui32 Checker<ui32>::Get(const ::Ydb::Value& value) { + return value.uint32_value(); +} + +template<> ui64 Checker<ui64>::Get(const ::Ydb::Value& value) { return value.uint64_value(); } @@ -70,10 +75,48 @@ std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) { }; } +struct TMessage { + TString Message; + std::optional<ui32> Partition = std::nullopt; + std::optional<TString> ProducerId = std::nullopt; + std::optional<TString> MessageGroupId = std::nullopt; + std::optional<ui64> SeqNo = std::nullopt; +}; + +TMessage _withSeqNo(ui64 seqNo) { + return { + .Message = TStringBuilder() << "Message-" << seqNo, + .Partition = 0, + .ProducerId = std::nullopt, + .MessageGroupId = std::nullopt, + .SeqNo = seqNo + }; +} + +TMessage _withProducerId(const TString& producerId) { + return { + .Message = TStringBuilder() << "Message-" << producerId, + .Partition = 0, + .ProducerId = producerId, + .MessageGroupId = std::nullopt, + .SeqNo = std::nullopt + }; +} + +TMessage _withMessageGroupId(const TString& messageGroupId) { + return { + .Message = TStringBuilder() << "Message-" << messageGroupId, + .Partition = 0, + .ProducerId = messageGroupId, + .MessageGroupId = messageGroupId, + .SeqNo = std::nullopt + }; +} + struct TConfig { const char* TableDDL; const char* Lambda; - const char* Message; + const TVector<TMessage> Messages; 
TVector<std::pair<TString, std::shared_ptr<IChecker>>> Expectations; }; @@ -106,7 +149,10 @@ struct MainTestCase { { auto res = session.ExecuteQuery(Sprintf(R"( - CREATE TOPIC `%s`; + CREATE TOPIC `%s` + WITH ( + min_active_partitions = 10 + ); )", TopicName.data()), TTxControl::NoTx()).GetValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } @@ -126,20 +172,31 @@ struct MainTestCase { } { - TWriteSessionSettings writeSettings; - writeSettings.Path(TopicName); - writeSettings.DeduplicationEnabled(false); - auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); + for (const auto& m : Config.Messages) { + TWriteSessionSettings writeSettings; + writeSettings.Path(TopicName); + writeSettings.DeduplicationEnabled(m.SeqNo); + if (m.Partition) { + writeSettings.PartitionId(m.Partition); + } + if (m.ProducerId) { + writeSettings.ProducerId(*m.ProducerId); + } + if (m.MessageGroupId) { + writeSettings.MessageGroupId(*m.MessageGroupId); + } + auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); - UNIT_ASSERT(writeSession->Write(Config.Message)); - writeSession->Close(TDuration::Seconds(1)); + UNIT_ASSERT(writeSession->Write(m.Message, m.SeqNo)); + writeSession->Close(TDuration::Seconds(1)); + } } { for (size_t attempt = 20; attempt--; ) { auto res = DoRead(session); Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush; - if (res.first == 1) { + if (res.first == Config.Messages.size()) { const Ydb::ResultSet& proto = res.second; for (size_t i = 0; i < Config.Expectations.size(); ++i) { auto& c = Config.Expectations[i]; @@ -214,7 +271,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "Message-1", + .Messages = {{"Message-1"}}, .Expectations = { _C("Key", ui64(0)), @@ -247,7 +304,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "Message-1", + .Messages = {{"Message-1"}}, .Expectations = { _C("Key", ui64(0)), @@ -256,6 +313,51 @@ Y_UNIT_TEST_SUITE(Transfer) }).Run(); } + 
Y_UNIT_TEST(Main_ColumnTable_ComplexKey) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Key1 Uint64 NOT NULL, + Key3 Uint64 NOT NULL, + Value1 Utf8, + Key2 Uint64 NOT NULL, + Value2 Utf8, + Key4 Uint64 NOT NULL, + PRIMARY KEY (Key3, Key2, Key1, Key4) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Key1:CAST(1 AS Uint64), + Key2:CAST(2 AS Uint64), + Value2:CAST("value-2" AS Utf8), + Key4:CAST(4 AS Uint64), + Key3:CAST(3 AS Uint64), + Value1:CAST("value-1" AS Utf8), + |> + ]; + }; + )", + + .Messages = {{"Message-1"}}, + + .Expectations = { + _C("Key1", ui64(1)), + _C("Key2", ui64(2)), + _C("Key3", ui64(3)), + _C("Key4", ui64(4)), + _C("Value1", TString("value-1")), + _C("Value2", TString("value-2")), + } + }).Run(); + } + Y_UNIT_TEST(Main_ColumnTable_JsonMessage) { MainTestCase({ @@ -286,12 +388,12 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = R"({ + .Messages = {{R"({ "id": 1, "first_name": "Vasya", "last_name": "Pupkin", "salary": "123" - })", + })"}}, .Expectations = { _C("Id", ui64(1)), @@ -326,7 +428,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "Message-1", + .Messages = {{"Message-1"}}, .Expectations = { _C("Key", ui64(0)), @@ -359,7 +461,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "2025-02-21", + .Messages = {{"2025-02-21"}}, .Expectations = { _C("Key", ui64(0)), @@ -392,7 +494,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "1.23", + .Messages = {{"1.23"}}, .Expectations = { _C("Key", ui64(0)), @@ -425,7 +527,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890", + .Messages = {{"Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890"}}, .Expectations = { _C("Key", ui64(0)), @@ -434,5 +536,134 @@ Y_UNIT_TEST_SUITE(Transfer) }).Run(); } + Y_UNIT_TEST(Main_MessageField_Partition) + { + MainTestCase({ + 
.TableDDL = R"( + CREATE TABLE `%s` ( + Partition Uint32 NOT NULL, + Message Utf8, + PRIMARY KEY (Partition) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Partition:CAST($x._partition AS Uint32), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )", + + .Messages = {{"Message-1", 7}}, + + .Expectations = { + _C("Partition", ui32(7)), + _C("Message", TString("Message-1")), + } + }).Run(); + } + + Y_UNIT_TEST(Main_MessageField_SeqNo) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + SeqNo Uint64 NOT NULL, + Message Utf8, + PRIMARY KEY (SeqNo) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + SeqNo:CAST($x._seq_no AS Uint32), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )", + + .Messages = {_withSeqNo(13)}, + + .Expectations = { + _C("SeqNo", ui64(13)), + } + }).Run(); + } + + Y_UNIT_TEST(Main_MessageField_ProducerId) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Offset Uint64 NOT NULL, + ProducerId Utf8, + PRIMARY KEY (Offset) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Offset:CAST($x._offset AS Uint64), + ProducerId:CAST($x._producer_id AS Utf8) + |> + ]; + }; + )", + + .Messages = {_withProducerId("Producer-13")}, + + .Expectations = { + _C("ProducerId", TString("Producer-13")), + } + }).Run(); + } + + Y_UNIT_TEST(Main_MessageField_MessageGroupId) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Offset Uint64 NOT NULL, + MessageGroupId Utf8, + PRIMARY KEY (Offset) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Offset:CAST($x._offset AS Uint64), + MessageGroupId:CAST($x._message_group_id AS Utf8) + |> + ]; + }; + )", + + .Messages = {_withMessageGroupId("MessageGroupId-13")}, + + .Expectations = { + _C("MessageGroupId", TString("MessageGroupId-13")), + } + }).Run(); + } + } |