aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ilnaz <ilnaz@ydb.tech> 2023-10-24 18:03:02 +0300
committer ilnaz <ilnaz@ydb.tech> 2023-10-24 19:07:11 +0300
commit 1c1a58e15405cbb07776a5c15c8530bd06d6f3f5 (patch)
tree 6686117dac9cb3b8425ca4da20fd57d12166f0ec
parent 1ed4742ff7b185c61308ebfa67df74425bec4016 (diff)
download ydb-1c1a58e15405cbb07776a5c15c8530bd06d6f3f5.tar.gz
TChangeRecord serialization iface KIKIMR-19823
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt | 1
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt | 1
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt | 1
-rw-r--r-- ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt | 1
-rw-r--r-- ydb/core/tx/datashard/change_record.cpp | 401
-rw-r--r-- ydb/core/tx/datashard/change_record.h | 14
-rw-r--r-- ydb/core/tx/datashard/change_record_cdc_serializer.cpp | 551
-rw-r--r-- ydb/core/tx/datashard/change_record_cdc_serializer.h | 32
-rw-r--r-- ydb/core/tx/datashard/change_sender_async_index.cpp | 2
-rw-r--r-- ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 98
-rw-r--r-- ydb/core/tx/datashard/ya.make | 1
11 files changed, 602 insertions, 501 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
index 2a8ad90687..6b925e74ad 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
@@ -166,6 +166,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 0622573166..3fe65be82c 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -167,6 +167,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
index 0622573166..3fe65be82c 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
@@ -167,6 +167,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
index 4a4986e6f8..c19b8e2985 100644
--- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
@@ -167,6 +167,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index 6f2ed24d6f..dafa956856 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -1,22 +1,10 @@
#include "change_record.h"
-#include "export_common.h"
-
-#include <library/cpp/digest/md5/md5.h>
-#include <library/cpp/json/json_reader.h>
-#include <library/cpp/json/json_writer.h>
-#include <library/cpp/json/yson/json2yson.h>
-#include <library/cpp/string_utils/base64/base64.h>
#include <ydb/core/protos/change_exchange.pb.h>
-#include <ydb/library/yverify_stream/yverify_stream.h>
-#include <ydb/library/binary_json/read.h>
-
-#include <util/stream/str.h>
-#include <util/string/printf.h>
namespace NKikimr::NDataShard {
-void TChangeRecord::SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const {
+void TChangeRecord::Serialize(NKikimrChangeExchange::TChangeRecord& record) const {
record.SetOrder(Order);
record.SetGroup(Group);
record.SetStep(Step);
@@ -45,366 +33,6 @@ static auto ParseBody(const TString& protoBody) {
return body;
}
-static NJson::TJsonValue StringToJson(TStringBuf in) {
- NJson::TJsonValue result;
- Y_ABORT_UNLESS(NJson::ReadJsonTree(in, &result));
- return result;
-}
-
-static NJson::TJsonValue YsonToJson(TStringBuf in) {
- NJson::TJsonValue result;
- Y_ABORT_UNLESS(NJson2Yson::DeserializeYsonAsJsonValue(in, &result));
- return result;
-}
-
-static NJson::TJsonValue ToJson(const TCell& cell, NScheme::TTypeInfo type) {
- if (cell.IsNull()) {
- return NJson::TJsonValue(NJson::JSON_NULL);
- }
-
- switch (type.GetTypeId()) {
- case NScheme::NTypeIds::Bool:
- return NJson::TJsonValue(cell.AsValue<bool>());
- case NScheme::NTypeIds::Int8:
- return NJson::TJsonValue(cell.AsValue<i8>());
- case NScheme::NTypeIds::Uint8:
- return NJson::TJsonValue(cell.AsValue<ui8>());
- case NScheme::NTypeIds::Int16:
- return NJson::TJsonValue(cell.AsValue<i16>());
- case NScheme::NTypeIds::Uint16:
- return NJson::TJsonValue(cell.AsValue<ui16>());
- case NScheme::NTypeIds::Int32:
- return NJson::TJsonValue(cell.AsValue<i32>());
- case NScheme::NTypeIds::Uint32:
- return NJson::TJsonValue(cell.AsValue<ui32>());
- case NScheme::NTypeIds::Int64:
- return NJson::TJsonValue(cell.AsValue<i64>());
- case NScheme::NTypeIds::Uint64:
- return NJson::TJsonValue(cell.AsValue<ui64>());
- case NScheme::NTypeIds::Float:
- return NJson::TJsonValue(cell.AsValue<float>());
- case NScheme::NTypeIds::Double:
- return NJson::TJsonValue(cell.AsValue<double>());
- case NScheme::NTypeIds::Date:
- return NJson::TJsonValue(TInstant::Days(cell.AsValue<ui16>()).ToString());
- case NScheme::NTypeIds::Datetime:
- return NJson::TJsonValue(TInstant::Seconds(cell.AsValue<ui32>()).ToString());
- case NScheme::NTypeIds::Timestamp:
- return NJson::TJsonValue(TInstant::MicroSeconds(cell.AsValue<ui64>()).ToString());
- case NScheme::NTypeIds::Interval:
- return NJson::TJsonValue(cell.AsValue<i64>());
- case NScheme::NTypeIds::Decimal:
- return NJson::TJsonValue(DecimalToString(cell.AsValue<std::pair<ui64, i64>>()));
- case NScheme::NTypeIds::DyNumber:
- return NJson::TJsonValue(DyNumberToString(cell.AsBuf()));
- case NScheme::NTypeIds::String:
- case NScheme::NTypeIds::String4k:
- case NScheme::NTypeIds::String2m:
- return NJson::TJsonValue(Base64Encode(cell.AsBuf()));
- case NScheme::NTypeIds::Utf8:
- return NJson::TJsonValue(cell.AsBuf());
- case NScheme::NTypeIds::Json:
- return StringToJson(cell.AsBuf());
- case NScheme::NTypeIds::JsonDocument:
- return StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf()));
- case NScheme::NTypeIds::Yson:
- return YsonToJson(cell.AsBuf());
- case NScheme::NTypeIds::Pg:
- // TODO: support pg types
- Y_ABORT("pg types are not supported");
- default:
- Y_ABORT("Unexpected type");
- }
-}
-
-static void SerializeJsonKey(TUserTable::TCPtr schema, NJson::TJsonValue& key,
- const NKikimrChangeExchange::TDataChange::TSerializedCells& in)
-{
- Y_ABORT_UNLESS(in.TagsSize() == schema->KeyColumnIds.size());
- for (size_t i = 0; i < schema->KeyColumnIds.size(); ++i) {
- Y_ABORT_UNLESS(in.GetTags(i) == schema->KeyColumnIds.at(i));
- }
-
- TSerializedCellVec cells;
- Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells));
-
- Y_ABORT_UNLESS(cells.GetCells().size() == schema->KeyColumnTypes.size());
- for (size_t i = 0; i < schema->KeyColumnTypes.size(); ++i) {
- const auto type = schema->KeyColumnTypes.at(i);
- const auto& cell = cells.GetCells().at(i);
- key.AppendValue(ToJson(cell, type));
- }
-}
-
-static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& value,
- const NKikimrChangeExchange::TDataChange::TSerializedCells& in)
-{
- TSerializedCellVec cells;
- Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells));
- Y_ABORT_UNLESS(in.TagsSize() == cells.GetCells().size());
-
- for (ui32 i = 0; i < in.TagsSize(); ++i) {
- const auto tag = in.GetTags(i);
- const auto& cell = cells.GetCells().at(i);
-
- auto it = schema->Columns.find(tag);
- Y_ABORT_UNLESS(it != schema->Columns.end());
-
- const auto& column = it->second;
- value.InsertValue(column.Name, ToJson(cell, column.Type));
- }
-}
-
-static void MergeJsonMaps(NJson::TJsonValue& mergeTo, NJson::TJsonValue& mergeFrom) {
- Y_ABORT_UNLESS(mergeTo.GetType() == NJson::EJsonValueType::JSON_MAP);
- Y_ABORT_UNLESS(mergeFrom.GetType() == NJson::EJsonValueType::JSON_MAP);
- for (const auto& entry : mergeFrom.GetMap()) {
- mergeTo.InsertValue(entry.first, entry.second);
- }
-}
-
-static void SerializeVirtualTimestamp(NJson::TJsonValue& value, std::initializer_list<ui64> vt) {
- for (auto v : vt) {
- value.AppendValue(v);
- }
-}
-
-void TChangeRecord::SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const {
- if (Kind == EKind::CdcHeartbeat) {
- return SerializeVirtualTimestamp(json["resolved"], {Step, TxId});
- }
-
- Y_ABORT_UNLESS(Kind == EKind::CdcDataChange);
- Y_ABORT_UNLESS(Schema);
-
- const auto body = ParseBody(Body);
- SerializeJsonKey(Schema, json["key"], body.GetKey());
-
- if (body.HasOldImage()) {
- SerializeJsonValue(Schema, json["oldImage"], body.GetOldImage());
- }
-
- if (body.HasNewImage()) {
- SerializeJsonValue(Schema, json["newImage"], body.GetNewImage());
- }
-
- const auto hasAnyImage = body.HasOldImage() || body.HasNewImage();
- switch (body.GetRowOperationCase()) {
- case NKikimrChangeExchange::TDataChange::kUpsert:
- json["update"].SetType(NJson::JSON_MAP);
- if (!hasAnyImage) {
- SerializeJsonValue(Schema, json["update"], body.GetUpsert());
- }
- break;
- case NKikimrChangeExchange::TDataChange::kReset:
- json["reset"].SetType(NJson::JSON_MAP);
- if (!hasAnyImage) {
- SerializeJsonValue(Schema, json["reset"], body.GetReset());
- }
- break;
- case NKikimrChangeExchange::TDataChange::kErase:
- json["erase"].SetType(NJson::JSON_MAP);
- break;
- default:
- Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
- }
-
- if (virtualTimestamps) {
- SerializeVirtualTimestamp(json["ts"], {Step, TxId});
- }
-}
-
-static void ExtendJson(NJson::TJsonValue& value, const NJson::TJsonValue& ext) {
- Y_ABORT_UNLESS(ext.GetType() == NJson::JSON_MAP);
- for (const auto& [k, v] : ext.GetMapSafe()) {
- value.InsertValue(k, v);
- }
-}
-
-static void ToAttributeValues(TUserTable::TCPtr schema, NJson::TJsonValue& value,
- const NKikimrChangeExchange::TDataChange::TSerializedCells& in)
-{
- TSerializedCellVec cells;
- Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells));
- Y_ABORT_UNLESS(in.TagsSize() == cells.GetCells().size());
-
- for (ui32 i = 0; i < in.TagsSize(); ++i) {
- const auto tag = in.GetTags(i);
- const auto& cell = cells.GetCells().at(i);
-
- if (cell.IsNull()) {
- continue;
- }
-
- auto it = schema->Columns.find(tag);
- Y_ABORT_UNLESS(it != schema->Columns.end());
-
- const auto& column = it->second;
- const auto& name = column.Name;
- const auto type = column.Type.GetTypeId();
-
- if (name == "__Hash" || name == "__CreatedAt") {
- continue; // hidden column
- } else if (name.StartsWith("__Hash_")) {
- bool indexed = false;
- for (const auto& [_, index] : schema->Indexes) {
- Y_ABORT_UNLESS(index.KeyColumnIds.size() >= 1);
- if (index.KeyColumnIds.at(0) == tag) {
- indexed = true;
- break;
- }
- }
- if (indexed) {
- continue; // index hash column
- }
- } else if (name == "__RowData") {
- Y_DEBUG_ABORT_UNLESS(type == NScheme::NTypeIds::JsonDocument);
- const auto rowData = StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf()));
- if (rowData.GetType() == NJson::JSON_MAP) {
- auto map = rowData.GetMapSafe().find("M");
- if (map != rowData.GetMapSafe().end()) {
- if (map->second.GetType() == NJson::JSON_MAP) {
- ExtendJson(value, map->second);
- }
- }
- }
- }
-
- if (type == NScheme::NTypeIds::Bool) {
- value.InsertValue(name, NJson::TJsonMap({{"BOOL", cell.AsValue<bool>()}}));
- } else if (type == NScheme::NTypeIds::DyNumber) {
- value.InsertValue(name, NJson::TJsonMap({{"N", DyNumberToString(cell.AsBuf())}}));
- } else if (type == NScheme::NTypeIds::String) {
- value.InsertValue(name, NJson::TJsonMap({{"B", Base64Encode(cell.AsBuf())}}));
- } else if (type == NScheme::NTypeIds::Utf8) {
- value.InsertValue(name, NJson::TJsonMap({{"S", cell.AsBuf()}}));
- }
- }
-}
-
-void TChangeRecord::SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const {
- Y_ABORT_UNLESS(Kind == EKind::CdcDataChange);
- Y_ABORT_UNLESS(Schema);
-
- json = NJson::TJsonMap({
- {"awsRegion", opts.AwsRegion},
- {"dynamodb", NJson::TJsonMap({
- {"ApproximateCreationDateTime", GetApproximateCreationDateTime().MilliSeconds()},
- {"SequenceNumber", Sprintf("%0*" PRIi64, 21 /* min length */, GetSeqNo())},
- })},
- {"eventID", Sprintf("%" PRIu64 "-%" PRIi64, opts.ShardId, GetSeqNo())},
- {"eventSource", "ydb:document-table"},
- {"eventVersion", "1.0"},
- });
-
- auto& dynamodb = json["dynamodb"];
- const auto body = ParseBody(Body);
-
- bool keysOnly = false;
- bool newAndOldImages = false;
- switch (opts.StreamMode) {
- case TUserTable::TCdcStream::EMode::ECdcStreamModeNewImage:
- dynamodb["StreamViewType"] = "NEW_IMAGE";
- break;
- case TUserTable::TCdcStream::EMode::ECdcStreamModeOldImage:
- dynamodb["StreamViewType"] = "OLD_IMAGE";
- break;
- case TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages:
- dynamodb["StreamViewType"] = "NEW_AND_OLD_IMAGES";
- newAndOldImages = true;
- break;
- default:
- dynamodb["StreamViewType"] = "KEYS_ONLY";
- keysOnly = true;
- break;
- }
-
- NJson::TJsonMap keys;
- ToAttributeValues(Schema, keys, body.GetKey());
- dynamodb["Keys"] = keys;
-
- if (!keysOnly && body.HasOldImage()) {
- ToAttributeValues(Schema, dynamodb["OldImage"], body.GetOldImage());
- ExtendJson(dynamodb["OldImage"], keys);
- }
-
- if (!keysOnly && body.HasNewImage()) {
- ToAttributeValues(Schema, dynamodb["NewImage"], body.GetNewImage());
- ExtendJson(dynamodb["NewImage"], keys);
- }
-
- switch (body.GetRowOperationCase()) {
- case NKikimrChangeExchange::TDataChange::kUpsert:
- case NKikimrChangeExchange::TDataChange::kReset:
- if (newAndOldImages) {
- json["eventName"] = body.HasOldImage() ? "MODIFY" : "INSERT";
- } else {
- json["eventName"] = "MODIFY";
- }
- break;
- case NKikimrChangeExchange::TDataChange::kErase:
- json["eventName"] = "REMOVE";
- break;
- default:
- Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
- }
-}
-
-void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, TUserTable::TCdcStream::EMode streamMode) const {
- Y_ABORT_UNLESS(Kind == EKind::CdcDataChange);
- Y_ABORT_UNLESS(Schema);
-
- const auto body = ParseBody(Body);
-
- keyJson["payload"].SetType(NJson::JSON_MAP);
- SerializeJsonValue(Schema, keyJson["payload"], body.GetKey()); // Debezium expects key in the same format as values
-
- valueJson["payload"].SetType(NJson::JSON_MAP);
- // payload.before. Optional
- if (body.HasOldImage()) {
- SerializeJsonValue(Schema, valueJson["payload"]["before"], body.GetOldImage());
- MergeJsonMaps(valueJson["payload"]["before"], keyJson["payload"]); // Debezium expects key included in value
- }
-
- // payload.after. Optional
- if (body.HasNewImage()) {
- SerializeJsonValue(Schema, valueJson["payload"]["after"], body.GetNewImage());
- MergeJsonMaps(valueJson["payload"]["after"], keyJson["payload"]); // Debezium expects key included in value
- }
-
- // payload.op. Mandatory
- if (Source == ESource::InitialScan) {
- valueJson["payload"]["op"] = "r"; // r = read
- } else {
- switch (body.GetRowOperationCase()) {
- case NKikimrChangeExchange::TDataChange::kUpsert:
- case NKikimrChangeExchange::TDataChange::kReset:
- if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) {
- valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // c = create
- } else {
- valueJson["payload"]["op"] = "u"; // u = update
- }
- break;
- case NKikimrChangeExchange::TDataChange::kErase:
- valueJson["payload"]["op"] = "d"; // d = delete
- break;
- default:
- Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
- }
- }
-
- // payload.source. Mandatory.
- valueJson["payload"]["source"] = NJson::TJsonMap({
- {"version", "1.0.0"},
- {"connector", "ydb"},
- {"ts_ms", GetApproximateCreationDateTime().MilliSeconds()}, // payload.ts_ms has no sense
- {"snapshot", Source == ESource::InitialScan},
- {"step", Step},
- {"txId", TxId},
- // TODO: db & table
- });
-}
-
TConstArrayRef<TCell> TChangeRecord::GetKey() const {
if (Key) {
return *Key;
@@ -436,33 +64,6 @@ i64 TChangeRecord::GetSeqNo() const {
return static_cast<i64>(Order);
}
-TString TChangeRecord::GetPartitionKey() const {
- if (PartitionKey) {
- return *PartitionKey;
- }
-
- switch (Kind) {
- case EKind::CdcDataChange: {
- Y_ABORT_UNLESS(Schema);
- const auto body = ParseBody(Body);
-
- NJson::TJsonValue key;
- SerializeJsonKey(Schema, key, body.GetKey());
-
- PartitionKey.ConstructInPlace(MD5::Calc(WriteJson(key, false)));
- break;
- }
-
- case EKind::CdcHeartbeat:
- case EKind::AsyncIndex: {
- Y_ABORT("Not supported");
- }
- }
-
- Y_ABORT_UNLESS(PartitionKey);
- return *PartitionKey;
-}
-
TInstant TChangeRecord::GetApproximateCreationDateTime() const {
return GetGroup()
? TInstant::FromValue(GetGroup())
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index abe5cb6d75..e287ee7260 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -2,8 +2,6 @@
#include "datashard_user_table.h"
-#include <library/cpp/json/json_value.h>
-
#include <ydb/core/scheme/scheme_pathid.h>
#include <ydb/core/scheme/scheme_tablecell.h>
@@ -33,12 +31,6 @@ public:
CdcHeartbeat,
};
- struct TAwsJsonOptions {
- TString AwsRegion;
- NKikimrSchemeOp::ECdcStreamMode StreamMode;
- ui64 ShardId;
- };
-
public:
ui64 GetOrder() const { return Order; }
ui64 GetGroup() const { return Group; }
@@ -53,11 +45,9 @@ public:
const TPathId& GetTableId() const { return TableId; }
ui64 GetSchemaVersion() const { return SchemaVersion; }
+ TUserTable::TCPtr GetSchema() const { return Schema; }
- void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const;
- void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const;
- void SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const;
- void SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, TUserTable::TCdcStream::EMode streamMode) const;
+ void Serialize(NKikimrChangeExchange::TChangeRecord& record) const;
TConstArrayRef<TCell> GetKey() const;
i64 GetSeqNo() const;
diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
new file mode 100644
index 0000000000..fb6f60833a
--- /dev/null
+++ b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
@@ -0,0 +1,551 @@
+#include "change_record_cdc_serializer.h"
+#include "change_record.h"
+#include "export_common.h"
+
+#include <ydb/core/protos/change_exchange.pb.h>
+#include <ydb/core/protos/grpc_pq_old.pb.h>
+#include <ydb/core/protos/msgbus_pq.pb.h>
+#include <ydb/library/binary_json/read.h>
+#include <ydb/library/yverify_stream/yverify_stream.h>
+
+#include <library/cpp/digest/md5/md5.h>
+#include <library/cpp/json/json_reader.h>
+#include <library/cpp/json/json_writer.h>
+#include <library/cpp/json/yson/json2yson.h>
+#include <library/cpp/string_utils/base64/base64.h>
+
+#include <util/stream/str.h>
+#include <util/string/printf.h>
+
+namespace NKikimr::NDataShard {
+
+/// Common skeleton of the CDC change record serializers.
+///
+/// Serialize() stamps the write command with the record's sequence number and
+/// approximate creation time, then dispatches on the record kind. The payload
+/// itself is produced by the format-specific FillDataChunk() hook and is
+/// shipped as a serialized NKikimrPQClient::TDataChunk.
+class TBaseSerializer: public IChangeRecordSerializer {
+    /// Builds a data chunk for `record` and returns it in serialized form.
+    auto BuildPayload(const TChangeRecord& record) {
+        NKikimrPQClient::TDataChunk chunk;
+        chunk.SetCodec(0 /* CODEC_RAW */);
+        FillDataChunk(chunk, record);
+        return chunk.SerializeAsString();
+    }
+
+    /// Data changes carry the payload directly in the command.
+    void SerializeDataChange(TCmdWrite& cmd, const TChangeRecord& record) {
+        cmd.SetData(BuildPayload(record));
+    }
+
+    /// Heartbeats carry the payload inside the command's heartbeat
+    /// sub-message, together with the record's step and tx id.
+    void SerializeHeartbeat(TCmdWrite& cmd, const TChangeRecord& record) {
+        const auto payload = BuildPayload(record);
+
+        auto& target = *cmd.MutableHeartbeat();
+        target.SetStep(record.GetStep());
+        target.SetTxId(record.GetTxId());
+        target.SetData(payload);
+    }
+
+protected:
+    /// Format-specific hook: writes the encoded record into `chunk`.
+    virtual void FillDataChunk(NKikimrPQClient::TDataChunk& chunk, const TChangeRecord& record) = 0;
+
+public:
+    explicit TBaseSerializer(const TChangeRecordSerializerOpts& opts)
+        : Opts(opts)
+    {}
+
+    /// Fills `cmd` from `record`. Aborts for async index records, which this
+    /// serializer does not handle.
+    void Serialize(TCmdWrite& cmd, const TChangeRecord& record) override {
+        cmd.SetSeqNo(record.GetSeqNo());
+        cmd.SetCreateTimeMS(record.GetApproximateCreationDateTime().MilliSeconds());
+
+        switch (record.GetKind()) {
+        case TChangeRecord::EKind::AsyncIndex:
+            Y_ABORT("Unexpected");
+        case TChangeRecord::EKind::CdcDataChange:
+            SerializeDataChange(cmd, record);
+            break;
+        case TChangeRecord::EKind::CdcHeartbeat:
+            SerializeHeartbeat(cmd, record);
+            break;
+        }
+    }
+
+protected:
+    /// Copy of the options the serializer was created with.
+    const TChangeRecordSerializerOpts Opts;
+
+}; // TBaseSerializer
+
+/// Serializer whose payload is the record converted to
+/// NKikimrChangeExchange::TChangeRecord and serialized as protobuf.
+class TProtoSerializer: public TBaseSerializer {
+public:
+    using TBaseSerializer::TBaseSerializer;
+
+protected:
+    /// Stores the protobuf-serialized form of `record` in `data`.
+    void FillDataChunk(NKikimrPQClient::TDataChunk& data, const TChangeRecord& record) override {
+        NKikimrChangeExchange::TChangeRecord message;
+        record.Serialize(message);
+        data.SetData(message.SerializeAsString());
+    }
+
+}; // TProtoSerializer
+
+/// Shared base of the JSON flavours of the CDC serializer.
+///
+/// Derived classes build the JSON document in SerializeToJson(); this class
+/// writes it into the data chunk and, for data changes, also sets the
+/// partition key on the write command. The static helpers convert cells and
+/// serialized cell vectors into JSON values.
+class TJsonSerializer: public TBaseSerializer {
+    friend class TChangeRecord; // used in GetPartitionKey()
+
+    /// Writer settings used for every emitted document: UTF-8 validation is
+    /// turned off and NaN is written as a string.
+    static NJson::TJsonWriterConfig DefaultJsonConfig() {
+        NJson::TJsonWriterConfig config;
+        config.WriteNanAsString = true;
+        config.ValidateUtf8 = false;
+        return config;
+    }
+
+protected:
+    /// Parses the protobuf body of a data change record; aborts if parsing fails.
+    static NKikimrChangeExchange::TDataChange ParseBody(const TString& protoBody) {
+        NKikimrChangeExchange::TDataChange body;
+        Y_ABORT_UNLESS(body.ParseFromArray(protoBody.data(), protoBody.size()));
+        return body;
+    }
+
+    /// Parses JSON text into a tree; aborts if the text cannot be read.
+    static NJson::TJsonValue StringToJson(TStringBuf in) {
+        NJson::TJsonValue result;
+        Y_ABORT_UNLESS(NJson::ReadJsonTree(in, &result));
+        return result;
+    }
+
+    /// Converts YSON into a JSON tree; aborts if the conversion fails.
+    static NJson::TJsonValue YsonToJson(TStringBuf in) {
+        NJson::TJsonValue result;
+        Y_ABORT_UNLESS(NJson2Yson::DeserializeYsonAsJsonValue(in, &result));
+        return result;
+    }
+
+    /// Converts a single cell of the given type into a JSON value.
+    /// A null cell becomes JSON null regardless of its type; pg types and
+    /// type ids not listed below abort.
+    static NJson::TJsonValue ToJson(const TCell& cell, NScheme::TTypeInfo type) {
+        if (cell.IsNull()) {
+            return NJson::TJsonValue(NJson::JSON_NULL);
+        }
+
+        const auto typeId = type.GetTypeId();
+        switch (typeId) {
+        // plain scalars are passed through as is
+        case NScheme::NTypeIds::Bool:
+            return NJson::TJsonValue(cell.AsValue<bool>());
+        case NScheme::NTypeIds::Int8:
+            return NJson::TJsonValue(cell.AsValue<i8>());
+        case NScheme::NTypeIds::Uint8:
+            return NJson::TJsonValue(cell.AsValue<ui8>());
+        case NScheme::NTypeIds::Int16:
+            return NJson::TJsonValue(cell.AsValue<i16>());
+        case NScheme::NTypeIds::Uint16:
+            return NJson::TJsonValue(cell.AsValue<ui16>());
+        case NScheme::NTypeIds::Int32:
+            return NJson::TJsonValue(cell.AsValue<i32>());
+        case NScheme::NTypeIds::Uint32:
+            return NJson::TJsonValue(cell.AsValue<ui32>());
+        case NScheme::NTypeIds::Int64:
+            return NJson::TJsonValue(cell.AsValue<i64>());
+        case NScheme::NTypeIds::Uint64:
+            return NJson::TJsonValue(cell.AsValue<ui64>());
+        case NScheme::NTypeIds::Float:
+            return NJson::TJsonValue(cell.AsValue<float>());
+        case NScheme::NTypeIds::Double:
+            return NJson::TJsonValue(cell.AsValue<double>());
+
+        // date & time types are rendered through TInstant::ToString()
+        case NScheme::NTypeIds::Date:
+            return NJson::TJsonValue(TInstant::Days(cell.AsValue<ui16>()).ToString());
+        case NScheme::NTypeIds::Datetime:
+            return NJson::TJsonValue(TInstant::Seconds(cell.AsValue<ui32>()).ToString());
+        case NScheme::NTypeIds::Timestamp:
+            return NJson::TJsonValue(TInstant::MicroSeconds(cell.AsValue<ui64>()).ToString());
+        case NScheme::NTypeIds::Interval:
+            return NJson::TJsonValue(cell.AsValue<i64>());
+
+        // decimal-like numbers are rendered as strings
+        case NScheme::NTypeIds::Decimal:
+            return NJson::TJsonValue(DecimalToString(cell.AsValue<std::pair<ui64, i64>>()));
+        case NScheme::NTypeIds::DyNumber:
+            return NJson::TJsonValue(DyNumberToString(cell.AsBuf()));
+
+        // byte strings are base64-encoded, utf8 is passed as is
+        case NScheme::NTypeIds::String:
+        case NScheme::NTypeIds::String4k:
+        case NScheme::NTypeIds::String2m:
+            return NJson::TJsonValue(Base64Encode(cell.AsBuf()));
+        case NScheme::NTypeIds::Utf8:
+            return NJson::TJsonValue(cell.AsBuf());
+
+        // structured types are embedded as nested JSON
+        case NScheme::NTypeIds::Json:
+            return StringToJson(cell.AsBuf());
+        case NScheme::NTypeIds::JsonDocument:
+            return StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf()));
+        case NScheme::NTypeIds::Yson:
+            return YsonToJson(cell.AsBuf());
+
+        case NScheme::NTypeIds::Pg:
+            // TODO: support pg types
+            Y_ABORT("pg types are not supported");
+        default:
+            Y_ABORT("Unexpected type");
+        }
+    }
+
+    /// Appends the key cells of `in` to `key`, one value per key column.
+    /// Aborts unless the tags of `in` are exactly the key column ids of
+    /// `schema`, position by position, and the cell count matches the number
+    /// of key column types.
+    static void SerializeJsonKey(TUserTable::TCPtr schema, NJson::TJsonValue& key,
+            const NKikimrChangeExchange::TDataChange::TSerializedCells& in)
+    {
+        Y_ABORT_UNLESS(in.TagsSize() == schema->KeyColumnIds.size());
+        for (size_t i = 0; i < schema->KeyColumnIds.size(); ++i) {
+            Y_ABORT_UNLESS(in.GetTags(i) == schema->KeyColumnIds.at(i));
+        }
+
+        TSerializedCellVec cells;
+        Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells));
+        Y_ABORT_UNLESS(cells.GetCells().size() == schema->KeyColumnTypes.size());
+
+        const auto keyCells = cells.GetCells();
+        const auto& keyTypes = schema->KeyColumnTypes;
+        for (size_t pos = 0; pos < keyTypes.size(); ++pos) {
+            key.AppendValue(ToJson(keyCells.at(pos), keyTypes.at(pos)));
+        }
+    }
+
+    /// Inserts every cell of `in` into `value` under the name of its column.
+    /// Aborts if the cells cannot be parsed, if the number of tags differs
+    /// from the number of cells, or if a tag is not a column of `schema`.
+    static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& value,
+            const NKikimrChangeExchange::TDataChange::TSerializedCells& in)
+    {
+        TSerializedCellVec cells;
+        Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells));
+        Y_ABORT_UNLESS(in.TagsSize() == cells.GetCells().size());
+
+        const auto rowCells = cells.GetCells();
+        for (ui32 pos = 0; pos < in.TagsSize(); ++pos) {
+            auto it = schema->Columns.find(in.GetTags(pos));
+            Y_ABORT_UNLESS(it != schema->Columns.end());
+
+            value.InsertValue(it->second.Name, ToJson(rowCells.at(pos), it->second.Type));
+        }
+    }
+
+    /// Inserts every entry of the map `ext` into `value`; aborts if `ext` is not a map.
+    static void ExtendJson(NJson::TJsonValue& value, const NJson::TJsonValue& ext) {
+        Y_ABORT_UNLESS(ext.GetType() == NJson::JSON_MAP);
+        for (const auto& entry : ext.GetMapSafe()) {
+            value.InsertValue(entry.first, entry.second);
+        }
+    }
+
+    /// Appends the components of a virtual timestamp to `value`, in the given order.
+    static void SerializeVirtualTimestamp(NJson::TJsonValue& value, std::initializer_list<ui64> vt) {
+        for (const ui64 component : vt) {
+            value.AppendValue(component);
+        }
+    }
+
+    /// Builds the JSON document for `record` and stores its text form in `data`.
+    void FillDataChunk(NKikimrPQClient::TDataChunk& data, const TChangeRecord& record) override {
+        NJson::TJsonValue document;
+        SerializeToJson(document, record);
+
+        TStringStream out;
+        NJson::WriteJson(&out, &document, JsonConfig);
+        data.SetData(out.Str());
+    }
+
+    /// Format-specific hook: fills `json` with the representation of `record`.
+    virtual void SerializeToJson(NJson::TJsonValue& json, const TChangeRecord& record) = 0;
+
+public:
+    explicit TJsonSerializer(const TChangeRecordSerializerOpts& opts)
+        : TBaseSerializer(opts)
+        , JsonConfig(DefaultJsonConfig())
+    {}
+
+    /// Same as the base implementation, plus the partition key for data changes.
+    void Serialize(TCmdWrite& cmd, const TChangeRecord& record) override {
+        TBaseSerializer::Serialize(cmd, record);
+
+        if (record.GetKind() != TChangeRecord::EKind::CdcDataChange) {
+            return;
+        }
+
+        cmd.SetPartitionKey(record.GetPartitionKey());
+    }
+
+protected:
+    /// Writer configuration applied to every document produced by FillDataChunk().
+    const NJson::TJsonWriterConfig JsonConfig;
+
+}; // TJsonSerializer
+
+/// Serializer producing the YDB JSON representation of a change record.
+///
+/// A heartbeat becomes {"resolved": [step, txId]}. A data change gets "key",
+/// optional "oldImage" / "newImage", exactly one of "update" / "reset" /
+/// "erase" and, when Opts.VirtualTimestamps is set, "ts": [step, txId].
+class TYdbJsonSerializer: public TJsonSerializer {
+public:
+    using TJsonSerializer::TJsonSerializer;
+
+protected:
+    void SerializeToJson(NJson::TJsonValue& json, const TChangeRecord& record) override {
+        if (record.GetKind() == TChangeRecord::EKind::CdcHeartbeat) {
+            SerializeVirtualTimestamp(json["resolved"], {record.GetStep(), record.GetTxId()});
+            return;
+        }
+
+        Y_ABORT_UNLESS(record.GetKind() == TChangeRecord::EKind::CdcDataChange);
+        Y_ABORT_UNLESS(record.GetSchema());
+
+        const auto schema = record.GetSchema();
+        const auto body = ParseBody(record.GetBody());
+
+        SerializeJsonKey(schema, json["key"], body.GetKey());
+
+        if (body.HasOldImage()) {
+            SerializeJsonValue(schema, json["oldImage"], body.GetOldImage());
+        }
+
+        if (body.HasNewImage()) {
+            SerializeJsonValue(schema, json["newImage"], body.GetNewImage());
+        }
+
+        // the operation node stays an empty map whenever an image is present
+        const bool withoutImages = !(body.HasOldImage() || body.HasNewImage());
+
+        switch (body.GetRowOperationCase()) {
+        case NKikimrChangeExchange::TDataChange::kUpsert: {
+            auto& update = json["update"];
+            update.SetType(NJson::JSON_MAP);
+            if (withoutImages) {
+                SerializeJsonValue(schema, update, body.GetUpsert());
+            }
+            break;
+        }
+        case NKikimrChangeExchange::TDataChange::kReset: {
+            auto& reset = json["reset"];
+            reset.SetType(NJson::JSON_MAP);
+            if (withoutImages) {
+                SerializeJsonValue(schema, reset, body.GetReset());
+            }
+            break;
+        }
+        case NKikimrChangeExchange::TDataChange::kErase:
+            json["erase"].SetType(NJson::JSON_MAP);
+            break;
+        default:
+            Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
+        }
+
+        if (Opts.VirtualTimestamps) {
+            SerializeVirtualTimestamp(json["ts"], {record.GetStep(), record.GetTxId()});
+        }
+    }
+
+}; // TYdbJsonSerializer
+
+class TDynamoDBStreamsJsonSerializer: public TJsonSerializer { // Serializes a CDC data change into a DynamoDB Streams-shaped JSON event (awsRegion, dynamodb, eventID, eventName, ...).
+ static void ToAttributeValues(TUserTable::TCPtr schema, NJson::TJsonValue& value, // Adds one {"<type tag>": <value>} entry per eligible non-null cell of `in` to `value`.
+ const NKikimrChangeExchange::TDataChange::TSerializedCells& in)
+ {
+ TSerializedCellVec cells;
+ Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells)); // aborts if the serialized cell data cannot be parsed
+ Y_ABORT_UNLESS(in.TagsSize() == cells.GetCells().size()); // tags and cells must pair up one-to-one
+
+ for (ui32 i = 0; i < in.TagsSize(); ++i) {
+ const auto tag = in.GetTags(i);
+ const auto& cell = cells.GetCells().at(i);
+
+ if (cell.IsNull()) { // null cells produce no attribute
+ continue;
+ }
+
+ auto it = schema->Columns.find(tag); // the tag is used as the lookup key into schema->Columns
+ Y_ABORT_UNLESS(it != schema->Columns.end()); // a tag missing from the schema is fatal
+
+ const auto& column = it->second;
+ const auto& name = column.Name;
+ const auto type = column.Type.GetTypeId();
+
+ if (name == "__Hash" || name == "__CreatedAt") {
+ continue; // hidden column
+ } else if (name.StartsWith("__Hash_")) { // skipped only when it is the first key column of some index; otherwise handled by the type checks below
+ bool indexed = false;
+ for (const auto& [_, index] : schema->Indexes) {
+ Y_ABORT_UNLESS(index.KeyColumnIds.size() >= 1);
+ if (index.KeyColumnIds.at(0) == tag) {
+ indexed = true;
+ break;
+ }
+ }
+ if (indexed) {
+ continue; // index hash column
+ }
+ } else if (name == "__RowData") { // the nested "M" map of the row data, when present, is passed to ExtendJson together with value
+ Y_DEBUG_ABORT_UNLESS(type == NScheme::NTypeIds::JsonDocument);
+ const auto rowData = StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf()));
+ if (rowData.GetType() == NJson::JSON_MAP) {
+ auto map = rowData.GetMapSafe().find("M");
+ if (map != rowData.GetMapSafe().end()) {
+ if (map->second.GetType() == NJson::JSON_MAP) {
+ ExtendJson(value, map->second); // NOTE(review): presumably merges the map entries into value -- confirm against ExtendJson
+ }
+ }
+ }
+ }
+
+ if (type == NScheme::NTypeIds::Bool) { // only Bool, DyNumber, String and Utf8 cells are emitted; any other type adds nothing here
+ value.InsertValue(name, NJson::TJsonMap({{"BOOL", cell.AsValue<bool>()}}));
+ } else if (type == NScheme::NTypeIds::DyNumber) {
+ value.InsertValue(name, NJson::TJsonMap({{"N", DyNumberToString(cell.AsBuf())}}));
+ } else if (type == NScheme::NTypeIds::String) {
+ value.InsertValue(name, NJson::TJsonMap({{"B", Base64Encode(cell.AsBuf())}})); // String cells are emitted base64-encoded under "B"
+ } else if (type == NScheme::NTypeIds::Utf8) {
+ value.InsertValue(name, NJson::TJsonMap({{"S", cell.AsBuf()}}));
+ }
+ }
+ }
+
+protected:
+ void SerializeToJson(NJson::TJsonValue& json, const TChangeRecord& record) override { // Fills json with the event envelope, Keys, optional Old/NewImage and eventName for record.
+ Y_ABORT_UNLESS(record.GetKind() == TChangeRecord::EKind::CdcDataChange); // only CdcDataChange records are accepted
+ Y_ABORT_UNLESS(record.GetSchema()); // the schema is needed to map tags to columns
+
+ json = NJson::TJsonMap({
+ {"awsRegion", Opts.AwsRegion},
+ {"dynamodb", NJson::TJsonMap({
+ {"ApproximateCreationDateTime", record.GetApproximateCreationDateTime().MilliSeconds()},
+ {"SequenceNumber", Sprintf("%0*" PRIi64, 21 /* min length */, record.GetSeqNo())}, // zero-padded to a width of at least 21
+ })},
+ {"eventID", Sprintf("%" PRIu64 "-%" PRIi64, Opts.ShardId, record.GetSeqNo())}, // "<ShardId>-<SeqNo>"
+ {"eventSource", "ydb:document-table"},
+ {"eventVersion", "1.0"},
+ });
+
+ auto& dynamodb = json["dynamodb"];
+ const auto body = ParseBody(record.GetBody());
+
+ bool keysOnly = false; // set when the stream mode falls into the default (KEYS_ONLY) case
+ bool newAndOldImages = false; // set only for ECdcStreamModeNewAndOldImages
+ switch (Opts.StreamMode) {
+ case TUserTable::TCdcStream::EMode::ECdcStreamModeNewImage:
+ dynamodb["StreamViewType"] = "NEW_IMAGE";
+ break;
+ case TUserTable::TCdcStream::EMode::ECdcStreamModeOldImage:
+ dynamodb["StreamViewType"] = "OLD_IMAGE";
+ break;
+ case TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages:
+ dynamodb["StreamViewType"] = "NEW_AND_OLD_IMAGES";
+ newAndOldImages = true;
+ break;
+ default: // every other mode is reported as KEYS_ONLY
+ dynamodb["StreamViewType"] = "KEYS_ONLY";
+ keysOnly = true;
+ break;
+ }
+
+ NJson::TJsonMap keys;
+ ToAttributeValues(record.GetSchema(), keys, body.GetKey());
+ dynamodb["Keys"] = keys;
+
+ if (!keysOnly && body.HasOldImage()) { // images are never written in KEYS_ONLY mode
+ ToAttributeValues(record.GetSchema(), dynamodb["OldImage"], body.GetOldImage());
+ ExtendJson(dynamodb["OldImage"], keys); // NOTE(review): presumably adds the key attributes to the image -- confirm against ExtendJson
+ }
+
+ if (!keysOnly && body.HasNewImage()) {
+ ToAttributeValues(record.GetSchema(), dynamodb["NewImage"], body.GetNewImage());
+ ExtendJson(dynamodb["NewImage"], keys);
+ }
+
+ switch (body.GetRowOperationCase()) {
+ case NKikimrChangeExchange::TDataChange::kUpsert:
+ case NKikimrChangeExchange::TDataChange::kReset:
+ if (newAndOldImages) { // INSERT is reported only in NEW_AND_OLD_IMAGES mode, when the body has no old image
+ json["eventName"] = body.HasOldImage() ? "MODIFY" : "INSERT";
+ } else {
+ json["eventName"] = "MODIFY";
+ }
+ break;
+ case NKikimrChangeExchange::TDataChange::kErase:
+ json["eventName"] = "REMOVE";
+ break;
+ default:
+ Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); // any other row operation is treated as a failure
+ }
+ }
+
+public:
+ using TJsonSerializer::TJsonSerializer; // inherit the base class constructors
+
+}; // TDynamoDBStreamsJsonSerializer
+
+class TDebeziumJsonSerializer: public TJsonSerializer { // Serializes a CDC data change as a Debezium-style pair: json["key"] and json["value"], each with a "payload".
+protected:
+ void SerializeToJson(NJson::TJsonValue& json, const TChangeRecord& record) override { // Builds json["key"]["payload"] and json["value"]["payload"] (before/after/op/source) for record.
+ Y_ABORT_UNLESS(record.GetKind() == TChangeRecord::EKind::CdcDataChange); // only CdcDataChange records are accepted
+ Y_ABORT_UNLESS(record.GetSchema());
+
+ const auto body = ParseBody(record.GetBody());
+ auto& keyJson = json["key"];
+ auto& valueJson = json["value"];
+
+ keyJson["payload"].SetType(NJson::JSON_MAP); // make the payload a JSON map before it is filled
+ SerializeJsonValue(record.GetSchema(), keyJson["payload"], body.GetKey());
+
+ valueJson["payload"].SetType(NJson::JSON_MAP);
+
+ if (body.HasOldImage()) { // "before" is written only when the body carries an old image
+ SerializeJsonValue(record.GetSchema(), valueJson["payload"]["before"], body.GetOldImage());
+ ExtendJson(valueJson["payload"]["before"], keyJson["payload"]); // NOTE(review): presumably adds the key columns to the image -- confirm against ExtendJson
+ }
+
+ if (body.HasNewImage()) { // "after" is written only when the body carries a new image
+ SerializeJsonValue(record.GetSchema(), valueJson["payload"]["after"], body.GetNewImage());
+ ExtendJson(valueJson["payload"]["after"], keyJson["payload"]);
+ }
+
+ if (record.GetSource() == TChangeRecord::ESource::InitialScan) { // initial-scan records get "r" regardless of the row operation
+ valueJson["payload"]["op"] = "r"; // r = read
+ } else {
+ switch (body.GetRowOperationCase()) {
+ case NKikimrChangeExchange::TDataChange::kUpsert:
+ case NKikimrChangeExchange::TDataChange::kReset:
+ if (Opts.StreamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) { // "c" is reported only in this mode, when the body has no old image
+ valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // c = create
+ } else {
+ valueJson["payload"]["op"] = "u"; // u = update
+ }
+ break;
+ case NKikimrChangeExchange::TDataChange::kErase:
+ valueJson["payload"]["op"] = "d"; // d = delete
+ break;
+ default:
+ Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); // any other row operation is treated as a failure
+ }
+ }
+
+ valueJson["payload"]["source"] = NJson::TJsonMap({ // metadata block describing where the change came from
+ {"version", "1.0.0"},
+ {"connector", "ydb"},
+ {"ts_ms", record.GetApproximateCreationDateTime().MilliSeconds()},
+ {"snapshot", record.GetSource() == TChangeRecord::ESource::InitialScan}, // true for initial-scan records
+ {"step", record.GetStep()},
+ {"txId", record.GetTxId()},
+ // TODO: db & table
+ });
+ }
+
+ void FillDataChunk(NKikimrPQClient::TDataChunk& data, const TChangeRecord& record) override { // Writes json["key"] as a "__key" message-meta entry and json["value"] as the chunk data.
+ NJson::TJsonValue json;
+ SerializeToJson(json, record);
+ {
+ TStringStream str;
+ NJson::WriteJson(&str, &json["key"], JsonConfig);
+ auto& messageMeta = *data.AddMessageMeta();
+ messageMeta.set_key("__key"); // the serialized key travels in message metadata under "__key"
+ messageMeta.set_value(str.Str());
+ }
+ {
+ TStringStream str;
+ NJson::WriteJson(&str, &json["value"], JsonConfig);
+ data.SetData(str.Str()); // the serialized value becomes the chunk payload
+ }
+ }
+
+public:
+ using TJsonSerializer::TJsonSerializer; // inherit the base class constructors
+
+}; // TDebeziumJsonSerializer
+
+IChangeRecordSerializer* CreateChangeRecordSerializer(const TChangeRecordSerializerOpts& opts) { // Factory: returns a heap-allocated serializer chosen by opts.StreamFormat; the raw pointer is handed to the caller.
+ switch (opts.StreamFormat) {
+ case TUserTable::TCdcStream::EFormat::ECdcStreamFormatProto:
+ return new TProtoSerializer(opts);
+ case TUserTable::TCdcStream::EFormat::ECdcStreamFormatJson:
+ return new TYdbJsonSerializer(opts);
+ case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDynamoDBStreamsJson:
+ return new TDynamoDBStreamsJsonSerializer(opts);
+ case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDebeziumJson:
+ return new TDebeziumJsonSerializer(opts);
+ default:
+ Y_ABORT("Unsupported format"); // any other format value aborts instead of returning
+ }
+}
+
+TString TChangeRecord::GetPartitionKey() const { // Returns the partition key, computing it on first use and storing it in PartitionKey.
+ if (PartitionKey) { // already computed: return the stored value
+ return *PartitionKey;
+ }
+
+ Y_ABORT_UNLESS(Kind == EKind::CdcDataChange); // a key is derived only for CdcDataChange records
+ Y_ABORT_UNLESS(Schema);
+
+ const auto body = TJsonSerializer::ParseBody(Body);
+
+ NJson::TJsonValue key;
+ TJsonSerializer::SerializeJsonKey(Schema, key, body.GetKey());
+
+ PartitionKey.ConstructInPlace(MD5::Calc(WriteJson(key, false))); // partition key = MD5 of the written JSON form of the row key
+ return *PartitionKey;
+}
+
+}
diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.h b/ydb/core/tx/datashard/change_record_cdc_serializer.h
new file mode 100644
index 0000000000..e7cac6a3f2
--- /dev/null
+++ b/ydb/core/tx/datashard/change_record_cdc_serializer.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include "datashard_user_table.h"
+
+namespace NKikimrClient {
+ class TPersQueuePartitionRequest_TCmdWrite;
+}
+
+namespace NKikimr::NDataShard {
+
+class TChangeRecord;
+
+class IChangeRecordSerializer { // Interface for writing a TChangeRecord into a PersQueue partition write command.
+protected:
+ using TCmdWrite = NKikimrClient::TPersQueuePartitionRequest_TCmdWrite; // short alias for the forward-declared protobuf message type
+
+public:
+ virtual ~IChangeRecordSerializer() = default; // virtual so implementations can be destroyed through the interface
+ virtual void Serialize(TCmdWrite& cmd, const TChangeRecord& record) = 0; // fills cmd from record; each implementation defines the wire format
+};
+
+struct TChangeRecordSerializerOpts { // Options passed to CreateChangeRecordSerializer and kept by the serializers.
+ TUserTable::TCdcStream::EFormat StreamFormat; // selects the serializer implementation; NOTE(review): no default initializer, callers must set it
+ TUserTable::TCdcStream::EMode StreamMode; // read by the DynamoDB Streams and Debezium JSON serializers; NOTE(review): no default initializer
+ TString AwsRegion; // emitted as "awsRegion" by the DynamoDB Streams JSON serializer
+ bool VirtualTimestamps = false; // when set, the YDB JSON serializer adds a "ts" field from the record's step and tx id
+ ui64 ShardId = 0; // used in the "eventID" of the DynamoDB Streams JSON serializer
+};
+
+IChangeRecordSerializer* CreateChangeRecordSerializer(const TChangeRecordSerializerOpts& opts); // Factory: returns a raw pointer to a serializer for opts.StreamFormat; the caller is expected to take ownership.
+
+}
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 7ac5240242..b42482229f 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -131,7 +131,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS
}
auto& proto = *records->Record.AddRecords();
- record.SerializeToProto(proto);
+ record.Serialize(proto);
Adjust(proto);
}
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 358a1517bc..b5df5c2783 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -1,5 +1,6 @@
#include "change_exchange.h"
#include "change_exchange_impl.h"
+#include "change_record_cdc_serializer.h"
#include "change_sender_common_ops.h"
#include "change_sender_monitoring.h"
#include "datashard_user_table.h"
@@ -7,7 +8,6 @@
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
-#include <ydb/core/protos/grpc_pq_old.pb.h>
#include <ydb/services/lib/sharding/sharding.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -86,103 +86,19 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
}
}
- static NJson::TJsonWriterConfig DefaultJsonConfig() {
- NJson::TJsonWriterConfig jsonConfig;
- jsonConfig.ValidateUtf8 = false;
- jsonConfig.WriteNanAsString = true;
- return jsonConfig;
- }
-
void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
NKikimrClient::TPersQueueRequest request;
- const auto awsJsonOpts = TChangeRecord::TAwsJsonOptions{
- .AwsRegion = Stream.AwsRegion.GetOrElse(AppData()->AwsCompatibilityConfig.GetAwsRegion()),
- .StreamMode = Stream.Mode,
- .ShardId = DataShard.TabletId,
- };
-
for (const auto& record : ev->Get()->Records) {
if (record.GetSeqNo() <= MaxSeqNo) {
continue;
}
auto& cmd = *request.MutablePartitionRequest()->AddCmdWrite();
- cmd.SetSeqNo(record.GetSeqNo());
cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId));
- cmd.SetCreateTimeMS(record.GetApproximateCreationDateTime().MilliSeconds());
cmd.SetIgnoreQuotaDeadline(true);
-
- NKikimrPQClient::TDataChunk data;
- data.SetCodec(0 /* CODEC_RAW */);
-
- switch (Stream.Format) {
- case NKikimrSchemeOp::ECdcStreamFormatProto: {
- NKikimrChangeExchange::TChangeRecord protoRecord;
- record.SerializeToProto(protoRecord);
- data.SetData(protoRecord.SerializeAsString());
- cmd.SetData(data.SerializeAsString());
- break;
- }
-
- case NKikimrSchemeOp::ECdcStreamFormatJson:
- case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson: {
- NJson::TJsonValue json;
- if (Stream.Format == NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson) {
- record.SerializeToDynamoDBStreamsJson(json, awsJsonOpts);
- } else {
- record.SerializeToYdbJson(json, Stream.VirtualTimestamps);
- }
-
- TStringStream str;
- WriteJson(&str, &json, DefaultJsonConfig());
- data.SetData(str.Str());
-
- if (record.GetKind() == TChangeRecord::EKind::CdcDataChange) {
- cmd.SetData(data.SerializeAsString());
- cmd.SetPartitionKey(record.GetPartitionKey());
- } else if (record.GetKind() == TChangeRecord::EKind::CdcHeartbeat) {
- auto& heartbeat = *cmd.MutableHeartbeat();
- heartbeat.SetStep(record.GetStep());
- heartbeat.SetTxId(record.GetTxId());
- heartbeat.SetData(data.SerializeAsString());
- } else {
- Y_FAIL_S("Unexpected cdc record"
- << ": kind# " << record.GetKind());
- }
- break;
- }
-
- case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson: {
- NJson::TJsonValue keyJson;
- NJson::TJsonValue valueJson;
- record.SerializeToDebeziumJson(keyJson, valueJson, Stream.Mode);
-
- TStringStream keyStr;
- WriteJson(&keyStr, &keyJson, DefaultJsonConfig());
-
- TStringStream valueStr;
- WriteJson(&valueStr, &valueJson, DefaultJsonConfig());
-
- // Add key in the same way as Kafka integration does
- auto messageMeta = data.AddMessageMeta();
- messageMeta->set_key("__key"); // Kafka integration stores kafka key in "__key" metadata
- messageMeta->set_value(keyStr.Str());
-
- // Add value
- data.SetData(valueStr.Str());
- cmd.SetData(data.SerializeAsString());
- cmd.SetPartitionKey(record.GetPartitionKey());
- break;
- }
-
- default: {
- LOG_E("Unknown format"
- << ": format# " << static_cast<int>(Stream.Format));
- return Leave();
- }
- }
+ Serializer->Serialize(cmd, record);
Pending.push_back(record.GetSeqNo());
}
@@ -327,8 +243,14 @@ public:
, DataShard(dataShard)
, PartitionId(partitionId)
, ShardId(shardId)
- , Stream(stream)
, SourceId(ToString(DataShard.TabletId))
+ , Serializer(CreateChangeRecordSerializer({
+ .StreamFormat = stream.Format,
+ .StreamMode = stream.Mode,
+ .AwsRegion = stream.AwsRegion.GetOrElse(AppData()->AwsCompatibilityConfig.GetAwsRegion()),
+ .VirtualTimestamps = stream.VirtualTimestamps,
+ .ShardId = DataShard.TabletId,
+ }))
{
}
@@ -350,8 +272,8 @@ private:
const TDataShardId DataShard;
const ui32 PartitionId;
const ui64 ShardId;
- const TUserTable::TCdcStream Stream;
const TString SourceId;
+ THolder<IChangeRecordSerializer> Serializer;
mutable TMaybe<TString> LogPrefix;
TActorId Writer;
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index 9c191545b6..ff0be5f912 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -22,6 +22,7 @@ SRCS(
change_exchange_split.cpp
change_record.cpp
change_record_body_serializer.cpp
+ change_record_cdc_serializer.cpp
change_sender.cpp
change_sender_async_index.cpp
change_sender_cdc_stream.cpp