aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-02-24 18:37:49 +0500
committerGitHub <noreply@github.com>2025-02-24 13:37:49 +0000
commitf1dcfdb2034c068ba69256784aebdc63b765ad01 (patch)
treeb564021d68eb4ce9a1ed74c5e3b3d1956fe6aeef
parentcb5d0e3b009025201c8fbc9f90e6960765da9bc5 (diff)
downloadydb-f1dcfdb2034c068ba69256784aebdc63b765ad01.tar.gz
Expand the list of message fields available for transferring from a topic to a table (#14941)
-rw-r--r--ydb/core/backup/impl/local_partition_reader.cpp4
-rw-r--r--ydb/core/persqueue/purecalc/purecalc.cpp52
-rw-r--r--ydb/core/persqueue/purecalc/purecalc.h15
-rw-r--r--ydb/core/tx/replication/service/base_table_writer.cpp5
-rw-r--r--ydb/core/tx/replication/service/common_ut.h13
-rw-r--r--ydb/core/tx/replication/service/table_writer_ut.cpp27
-rw-r--r--ydb/core/tx/replication/service/topic_reader.cpp4
-rw-r--r--ydb/core/tx/replication/service/transfer_writer.cpp37
-rw-r--r--ydb/core/tx/replication/service/transfer_writer_ut.cpp5
-rw-r--r--ydb/core/tx/replication/service/worker.cpp24
-rw-r--r--ydb/core/tx/replication/service/worker.h12
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.h9
-rw-r--r--ydb/tests/functional/transfer/main.cpp265
13 files changed, 382 insertions, 90 deletions
diff --git a/ydb/core/backup/impl/local_partition_reader.cpp b/ydb/core/backup/impl/local_partition_reader.cpp
index b7a476693a..2a43bb8741 100644
--- a/ydb/core/backup/impl/local_partition_reader.cpp
+++ b/ydb/core/backup/impl/local_partition_reader.cpp
@@ -135,11 +135,11 @@ private:
for (auto& result : readResult.GetResult()) {
gotOffset = std::max(gotOffset, result.GetOffset());
- records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData());
+ records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo());
}
SentOffset = gotOffset + 1;
- Send(Worker, new TEvWorker::TEvData(ToString(Partition), std::move(records)));
+ Send(Worker, new TEvWorker::TEvData(Partition, ToString(Partition), std::move(records)));
}
void Leave(TEvWorker::TEvGone::EStatus status) {
diff --git a/ydb/core/persqueue/purecalc/purecalc.cpp b/ydb/core/persqueue/purecalc/purecalc.cpp
index 018483c9e4..be27c6d34f 100644
--- a/ydb/core/persqueue/purecalc/purecalc.cpp
+++ b/ydb/core/persqueue/purecalc/purecalc.cpp
@@ -13,14 +13,13 @@ using namespace NYql::NUdf;
using namespace NKikimr::NMiniKQL;
constexpr const char* DataFieldName = "_data";
+constexpr const char* MessageGroupIdFieldName = "_message_group_id";
constexpr const char* OffsetFieldName = "_offset";
+constexpr const char* PartitionFieldName = "_partition";
+constexpr const char* ProducerIdFieldName = "_producer_id";
+constexpr const char* SeqNoFieldName = "_seq_no";
-constexpr const size_t FieldCount = 2; // Change it when change fields
-
-struct FieldPositions {
- ui64 Data = 0;
- ui64 Offset = 0;
-};
+constexpr const size_t FieldCount = 6; // Must match the number of AddField() calls in CreateMessageScheme()
NYT::TNode CreateTypeNode(const TString& fieldType) {
@@ -41,7 +40,11 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
NYT::TNode CreateMessageScheme() {
auto structMembers = NYT::TNode::CreateList();
AddField(structMembers, DataFieldName, "String");
+ AddField(structMembers, MessageGroupIdFieldName, "String");
AddField(structMembers, OffsetFieldName, "Uint64");
+ AddField(structMembers, PartitionFieldName, "Uint32");
+ AddField(structMembers, ProducerIdFieldName, "String");
+ AddField(structMembers, SeqNoFieldName, "Uint64");
return NYT::TNode::CreateList()
.Add("StructType")
@@ -57,32 +60,36 @@ struct TMessageWrapper {
return NKikimr::NMiniKQL::MakeString(Message.Data);
}
+ NYql::NUdf::TUnboxedValuePod GetMessageGroupId() const { // value for the "_message_group_id" field
+ return NKikimr::NMiniKQL::MakeString(Message.MessageGroupId);
+ }
+
NYql::NUdf::TUnboxedValuePod GetOffset() const {
return NYql::NUdf::TUnboxedValuePod(Message.Offset);
}
+
+ NYql::NUdf::TUnboxedValuePod GetPartition() const { // value for the "_partition" field (declared Uint32 in the scheme)
+ return NYql::NUdf::TUnboxedValuePod(Message.Partition);
+ }
+
+ NYql::NUdf::TUnboxedValuePod GetProducerId() const { // value for the "_producer_id" field
+ return NKikimr::NMiniKQL::MakeString(Message.ProducerId);
+ }
+
+ NYql::NUdf::TUnboxedValuePod GetSeqNo() const { // value for the "_seq_no" field (declared Uint64 in the scheme)
+ return NYql::NUdf::TUnboxedValuePod(Message.SeqNo);
+ }
};
class TInputConverter {
protected:
IWorker* Worker_;
TPlainContainerCache Cache_;
- FieldPositions Position;
public:
explicit TInputConverter(IWorker* worker)
: Worker_(worker)
{
- const TStructType* structType = worker->GetInputType();
- const ui64 count = structType->GetMembersCount();
-
- for (ui64 i = 0; i < count; ++i) {
- const auto name = structType->GetMemberName(i);
- if (name == DataFieldName) {
- Position.Data = i;
- } else if (name == OffsetFieldName) {
- Position.Offset = i;
- }
- }
}
public:
@@ -92,8 +99,13 @@ public:
result = Cache_.NewArray(holderFactory, static_cast<ui32>(FieldCount), items);
TMessageWrapper wrap {*message};
- items[Position.Data] = wrap.GetData();
- items[Position.Offset] = wrap.GetOffset();
+ // lex order by field name
+ items[0] = wrap.GetData();
+ items[1] = wrap.GetMessageGroupId();
+ items[2] = wrap.GetOffset();
+ items[3] = wrap.GetPartition();
+ items[4] = wrap.GetProducerId();
+ items[5] = wrap.GetSeqNo();
}
void ClearCache() {
diff --git a/ydb/core/persqueue/purecalc/purecalc.h b/ydb/core/persqueue/purecalc/purecalc.h
index 90fe1a21e7..57fdff5e33 100644
--- a/ydb/core/persqueue/purecalc/purecalc.h
+++ b/ydb/core/persqueue/purecalc/purecalc.h
@@ -7,17 +7,12 @@ namespace NYdb::NTopic::NPurecalc {
using namespace NYql::NPureCalc;
struct TMessage {
- TMessage(const TString& data)
- : Data(data) {
- }
-
- TMessage& WithOffset(ui64 offset) {
- Offset = offset;
- return *this;
- }
-
- const TString& Data;
+ TString Data; // message payload, now held by value instead of by reference
+ TString MessageGroupId; // message group id of the topic message
ui64 Offset = 0;
+ ui32 Partition = 0; // id of the partition the message belongs to
+ TString ProducerId; // producer id of the topic message
+ ui64 SeqNo = 0; // sequence number of the topic message
};
class TMessageInputSpec: public TInputSpecBase {
diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp
index 7c6c57dfc6..b2bcd0ef5a 100644
--- a/ydb/core/tx/replication/service/base_table_writer.cpp
+++ b/ydb/core/tx/replication/service/base_table_writer.cpp
@@ -433,7 +433,10 @@ class TLocalTableWriter
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records(::Reserve(ev->Get()->Records.size()));
TSet<TRowVersion> versionsWithoutTxId;
- for (auto& [offset, data, _] : ev->Get()->Records) {
+ for (auto& r : ev->Get()->Records) {
+ auto offset = r.Offset;
+ auto& data = r.Data;
+
auto record = Parser->Parse(ev->Get()->Source, offset, std::move(data));
if (Mode == EWriteMode::Consistent) {
diff --git a/ydb/core/tx/replication/service/common_ut.h b/ydb/core/tx/replication/service/common_ut.h
new file mode 100644
index 0000000000..6d833760f6
--- /dev/null
+++ b/ydb/core/tx/replication/service/common_ut.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include "worker.h"
+
+namespace NKikimr::NReplication::NService {
+
+struct TRecord: public TEvWorker::TEvData::TRecord { // unit-test helper: a record that only needs an offset and a payload
+ explicit TRecord(ui64 offset, const TString& data)
+ : TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 42) // remaining fields get fixed placeholder values
+ {}
+};
+
+}
diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp
index ffac955bbe..362972f738 100644
--- a/ydb/core/tx/replication/service/table_writer_ut.cpp
+++ b/ydb/core/tx/replication/service/table_writer_ut.cpp
@@ -1,6 +1,6 @@
#include "service.h"
#include "table_writer.h"
-#include "worker.h"
+#include "common_ut.h"
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include <ydb/core/tx/replication/ut_helpers/test_env.h>
@@ -16,7 +16,6 @@ namespace NKikimr::NReplication::NService {
Y_UNIT_TEST_SUITE(LocalTableWriter) {
using namespace NTestHelpers;
- using TRecord = TEvWorker::TEvData::TRecord;
Y_UNIT_TEST(WriteTable) {
TEnv env;
@@ -35,7 +34,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table")));
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
- env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
@@ -92,7 +91,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table")));
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
- env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"),
TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"),
TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"),
@@ -143,7 +142,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table")));
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
- env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"),
TRecord(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"),
TRecord(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"),
@@ -184,7 +183,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
ui64 order = 1;
{
- auto ev = env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
+ auto ev = env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0,"TestSource", {
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
TRecord(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"),
@@ -203,14 +202,14 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
}));
}
{
- auto ev = env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData("TestSource", {
+ auto ev = env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(order++, R"({"resolved":[10,0]})"),
}));
UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0));
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
}
- env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"),
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"),
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"),
@@ -222,12 +221,12 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
{TRowVersion(30, 0), 3},
}));
- env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"),
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"),
}));
- env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(order++, R"({"resolved":[30,0]})"),
}));
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
@@ -294,7 +293,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
auto worker = env.GetRuntime().Register(new TMockWorker(writer, env.GetSender()));
env.Send<TEvWorker::TEvHandshake>(worker, new TEvWorker::TEvHandshake());
- env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
TRecord(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"),
}));
@@ -377,7 +376,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
auto worker = env.GetRuntime().Register(new TMockWorker(writer, env.GetSender()));
env.Send<TEvWorker::TEvHandshake>(worker, new TEvWorker::TEvHandshake());
- env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
TRecord(2, R"({"resolved":[10,0]})"),
}));
@@ -407,14 +406,14 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"), EWriteMode::Consistent));
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
- env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
}));
env.Send<TEvWorker::TEvPoll>(writer, MakeTxIdResult({
{TRowVersion(10, 0), 1},
}));
- env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"),
TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
TRecord(4, R"({"resolved":[20,0]})"),
diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp
index 594c1033c6..0e14f9324d 100644
--- a/ydb/core/tx/replication/service/topic_reader.cpp
+++ b/ydb/core/tx/replication/service/topic_reader.cpp
@@ -58,10 +58,10 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
for (auto& msg : result.Messages) {
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
- records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime());
+ records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime(), std::move(msg.GetMessageGroupId()), std::move(msg.GetProducerId()), msg.GetSeqNo());
}
- Send(Worker, new TEvWorker::TEvData(ToString(result.PartitionId), std::move(records)));
+ Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records)));
}
void Handle(TEvYdbProxy::TEvTopicEndPartition::TPtr& ev) {
diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp
index bb2b193e4f..c6c97c42a4 100644
--- a/ydb/core/tx/replication/service/transfer_writer.cpp
+++ b/ydb/core/tx/replication/service/transfer_writer.cpp
@@ -551,7 +551,7 @@ private:
}
if (PendingRecords) {
- ProcessData(*PendingRecords);
+ ProcessData(PendingPartitionId, *PendingRecords);
PendingRecords.reset();
}
}
@@ -584,15 +584,16 @@ private:
void HoldHandle(TEvWorker::TEvData::TPtr& ev) {
Y_ABORT_UNLESS(!PendingRecords);
+ PendingPartitionId = ev->Get()->PartitionId; // kept next to PendingRecords; both are passed to ProcessData() once the pending batch is replayed
PendingRecords = std::move(ev->Get()->Records);
}
void Handle(TEvWorker::TEvData::TPtr& ev) {
LOG_D("Handle TEvData " << ev->Get()->ToString());
- ProcessData(ev->Get()->Records);
+ ProcessData(ev->Get()->PartitionId, ev->Get()->Records); // the partition id travels with the batch so it can be exposed to the lambda
}
- void ProcessData(const TVector<TEvWorker::TEvData::TRecord>& records) {
+ // Transforms every record of a batch with the compiled program and hands the
+ // produced rows to TableState.
+ //
+ // partitionId - id of the partition the batch belongs to; copied into each
+ //               input message as its Partition field.
+ // records     - batch to process; an empty batch makes the writer send
+ //               TEvGone(DONE) to the worker.
+ //
+ // If the program throws a yexception for a record, the error text is stored in
+ // ProcessingError and the remaining records of the batch are not processed.
+ void ProcessData(const ui32 partitionId, const TVector<TEvWorker::TEvData::TRecord>& records) {
if (!records) {
Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::DONE));
return;
@@ -601,17 +602,29 @@ private:
TableState->EnshureDataBatch();
for (auto& message : records) {
- NYdb::NTopic::NPurecalc::TMessage input(message.Data);
- input.WithOffset(message.Offset);
-
- auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input}));
- while (auto* m = result->Fetch()) {
- TableState->AddData(m->Data);
+ // `records` is const, so the record fields can only be copied. Writing
+ // std::move() here would still copy, and would wrongly suggest that
+ // message.Data is empty when the error path below reads it.
+ NYdb::NTopic::NPurecalc::TMessage input;
+ input.Data = message.Data;
+ input.MessageGroupId = message.MessageGroupId;
+ input.Partition = partitionId;
+ input.ProducerId = message.ProducerId;
+ input.Offset = message.Offset;
+ input.SeqNo = message.SeqNo;
+
+ try {
+ auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input}));
+ while (auto* m = result->Fetch()) {
+ TableState->AddData(m->Data);
+ }
+ } catch (const yexception& e) {
+ ProcessingError = TStringBuilder() << "Error transform message: '" << message.Data << "': " << e.what();
+ break;
}
}
if (TableState->Flush()) {
Become(&TThis::StateWrite);
+ } else if (ProcessingError) {
+ // Nothing was flushed, so no write result will arrive to report the error.
+ LogCritAndLeave(*ProcessingError);
}
}
@@ -636,6 +649,10 @@ private:
return LogCritAndLeave(error);
}
+ if (ProcessingError) {
+ return LogCritAndLeave(*ProcessingError);
+ }
+
Send(Worker, new TEvWorker::TEvPoll());
return StartWork();
}
@@ -709,8 +726,10 @@ private:
TProgramHolder::TPtr ProgramHolder;
mutable TMaybe<TString> LogPrefix;
+ mutable TMaybe<TString> ProcessingError;
std::optional<TActorId> PendingWorker;
+ ui32 PendingPartitionId = 0;
std::optional<TVector<TEvWorker::TEvData::TRecord>> PendingRecords;
ui32 Attempt = 0;
diff --git a/ydb/core/tx/replication/service/transfer_writer_ut.cpp b/ydb/core/tx/replication/service/transfer_writer_ut.cpp
index d617bdc3d0..0085181952 100644
--- a/ydb/core/tx/replication/service/transfer_writer_ut.cpp
+++ b/ydb/core/tx/replication/service/transfer_writer_ut.cpp
@@ -1,6 +1,6 @@
#include "service.h"
#include "transfer_writer.h"
-#include "worker.h"
+#include "common_ut.h"
#include <ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h>
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
@@ -17,7 +17,6 @@ namespace NKikimr::NReplication::NService {
Y_UNIT_TEST_SUITE(TransferWriter) {
using namespace NTestHelpers;
- using TRecord = TEvWorker::TEvData::TRecord;
Y_UNIT_TEST(Write_ColumnTable) {
TEnv env;
@@ -51,7 +50,7 @@ Y_UNIT_TEST_SUITE(TransferWriter) {
auto writer = env.GetRuntime().Register(CreateTransferWriter(lambda, tablePathId, compiler));
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
- env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
+ env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp
index 66ff8d1f83..a156b748f6 100644
--- a/ydb/core/tx/replication/service/worker.cpp
+++ b/ydb/core/tx/replication/service/worker.cpp
@@ -13,28 +13,36 @@
namespace NKikimr::NReplication::NService {
-TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime)
+TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo) // copying overload: all string arguments are copied into the record
: Offset(offset)
, Data(data)
, CreateTime(createTime)
+ , MessageGroupId(messageGroupId)
+ , ProducerId(producerId)
+ , SeqNo(seqNo)
{
}
-TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime)
+TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo) // moving overload: all string arguments are moved into the record
: Offset(offset)
, Data(std::move(data))
, CreateTime(createTime)
+ , MessageGroupId(std::move(messageGroupId))
+ , ProducerId(std::move(producerId))
+ , SeqNo(seqNo)
{
}
-TEvWorker::TEvData::TEvData(const TString& source, const TVector<TRecord>& records)
- : Source(source)
+TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records) // copying overload: the record vector is copied
+ : PartitionId(partitionId)
+ , Source(source)
, Records(records)
{
}
-TEvWorker::TEvData::TEvData(const TString& source, TVector<TRecord>&& records)
- : Source(source)
+TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records) // moving overload: takes over the record vector
+ : PartitionId(partitionId)
+ , Source(source)
, Records(std::move(records))
{
}
@@ -160,7 +168,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
Writer.Registered();
if (InFlightData) {
- Send(Writer, new TEvWorker::TEvData(InFlightData->Source, InFlightData->Records));
+ Send(Writer, new TEvWorker::TEvData(InFlightData->PartitionId, InFlightData->Source, InFlightData->Records));
}
} else {
LOG_W("Handshake from unknown actor"
@@ -205,7 +213,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
}
Y_ABORT_UNLESS(!InFlightData);
- InFlightData = MakeHolder<TEvWorker::TEvData>(ev->Get()->Source, ev->Get()->Records);
+ InFlightData = MakeHolder<TEvWorker::TEvData>(ev->Get()->PartitionId, ev->Get()->Source, ev->Get()->Records);
if (Writer) {
Send(ev->Forward(Writer));
diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h
index 6246eb1099..e03f3b24dc 100644
--- a/ydb/core/tx/replication/service/worker.h
+++ b/ydb/core/tx/replication/service/worker.h
@@ -34,17 +34,21 @@ struct TEvWorker {
ui64 Offset;
TString Data;
TInstant CreateTime;
+ TString MessageGroupId;
+ TString ProducerId;
+ ui64 SeqNo;
- explicit TRecord(ui64 offset, const TString& data, TInstant createTime = TInstant::Zero());
- explicit TRecord(ui64 offset, TString&& data, TInstant createTime = TInstant::Zero());
+ explicit TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo);
+ explicit TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo);
void Out(IOutputStream& out) const;
};
+ ui32 PartitionId;
TString Source;
TVector<TRecord> Records;
- explicit TEvData(const TString& source, const TVector<TRecord>& records);
- explicit TEvData(const TString& source, TVector<TRecord>&& records);
+ explicit TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records);
+ explicit TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records);
TString ToString() const override;
};
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
index 6a5801003c..ec0d8bca62 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
@@ -167,6 +167,9 @@ struct TEvYdbProxy {
, Data(msg.GetData())
, CreateTime(msg.GetCreateTime())
, Codec(codec)
+ , MessageGroupId(msg.GetMessageGroupId())
+ , ProducerId(msg.GetProducerId())
+ , SeqNo(msg.GetSeqNo())
{
}
@@ -186,6 +189,9 @@ struct TEvYdbProxy {
TString& GetData() { return Data; }
TInstant GetCreateTime() const { return CreateTime; }
ECodec GetCodec() const { return Codec; }
+ TString& GetMessageGroupId() { return MessageGroupId; } // returns a mutable reference, like GetData()
+ TString& GetProducerId() { return ProducerId; } // returns a mutable reference, like GetData()
+ ui64 GetSeqNo() const { return SeqNo; } // read-only accessor; const like GetCreateTime() and GetCodec()
void Out(IOutputStream& out) const;
private:
@@ -193,6 +199,9 @@ struct TEvYdbProxy {
TString Data;
TInstant CreateTime;
ECodec Codec;
+ TString MessageGroupId;
+ TString ProducerId;
+ ui64 SeqNo;
};
explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) {
diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp
index 9ea3b36bb7..6a96711a1a 100644
--- a/ydb/tests/functional/transfer/main.cpp
+++ b/ydb/tests/functional/transfer/main.cpp
@@ -43,6 +43,11 @@ bool Checker<bool>::Get(const ::Ydb::Value& value) {
}
template<>
+ui32 Checker<ui32>::Get(const ::Ydb::Value& value) { // ui32 expectations read the protobuf uint32_value field
+ return value.uint32_value();
+}
+
+template<>
ui64 Checker<ui64>::Get(const ::Ydb::Value& value) {
return value.uint64_value();
}
@@ -70,10 +75,48 @@ std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) {
};
}
+struct TMessage { // one message for the test to write; unset optionals are not applied to the write session settings
+ TString Message; // payload passed to Write()
+ std::optional<ui32> Partition = std::nullopt; // target partition, if pinned
+ std::optional<TString> ProducerId = std::nullopt;
+ std::optional<TString> MessageGroupId = std::nullopt;
+ std::optional<ui64> SeqNo = std::nullopt; // also passed to DeduplicationEnabled() and Write()
+};
+
+// Builds a message for partition 0 that carries an explicit sequence number;
+// the payload is "Message-<seqNo>". Producer and message group ids stay unset.
+TMessage _withSeqNo(ui64 seqNo) {
+ TMessage message;
+ message.Message = TStringBuilder() << "Message-" << seqNo;
+ message.Partition = 0;
+ message.SeqNo = seqNo;
+ return message;
+}
+
+// Builds a message for partition 0 written under the given producer id;
+// the payload is "Message-<producerId>". Message group id and sequence number stay unset.
+TMessage _withProducerId(const TString& producerId) {
+ TMessage message;
+ message.Message = TStringBuilder() << "Message-" << producerId;
+ message.Partition = 0;
+ message.ProducerId = producerId;
+ return message;
+}
+
+// Builds a message for partition 0 written under the given message group id;
+// the payload is "Message-<messageGroupId>". The sequence number stays unset.
+TMessage _withMessageGroupId(const TString& messageGroupId) {
+ TMessage message;
+ message.Message = TStringBuilder() << "Message-" << messageGroupId;
+ message.Partition = 0;
+ // NOTE(review): the producer id is set to the same value as the message group id;
+ // presumably the write session requires them to match - confirm.
+ message.ProducerId = messageGroupId;
+ message.MessageGroupId = messageGroupId;
+ return message;
+}
+
struct TConfig {
const char* TableDDL;
const char* Lambda;
- const char* Message;
+ const TVector<TMessage> Messages; // messages to write; the test waits until the table row count equals Messages.size()
TVector<std::pair<TString, std::shared_ptr<IChecker>>> Expectations;
};
@@ -106,7 +149,10 @@ struct MainTestCase {
{
auto res = session.ExecuteQuery(Sprintf(R"(
- CREATE TOPIC `%s`;
+ CREATE TOPIC `%s`
+ WITH (
+ min_active_partitions = 10
+ );
)", TopicName.data()), TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
@@ -126,20 +172,31 @@ struct MainTestCase {
}
{
- TWriteSessionSettings writeSettings;
- writeSettings.Path(TopicName);
- writeSettings.DeduplicationEnabled(false);
- auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings);
+ for (const auto& m : Config.Messages) {
+ TWriteSessionSettings writeSettings;
+ writeSettings.Path(TopicName);
+ writeSettings.DeduplicationEnabled(m.SeqNo);
+ if (m.Partition) {
+ writeSettings.PartitionId(m.Partition);
+ }
+ if (m.ProducerId) {
+ writeSettings.ProducerId(*m.ProducerId);
+ }
+ if (m.MessageGroupId) {
+ writeSettings.MessageGroupId(*m.MessageGroupId);
+ }
+ auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings);
- UNIT_ASSERT(writeSession->Write(Config.Message));
- writeSession->Close(TDuration::Seconds(1));
+ UNIT_ASSERT(writeSession->Write(m.Message, m.SeqNo));
+ writeSession->Close(TDuration::Seconds(1));
+ }
}
{
for (size_t attempt = 20; attempt--; ) {
auto res = DoRead(session);
Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush;
- if (res.first == 1) {
+ if (res.first == Config.Messages.size()) {
const Ydb::ResultSet& proto = res.second;
for (size_t i = 0; i < Config.Expectations.size(); ++i) {
auto& c = Config.Expectations[i];
@@ -214,7 +271,7 @@ Y_UNIT_TEST_SUITE(Transfer)
};
)",
- .Message = "Message-1",
+ .Messages = {{"Message-1"}},
.Expectations = {
_C("Key", ui64(0)),
@@ -247,7 +304,7 @@ Y_UNIT_TEST_SUITE(Transfer)
};
)",
- .Message = "Message-1",
+ .Messages = {{"Message-1"}},
.Expectations = {
_C("Key", ui64(0)),
@@ -256,6 +313,51 @@ Y_UNIT_TEST_SUITE(Transfer)
}).Run();
}
+ Y_UNIT_TEST(Main_ColumnTable_ComplexKey) // column order in the DDL, in PRIMARY KEY and in the lambda struct all differ from each other
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Key1 Uint64 NOT NULL,
+ Key3 Uint64 NOT NULL,
+ Value1 Utf8,
+ Key2 Uint64 NOT NULL,
+ Value2 Utf8,
+ Key4 Uint64 NOT NULL,
+ PRIMARY KEY (Key3, Key2, Key1, Key4)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key1:CAST(1 AS Uint64),
+ Key2:CAST(2 AS Uint64),
+ Value2:CAST("value-2" AS Utf8),
+ Key4:CAST(4 AS Uint64),
+ Key3:CAST(3 AS Uint64),
+ Value1:CAST("value-1" AS Utf8),
+ |>
+ ];
+ };
+ )",
+
+ .Messages = {{"Message-1"}},
+
+ .Expectations = { // every column must hold the constant the lambda assigns to it
+ _C("Key1", ui64(1)),
+ _C("Key2", ui64(2)),
+ _C("Key3", ui64(3)),
+ _C("Key4", ui64(4)),
+ _C("Value1", TString("value-1")),
+ _C("Value2", TString("value-2")),
+ }
+ }).Run();
+ }
+
Y_UNIT_TEST(Main_ColumnTable_JsonMessage)
{
MainTestCase({
@@ -286,12 +388,12 @@ Y_UNIT_TEST_SUITE(Transfer)
};
)",
- .Message = R"({
+ .Messages = {{R"({
"id": 1,
"first_name": "Vasya",
"last_name": "Pupkin",
"salary": "123"
- })",
+ })"}},
.Expectations = {
_C("Id", ui64(1)),
@@ -326,7 +428,7 @@ Y_UNIT_TEST_SUITE(Transfer)
};
)",
- .Message = "Message-1",
+ .Messages = {{"Message-1"}},
.Expectations = {
_C("Key", ui64(0)),
@@ -359,7 +461,7 @@ Y_UNIT_TEST_SUITE(Transfer)
};
)",
- .Message = "2025-02-21",
+ .Messages = {{"2025-02-21"}},
.Expectations = {
_C("Key", ui64(0)),
@@ -392,7 +494,7 @@ Y_UNIT_TEST_SUITE(Transfer)
};
)",
- .Message = "1.23",
+ .Messages = {{"1.23"}},
.Expectations = {
_C("Key", ui64(0)),
@@ -425,7 +527,7 @@ Y_UNIT_TEST_SUITE(Transfer)
};
)",
- .Message = "Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890",
+ .Messages = {{"Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890"}},
.Expectations = {
_C("Key", ui64(0)),
@@ -434,5 +536,134 @@ Y_UNIT_TEST_SUITE(Transfer)
}).Run();
}
+ Y_UNIT_TEST(Main_MessageField_Partition) // checks that the lambda sees the source partition through $x._partition
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Partition Uint32 NOT NULL,
+ Message Utf8,
+ PRIMARY KEY (Partition)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Partition:CAST($x._partition AS Uint32),
+ Message:CAST($x._data AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Messages = {{"Message-1", 7}}, // second aggregate member is TMessage::Partition, so the message is written to partition 7
+
+ .Expectations = {
+ _C("Partition", ui32(7)),
+ _C("Message", TString("Message-1")),
+ }
+ }).Run();
+ }
+
+ // Checks that the lambda sees the message sequence number through $x._seq_no.
+ // The value is cast to Uint64, the type of both the _seq_no field and the
+ // SeqNo column, so the test does not narrow it on the way to the table.
+ Y_UNIT_TEST(Main_MessageField_SeqNo)
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ SeqNo Uint64 NOT NULL,
+ Message Utf8,
+ PRIMARY KEY (SeqNo)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ SeqNo:CAST($x._seq_no AS Uint64),
+ Message:CAST($x._data AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Messages = {_withSeqNo(13)},
+
+ .Expectations = {
+ _C("SeqNo", ui64(13)),
+ }
+ }).Run();
+ }
+
+ Y_UNIT_TEST(Main_MessageField_ProducerId) // checks that the lambda sees the writer's producer id through $x._producer_id
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Offset Uint64 NOT NULL,
+ ProducerId Utf8,
+ PRIMARY KEY (Offset)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Offset:CAST($x._offset AS Uint64),
+ ProducerId:CAST($x._producer_id AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Messages = {_withProducerId("Producer-13")},
+
+ .Expectations = { // only the ProducerId column is verified; Offset serves as the key
+ _C("ProducerId", TString("Producer-13")),
+ }
+ }).Run();
+ }
+
+ Y_UNIT_TEST(Main_MessageField_MessageGroupId) // checks that the lambda sees the message group id through $x._message_group_id
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Offset Uint64 NOT NULL,
+ MessageGroupId Utf8,
+ PRIMARY KEY (Offset)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Offset:CAST($x._offset AS Uint64),
+ MessageGroupId:CAST($x._message_group_id AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Messages = {_withMessageGroupId("MessageGroupId-13")},
+
+ .Expectations = { // only the MessageGroupId column is verified; Offset serves as the key
+ _C("MessageGroupId", TString("MessageGroupId-13")),
+ }
+ }).Run();
+ }
+
}