diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-10-24 18:03:02 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-10-24 19:07:11 +0300 |
commit | 1c1a58e15405cbb07776a5c15c8530bd06d6f3f5 (patch) | |
tree | 6686117dac9cb3b8425ca4da20fd57d12166f0ec | |
parent | 1ed4742ff7b185c61308ebfa67df74425bec4016 (diff) | |
download | ydb-1c1a58e15405cbb07776a5c15c8530bd06d6f3f5.tar.gz |
TChangeRecord serialization iface KIKIMR-19823
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.cpp | 401 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.h | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record_cdc_serializer.cpp | 551 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record_cdc_serializer.h | 32 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 98 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ya.make | 1 |
11 files changed, 602 insertions, 501 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt index 2a8ad90687..6b925e74ad 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt @@ -166,6 +166,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_cdc_serializer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 0622573166..3fe65be82c 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -167,6 +167,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_cdc_serializer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt index 0622573166..3fe65be82c 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt @@ -167,6 +167,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_cdc_serializer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt index 4a4986e6f8..c19b8e2985 100644 --- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt @@ -167,6 +167,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_body_serializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_record_cdc_serializer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index 6f2ed24d6f..dafa956856 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -1,22 +1,10 @@ #include "change_record.h" -#include "export_common.h" - -#include <library/cpp/digest/md5/md5.h> -#include <library/cpp/json/json_reader.h> -#include <library/cpp/json/json_writer.h> -#include <library/cpp/json/yson/json2yson.h> -#include <library/cpp/string_utils/base64/base64.h> #include <ydb/core/protos/change_exchange.pb.h> -#include <ydb/library/yverify_stream/yverify_stream.h> -#include <ydb/library/binary_json/read.h> - -#include <util/stream/str.h> -#include <util/string/printf.h> namespace NKikimr::NDataShard { -void TChangeRecord::SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const { +void TChangeRecord::Serialize(NKikimrChangeExchange::TChangeRecord& record) const { record.SetOrder(Order); record.SetGroup(Group); record.SetStep(Step); @@ -45,366 +33,6 @@ static auto ParseBody(const TString& protoBody) { return body; } -static NJson::TJsonValue StringToJson(TStringBuf in) { - NJson::TJsonValue result; - Y_ABORT_UNLESS(NJson::ReadJsonTree(in, &result)); - return result; -} - -static NJson::TJsonValue YsonToJson(TStringBuf in) { - NJson::TJsonValue result; - Y_ABORT_UNLESS(NJson2Yson::DeserializeYsonAsJsonValue(in, &result)); - return result; -} - -static NJson::TJsonValue ToJson(const TCell& cell, NScheme::TTypeInfo type) { - if (cell.IsNull()) { - return NJson::TJsonValue(NJson::JSON_NULL); - } - - switch (type.GetTypeId()) { - case NScheme::NTypeIds::Bool: - return NJson::TJsonValue(cell.AsValue<bool>()); - case NScheme::NTypeIds::Int8: - return NJson::TJsonValue(cell.AsValue<i8>()); - case NScheme::NTypeIds::Uint8: - return NJson::TJsonValue(cell.AsValue<ui8>()); - case NScheme::NTypeIds::Int16: - return NJson::TJsonValue(cell.AsValue<i16>()); - case NScheme::NTypeIds::Uint16: - return NJson::TJsonValue(cell.AsValue<ui16>()); - case NScheme::NTypeIds::Int32: - return NJson::TJsonValue(cell.AsValue<i32>()); - case NScheme::NTypeIds::Uint32: - return NJson::TJsonValue(cell.AsValue<ui32>()); - case NScheme::NTypeIds::Int64: - return NJson::TJsonValue(cell.AsValue<i64>()); - case NScheme::NTypeIds::Uint64: - return NJson::TJsonValue(cell.AsValue<ui64>()); - case NScheme::NTypeIds::Float: - return NJson::TJsonValue(cell.AsValue<float>()); - case NScheme::NTypeIds::Double: - return NJson::TJsonValue(cell.AsValue<double>()); - case NScheme::NTypeIds::Date: - return NJson::TJsonValue(TInstant::Days(cell.AsValue<ui16>()).ToString()); - case NScheme::NTypeIds::Datetime: - return NJson::TJsonValue(TInstant::Seconds(cell.AsValue<ui32>()).ToString()); - case NScheme::NTypeIds::Timestamp: - return NJson::TJsonValue(TInstant::MicroSeconds(cell.AsValue<ui64>()).ToString()); - case NScheme::NTypeIds::Interval: - return NJson::TJsonValue(cell.AsValue<i64>()); - case NScheme::NTypeIds::Decimal: - return NJson::TJsonValue(DecimalToString(cell.AsValue<std::pair<ui64, i64>>())); - case NScheme::NTypeIds::DyNumber: - return NJson::TJsonValue(DyNumberToString(cell.AsBuf())); - case NScheme::NTypeIds::String: - case NScheme::NTypeIds::String4k: - case NScheme::NTypeIds::String2m: - return NJson::TJsonValue(Base64Encode(cell.AsBuf())); - case NScheme::NTypeIds::Utf8: - return NJson::TJsonValue(cell.AsBuf()); - case NScheme::NTypeIds::Json: - return StringToJson(cell.AsBuf()); - case NScheme::NTypeIds::JsonDocument: - return StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf())); - case NScheme::NTypeIds::Yson: - return YsonToJson(cell.AsBuf()); - case NScheme::NTypeIds::Pg: - // TODO: support pg types - Y_ABORT("pg types are not supported"); - default: - Y_ABORT("Unexpected type"); - } -} - -static void SerializeJsonKey(TUserTable::TCPtr schema, NJson::TJsonValue& key, - const NKikimrChangeExchange::TDataChange::TSerializedCells& in) -{ - Y_ABORT_UNLESS(in.TagsSize() == schema->KeyColumnIds.size()); - for (size_t i = 0; i < schema->KeyColumnIds.size(); ++i) { - Y_ABORT_UNLESS(in.GetTags(i) == schema->KeyColumnIds.at(i)); - } - - TSerializedCellVec cells; - Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells)); - - Y_ABORT_UNLESS(cells.GetCells().size() == schema->KeyColumnTypes.size()); - for (size_t i = 0; i < schema->KeyColumnTypes.size(); ++i) { - const auto type = schema->KeyColumnTypes.at(i); - const auto& cell = cells.GetCells().at(i); - key.AppendValue(ToJson(cell, type)); - } -} - -static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& value, - const NKikimrChangeExchange::TDataChange::TSerializedCells& in) -{ - TSerializedCellVec cells; - Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells)); - Y_ABORT_UNLESS(in.TagsSize() == cells.GetCells().size()); - - for (ui32 i = 0; i < in.TagsSize(); ++i) { - const auto tag = in.GetTags(i); - const auto& cell = cells.GetCells().at(i); - - auto it = schema->Columns.find(tag); - Y_ABORT_UNLESS(it != schema->Columns.end()); - - const auto& column = it->second; - value.InsertValue(column.Name, ToJson(cell, column.Type)); - } -} - -static void MergeJsonMaps(NJson::TJsonValue& mergeTo, NJson::TJsonValue& mergeFrom) { - Y_ABORT_UNLESS(mergeTo.GetType() == NJson::EJsonValueType::JSON_MAP); - Y_ABORT_UNLESS(mergeFrom.GetType() == NJson::EJsonValueType::JSON_MAP); - for (const auto& entry : mergeFrom.GetMap()) { - mergeTo.InsertValue(entry.first, entry.second); - } -} - -static void SerializeVirtualTimestamp(NJson::TJsonValue& value, std::initializer_list<ui64> vt) { - for (auto v : vt) { - value.AppendValue(v); - } -} - -void TChangeRecord::SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const { - if (Kind == EKind::CdcHeartbeat) { - return SerializeVirtualTimestamp(json["resolved"], {Step, TxId}); - } - - Y_ABORT_UNLESS(Kind == EKind::CdcDataChange); - Y_ABORT_UNLESS(Schema); - - const auto body = ParseBody(Body); - SerializeJsonKey(Schema, json["key"], body.GetKey()); - - if (body.HasOldImage()) { - SerializeJsonValue(Schema, json["oldImage"], body.GetOldImage()); - } - - if (body.HasNewImage()) { - SerializeJsonValue(Schema, json["newImage"], body.GetNewImage()); - } - - const auto hasAnyImage = body.HasOldImage() || body.HasNewImage(); - switch (body.GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: - json["update"].SetType(NJson::JSON_MAP); - if (!hasAnyImage) { - SerializeJsonValue(Schema, json["update"], body.GetUpsert()); - } - break; - case NKikimrChangeExchange::TDataChange::kReset: - json["reset"].SetType(NJson::JSON_MAP); - if (!hasAnyImage) { - SerializeJsonValue(Schema, json["reset"], body.GetReset()); - } - break; - case NKikimrChangeExchange::TDataChange::kErase: - json["erase"].SetType(NJson::JSON_MAP); - break; - default: - Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); - } - - if (virtualTimestamps) { - SerializeVirtualTimestamp(json["ts"], {Step, TxId}); - } -} - -static void ExtendJson(NJson::TJsonValue& value, const NJson::TJsonValue& ext) { - Y_ABORT_UNLESS(ext.GetType() == NJson::JSON_MAP); - for (const auto& [k, v] : ext.GetMapSafe()) { - value.InsertValue(k, v); - } -} - -static void ToAttributeValues(TUserTable::TCPtr schema, NJson::TJsonValue& value, - const NKikimrChangeExchange::TDataChange::TSerializedCells& in) -{ - TSerializedCellVec cells; - Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells)); - Y_ABORT_UNLESS(in.TagsSize() == cells.GetCells().size()); - - for (ui32 i = 0; i < in.TagsSize(); ++i) { - const auto tag = in.GetTags(i); - const auto& cell = cells.GetCells().at(i); - - if (cell.IsNull()) { - continue; - } - - auto it = schema->Columns.find(tag); - Y_ABORT_UNLESS(it != schema->Columns.end()); - - const auto& column = it->second; - const auto& name = column.Name; - const auto type = column.Type.GetTypeId(); - - if (name == "__Hash" || name == "__CreatedAt") { - continue; // hidden column - } else if (name.StartsWith("__Hash_")) { - bool indexed = false; - for (const auto& [_, index] : schema->Indexes) { - Y_ABORT_UNLESS(index.KeyColumnIds.size() >= 1); - if (index.KeyColumnIds.at(0) == tag) { - indexed = true; - break; - } - } - if (indexed) { - continue; // index hash column - } - } else if (name == "__RowData") { - Y_DEBUG_ABORT_UNLESS(type == NScheme::NTypeIds::JsonDocument); - const auto rowData = StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf())); - if (rowData.GetType() == NJson::JSON_MAP) { - auto map = rowData.GetMapSafe().find("M"); - if (map != rowData.GetMapSafe().end()) { - if (map->second.GetType() == NJson::JSON_MAP) { - ExtendJson(value, map->second); - } - } - } - } - - if (type == NScheme::NTypeIds::Bool) { - value.InsertValue(name, NJson::TJsonMap({{"BOOL", cell.AsValue<bool>()}})); - } else if (type == NScheme::NTypeIds::DyNumber) { - value.InsertValue(name, NJson::TJsonMap({{"N", DyNumberToString(cell.AsBuf())}})); - } else if (type == NScheme::NTypeIds::String) { - value.InsertValue(name, NJson::TJsonMap({{"B", Base64Encode(cell.AsBuf())}})); - } else if (type == NScheme::NTypeIds::Utf8) { - value.InsertValue(name, NJson::TJsonMap({{"S", cell.AsBuf()}})); - } - } -} - -void TChangeRecord::SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const { - Y_ABORT_UNLESS(Kind == EKind::CdcDataChange); - Y_ABORT_UNLESS(Schema); - - json = NJson::TJsonMap({ - {"awsRegion", opts.AwsRegion}, - {"dynamodb", NJson::TJsonMap({ - {"ApproximateCreationDateTime", GetApproximateCreationDateTime().MilliSeconds()}, - {"SequenceNumber", Sprintf("%0*" PRIi64, 21 /* min length */, GetSeqNo())}, - })}, - {"eventID", Sprintf("%" PRIu64 "-%" PRIi64, opts.ShardId, GetSeqNo())}, - {"eventSource", "ydb:document-table"}, - {"eventVersion", "1.0"}, - }); - - auto& dynamodb = json["dynamodb"]; - const auto body = ParseBody(Body); - - bool keysOnly = false; - bool newAndOldImages = false; - switch (opts.StreamMode) { - case TUserTable::TCdcStream::EMode::ECdcStreamModeNewImage: - dynamodb["StreamViewType"] = "NEW_IMAGE"; - break; - case TUserTable::TCdcStream::EMode::ECdcStreamModeOldImage: - dynamodb["StreamViewType"] = "OLD_IMAGE"; - break; - case TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages: - dynamodb["StreamViewType"] = "NEW_AND_OLD_IMAGES"; - newAndOldImages = true; - break; - default: - dynamodb["StreamViewType"] = "KEYS_ONLY"; - keysOnly = true; - break; - } - - NJson::TJsonMap keys; - ToAttributeValues(Schema, keys, body.GetKey()); - dynamodb["Keys"] = keys; - - if (!keysOnly && body.HasOldImage()) { - ToAttributeValues(Schema, dynamodb["OldImage"], body.GetOldImage()); - ExtendJson(dynamodb["OldImage"], keys); - } - - if (!keysOnly && body.HasNewImage()) { - ToAttributeValues(Schema, dynamodb["NewImage"], body.GetNewImage()); - ExtendJson(dynamodb["NewImage"], keys); - } - - switch (body.GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: - case NKikimrChangeExchange::TDataChange::kReset: - if (newAndOldImages) { - json["eventName"] = body.HasOldImage() ? "MODIFY" : "INSERT"; - } else { - json["eventName"] = "MODIFY"; - } - break; - case NKikimrChangeExchange::TDataChange::kErase: - json["eventName"] = "REMOVE"; - break; - default: - Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); - } -} - -void TChangeRecord::SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, TUserTable::TCdcStream::EMode streamMode) const { - Y_ABORT_UNLESS(Kind == EKind::CdcDataChange); - Y_ABORT_UNLESS(Schema); - - const auto body = ParseBody(Body); - - keyJson["payload"].SetType(NJson::JSON_MAP); - SerializeJsonValue(Schema, keyJson["payload"], body.GetKey()); // Debezium expects key in the same format as values - - valueJson["payload"].SetType(NJson::JSON_MAP); - // payload.before. Optional - if (body.HasOldImage()) { - SerializeJsonValue(Schema, valueJson["payload"]["before"], body.GetOldImage()); - MergeJsonMaps(valueJson["payload"]["before"], keyJson["payload"]); // Debezium expects key included in value - } - - // payload.after. Optional - if (body.HasNewImage()) { - SerializeJsonValue(Schema, valueJson["payload"]["after"], body.GetNewImage()); - MergeJsonMaps(valueJson["payload"]["after"], keyJson["payload"]); // Debezium expects key included in value - } - - // payload.op. Mandatory - if (Source == ESource::InitialScan) { - valueJson["payload"]["op"] = "r"; // r = read - } else { - switch (body.GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: - case NKikimrChangeExchange::TDataChange::kReset: - if (streamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) { - valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // c = create - } else { - valueJson["payload"]["op"] = "u"; // u = update - } - break; - case NKikimrChangeExchange::TDataChange::kErase: - valueJson["payload"]["op"] = "d"; // d = delete - break; - default: - Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); - } - } - - // payload.source. Mandatory. - valueJson["payload"]["source"] = NJson::TJsonMap({ - {"version", "1.0.0"}, - {"connector", "ydb"}, - {"ts_ms", GetApproximateCreationDateTime().MilliSeconds()}, // payload.ts_ms has no sense - {"snapshot", Source == ESource::InitialScan}, - {"step", Step}, - {"txId", TxId}, - // TODO: db & table - }); -} - TConstArrayRef<TCell> TChangeRecord::GetKey() const { if (Key) { return *Key; @@ -436,33 +64,6 @@ i64 TChangeRecord::GetSeqNo() const { return static_cast<i64>(Order); } -TString TChangeRecord::GetPartitionKey() const { - if (PartitionKey) { - return *PartitionKey; - } - - switch (Kind) { - case EKind::CdcDataChange: { - Y_ABORT_UNLESS(Schema); - const auto body = ParseBody(Body); - - NJson::TJsonValue key; - SerializeJsonKey(Schema, key, body.GetKey()); - - PartitionKey.ConstructInPlace(MD5::Calc(WriteJson(key, false))); - break; - } - - case EKind::CdcHeartbeat: - case EKind::AsyncIndex: { - Y_ABORT("Not supported"); - } - } - - Y_ABORT_UNLESS(PartitionKey); - return *PartitionKey; -} - TInstant TChangeRecord::GetApproximateCreationDateTime() const { return GetGroup() ? TInstant::FromValue(GetGroup()) diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index abe5cb6d75..e287ee7260 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -2,8 +2,6 @@ #include "datashard_user_table.h" -#include <library/cpp/json/json_value.h> - #include <ydb/core/scheme/scheme_pathid.h> #include <ydb/core/scheme/scheme_tablecell.h> @@ -33,12 +31,6 @@ public: CdcHeartbeat, }; - struct TAwsJsonOptions { - TString AwsRegion; - NKikimrSchemeOp::ECdcStreamMode StreamMode; - ui64 ShardId; - }; - public: ui64 GetOrder() const { return Order; } ui64 GetGroup() const { return Group; } @@ -53,11 +45,9 @@ public: const TPathId& GetTableId() const { return TableId; } ui64 GetSchemaVersion() const { return SchemaVersion; } + TUserTable::TCPtr GetSchema() const { return Schema; } - void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const; - void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const; - void SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const; - void SerializeToDebeziumJson(NJson::TJsonValue& keyJson, NJson::TJsonValue& valueJson, TUserTable::TCdcStream::EMode streamMode) const; + void Serialize(NKikimrChangeExchange::TChangeRecord& record) const; TConstArrayRef<TCell> GetKey() const; i64 GetSeqNo() const; diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp new file mode 100644 index 0000000000..fb6f60833a --- /dev/null +++ b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp @@ -0,0 +1,551 @@ +#include "change_record_cdc_serializer.h" +#include "change_record.h" +#include "export_common.h" + +#include <ydb/core/protos/change_exchange.pb.h> +#include <ydb/core/protos/grpc_pq_old.pb.h> +#include <ydb/core/protos/msgbus_pq.pb.h> +#include <ydb/library/binary_json/read.h> +#include <ydb/library/yverify_stream/yverify_stream.h> + +#include <library/cpp/digest/md5/md5.h> +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_writer.h> +#include <library/cpp/json/yson/json2yson.h> +#include <library/cpp/string_utils/base64/base64.h> + +#include <util/stream/str.h> +#include <util/string/printf.h> + +namespace NKikimr::NDataShard { + +class TBaseSerializer: public IChangeRecordSerializer { + static NKikimrPQClient::TDataChunk MakeDataChunk() { + NKikimrPQClient::TDataChunk data; + data.SetCodec(0 /* CODEC_RAW */); + return data; + } + + void SerializeDataChange(TCmdWrite& cmd, const TChangeRecord& record) { + auto data = MakeDataChunk(); + FillDataChunk(data, record); + cmd.SetData(data.SerializeAsString()); + } + + void SerializeHeartbeat(TCmdWrite& cmd, const TChangeRecord& record) { + auto data = MakeDataChunk(); + FillDataChunk(data, record); + + auto& heartbeat = *cmd.MutableHeartbeat(); + heartbeat.SetStep(record.GetStep()); + heartbeat.SetTxId(record.GetTxId()); + heartbeat.SetData(data.SerializeAsString()); + } + +protected: + virtual void FillDataChunk(NKikimrPQClient::TDataChunk& chunk, const TChangeRecord& record) = 0; + +public: + explicit TBaseSerializer(const TChangeRecordSerializerOpts& opts) + : Opts(opts) + {} + + void Serialize(TCmdWrite& cmd, const TChangeRecord& record) override { + cmd.SetSeqNo(record.GetSeqNo()); + cmd.SetCreateTimeMS(record.GetApproximateCreationDateTime().MilliSeconds()); + switch (record.GetKind()) { + case TChangeRecord::EKind::CdcDataChange: + return SerializeDataChange(cmd, record); + case TChangeRecord::EKind::CdcHeartbeat: + return SerializeHeartbeat(cmd, record); + case TChangeRecord::EKind::AsyncIndex: + Y_ABORT("Unexpected"); + } + } + +protected: + const TChangeRecordSerializerOpts Opts; + +}; // TBaseSerializer + +class TProtoSerializer: public TBaseSerializer { +protected: + void FillDataChunk(NKikimrPQClient::TDataChunk& data, const TChangeRecord& record) override { + NKikimrChangeExchange::TChangeRecord proto; + record.Serialize(proto); + data.SetData(proto.SerializeAsString()); + } + +public: + using TBaseSerializer::TBaseSerializer; + +}; // TProtoSerializer + +class TJsonSerializer: public TBaseSerializer { + friend class TChangeRecord; // used in GetPartitionKey() + + static NJson::TJsonWriterConfig DefaultJsonConfig() { + NJson::TJsonWriterConfig jsonConfig; + jsonConfig.ValidateUtf8 = false; + jsonConfig.WriteNanAsString = true; + return jsonConfig; + } + +protected: + static auto ParseBody(const TString& protoBody) { + NKikimrChangeExchange::TDataChange body; + Y_ABORT_UNLESS(body.ParseFromArray(protoBody.data(), protoBody.size())); + return body; + } + + static NJson::TJsonValue StringToJson(TStringBuf in) { + NJson::TJsonValue result; + Y_ABORT_UNLESS(NJson::ReadJsonTree(in, &result)); + return result; + } + + static NJson::TJsonValue YsonToJson(TStringBuf in) { + NJson::TJsonValue result; + Y_ABORT_UNLESS(NJson2Yson::DeserializeYsonAsJsonValue(in, &result)); + return result; + } + + static NJson::TJsonValue ToJson(const TCell& cell, NScheme::TTypeInfo type) { + if (cell.IsNull()) { + return NJson::TJsonValue(NJson::JSON_NULL); + } + + switch (type.GetTypeId()) { + case NScheme::NTypeIds::Bool: + return NJson::TJsonValue(cell.AsValue<bool>()); + case NScheme::NTypeIds::Int8: + return NJson::TJsonValue(cell.AsValue<i8>()); + case NScheme::NTypeIds::Uint8: + return NJson::TJsonValue(cell.AsValue<ui8>()); + case NScheme::NTypeIds::Int16: + return NJson::TJsonValue(cell.AsValue<i16>()); + case NScheme::NTypeIds::Uint16: + return NJson::TJsonValue(cell.AsValue<ui16>()); + case NScheme::NTypeIds::Int32: + return NJson::TJsonValue(cell.AsValue<i32>()); + case NScheme::NTypeIds::Uint32: + return NJson::TJsonValue(cell.AsValue<ui32>()); + case NScheme::NTypeIds::Int64: + return NJson::TJsonValue(cell.AsValue<i64>()); + case NScheme::NTypeIds::Uint64: + return NJson::TJsonValue(cell.AsValue<ui64>()); + case NScheme::NTypeIds::Float: + return NJson::TJsonValue(cell.AsValue<float>()); + case NScheme::NTypeIds::Double: + return NJson::TJsonValue(cell.AsValue<double>()); + case NScheme::NTypeIds::Date: + return NJson::TJsonValue(TInstant::Days(cell.AsValue<ui16>()).ToString()); + case NScheme::NTypeIds::Datetime: + return NJson::TJsonValue(TInstant::Seconds(cell.AsValue<ui32>()).ToString()); + case NScheme::NTypeIds::Timestamp: + return NJson::TJsonValue(TInstant::MicroSeconds(cell.AsValue<ui64>()).ToString()); + case NScheme::NTypeIds::Interval: + return NJson::TJsonValue(cell.AsValue<i64>()); + case NScheme::NTypeIds::Decimal: + return NJson::TJsonValue(DecimalToString(cell.AsValue<std::pair<ui64, i64>>())); + case NScheme::NTypeIds::DyNumber: + return NJson::TJsonValue(DyNumberToString(cell.AsBuf())); + case NScheme::NTypeIds::String: + case NScheme::NTypeIds::String4k: + case NScheme::NTypeIds::String2m: + return NJson::TJsonValue(Base64Encode(cell.AsBuf())); + case NScheme::NTypeIds::Utf8: + return NJson::TJsonValue(cell.AsBuf()); + case NScheme::NTypeIds::Json: + return StringToJson(cell.AsBuf()); + case NScheme::NTypeIds::JsonDocument: + return StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf())); + case NScheme::NTypeIds::Yson: + return YsonToJson(cell.AsBuf()); + case NScheme::NTypeIds::Pg: + // TODO: support pg types + Y_ABORT("pg types are not supported"); + default: + Y_ABORT("Unexpected type"); + } + } + + static void SerializeJsonKey(TUserTable::TCPtr schema, NJson::TJsonValue& key, + const NKikimrChangeExchange::TDataChange::TSerializedCells& in) + { + Y_ABORT_UNLESS(in.TagsSize() == schema->KeyColumnIds.size()); + for (size_t i = 0; i < schema->KeyColumnIds.size(); ++i) { + Y_ABORT_UNLESS(in.GetTags(i) == schema->KeyColumnIds.at(i)); + } + + TSerializedCellVec cells; + Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells)); + + Y_ABORT_UNLESS(cells.GetCells().size() == schema->KeyColumnTypes.size()); + for (size_t i = 0; i < schema->KeyColumnTypes.size(); ++i) { + const auto type = schema->KeyColumnTypes.at(i); + const auto& cell = cells.GetCells().at(i); + key.AppendValue(ToJson(cell, type)); + } + } + + static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& value, + const NKikimrChangeExchange::TDataChange::TSerializedCells& in) + { + TSerializedCellVec cells; + Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells)); + Y_ABORT_UNLESS(in.TagsSize() == cells.GetCells().size()); + + for (ui32 i = 0; i < in.TagsSize(); ++i) { + const auto tag = in.GetTags(i); + const auto& cell = cells.GetCells().at(i); + + auto it = schema->Columns.find(tag); + Y_ABORT_UNLESS(it != schema->Columns.end()); + + const auto& column = it->second; + value.InsertValue(column.Name, ToJson(cell, column.Type)); + } + } + + static void ExtendJson(NJson::TJsonValue& value, const NJson::TJsonValue& ext) { + Y_ABORT_UNLESS(ext.GetType() == NJson::JSON_MAP); + for (const auto& [k, v] : ext.GetMapSafe()) { + value.InsertValue(k, v); + } + } + + static void SerializeVirtualTimestamp(NJson::TJsonValue& value, std::initializer_list<ui64> vt) { + for (auto v : vt) { + value.AppendValue(v); + } + } + + void FillDataChunk(NKikimrPQClient::TDataChunk& data, const TChangeRecord& record) override { + NJson::TJsonValue json; + SerializeToJson(json, record); + + TStringStream str; + NJson::WriteJson(&str, &json, JsonConfig); + data.SetData(str.Str()); + } + + virtual void SerializeToJson(NJson::TJsonValue& json, const TChangeRecord& record) = 0; + +public: + explicit TJsonSerializer(const TChangeRecordSerializerOpts& opts) + : TBaseSerializer(opts) + , JsonConfig(DefaultJsonConfig()) + {} + + void Serialize(TCmdWrite& cmd, const TChangeRecord& record) override { + TBaseSerializer::Serialize(cmd, record); + if (record.GetKind() == TChangeRecord::EKind::CdcDataChange) { + cmd.SetPartitionKey(record.GetPartitionKey()); + } + } + +protected: + const NJson::TJsonWriterConfig JsonConfig; + +}; // TJsonSerializer + +class TYdbJsonSerializer: public TJsonSerializer { +protected: + void SerializeToJson(NJson::TJsonValue& json, const TChangeRecord& record) override { + if (record.GetKind() == TChangeRecord::EKind::CdcHeartbeat) { + return SerializeVirtualTimestamp(json["resolved"], {record.GetStep(), record.GetTxId()}); + } + + Y_ABORT_UNLESS(record.GetKind() == TChangeRecord::EKind::CdcDataChange); + Y_ABORT_UNLESS(record.GetSchema()); + + const auto body = ParseBody(record.GetBody()); + SerializeJsonKey(record.GetSchema(), json["key"], body.GetKey()); + + if (body.HasOldImage()) { + SerializeJsonValue(record.GetSchema(), json["oldImage"], body.GetOldImage()); + } + + if (body.HasNewImage()) { + SerializeJsonValue(record.GetSchema(), json["newImage"], body.GetNewImage()); + } + + const auto hasAnyImage = body.HasOldImage() || body.HasNewImage(); + switch (body.GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: + json["update"].SetType(NJson::JSON_MAP); + if (!hasAnyImage) { + SerializeJsonValue(record.GetSchema(), json["update"], body.GetUpsert()); + } + break; + case NKikimrChangeExchange::TDataChange::kReset: + json["reset"].SetType(NJson::JSON_MAP); + if (!hasAnyImage) { + SerializeJsonValue(record.GetSchema(), json["reset"], body.GetReset()); + } + break; + case NKikimrChangeExchange::TDataChange::kErase: + json["erase"].SetType(NJson::JSON_MAP); + break; + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + } + + if (Opts.VirtualTimestamps) { + SerializeVirtualTimestamp(json["ts"], {record.GetStep(), record.GetTxId()}); + } + } + +public: + using TJsonSerializer::TJsonSerializer; + +}; // TYdbJsonSerializer + +class TDynamoDBStreamsJsonSerializer: public TJsonSerializer { + static void ToAttributeValues(TUserTable::TCPtr schema, NJson::TJsonValue& value, + const NKikimrChangeExchange::TDataChange::TSerializedCells& in) + { + TSerializedCellVec cells; + Y_ABORT_UNLESS(TSerializedCellVec::TryParse(in.GetData(), cells)); + Y_ABORT_UNLESS(in.TagsSize() == cells.GetCells().size()); + + for (ui32 i = 0; i < in.TagsSize(); ++i) { + const auto tag = in.GetTags(i); + const auto& cell = cells.GetCells().at(i); + + if (cell.IsNull()) { + continue; + } + + auto it = schema->Columns.find(tag); + Y_ABORT_UNLESS(it != schema->Columns.end()); + + const auto& column = it->second; + const auto& name = column.Name; + const auto type = column.Type.GetTypeId(); + + if (name == "__Hash" || name == "__CreatedAt") { + continue; // hidden column + } else if (name.StartsWith("__Hash_")) { + bool indexed = false; + for (const auto& [_, index] : schema->Indexes) { + Y_ABORT_UNLESS(index.KeyColumnIds.size() >= 1); + if (index.KeyColumnIds.at(0) == tag) { + indexed = true; + break; + } + } + if (indexed) { + continue; // index hash column + } + } else if (name == "__RowData") { + Y_DEBUG_ABORT_UNLESS(type == NScheme::NTypeIds::JsonDocument); + const auto rowData = StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf())); + if (rowData.GetType() == NJson::JSON_MAP) { + auto map = rowData.GetMapSafe().find("M"); + if (map != rowData.GetMapSafe().end()) { + if (map->second.GetType() == NJson::JSON_MAP) { + ExtendJson(value, map->second); + } + } + } + } + + if (type == NScheme::NTypeIds::Bool) { + value.InsertValue(name, NJson::TJsonMap({{"BOOL", cell.AsValue<bool>()}})); + } else if (type == NScheme::NTypeIds::DyNumber) { + value.InsertValue(name, NJson::TJsonMap({{"N", DyNumberToString(cell.AsBuf())}})); + } else if (type == NScheme::NTypeIds::String) { + value.InsertValue(name, NJson::TJsonMap({{"B", Base64Encode(cell.AsBuf())}})); + } else if (type == NScheme::NTypeIds::Utf8) { + value.InsertValue(name, NJson::TJsonMap({{"S", cell.AsBuf()}})); + } + } + } + +protected: + void SerializeToJson(NJson::TJsonValue& json, const TChangeRecord& record) override { + Y_ABORT_UNLESS(record.GetKind() == TChangeRecord::EKind::CdcDataChange); + Y_ABORT_UNLESS(record.GetSchema()); + + json = NJson::TJsonMap({ + {"awsRegion", Opts.AwsRegion}, + {"dynamodb", NJson::TJsonMap({ + {"ApproximateCreationDateTime", record.GetApproximateCreationDateTime().MilliSeconds()}, + {"SequenceNumber", Sprintf("%0*" PRIi64, 21 /* min length */, record.GetSeqNo())}, + })}, + {"eventID", Sprintf("%" PRIu64 "-%" PRIi64, Opts.ShardId, record.GetSeqNo())}, + {"eventSource", "ydb:document-table"}, + {"eventVersion", "1.0"}, + }); + + auto& dynamodb = json["dynamodb"]; + const auto body = ParseBody(record.GetBody()); + + bool keysOnly = false; + bool newAndOldImages = false; + switch (Opts.StreamMode) { + case TUserTable::TCdcStream::EMode::ECdcStreamModeNewImage: + dynamodb["StreamViewType"] = "NEW_IMAGE"; + break; + case TUserTable::TCdcStream::EMode::ECdcStreamModeOldImage: + dynamodb["StreamViewType"] = "OLD_IMAGE"; + break; + case TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages: + dynamodb["StreamViewType"] = "NEW_AND_OLD_IMAGES"; + newAndOldImages = true; + break; + default: + dynamodb["StreamViewType"] = "KEYS_ONLY"; + keysOnly = true; + break; + } + + NJson::TJsonMap keys; + ToAttributeValues(record.GetSchema(), keys, body.GetKey()); + dynamodb["Keys"] = keys; + + if (!keysOnly && body.HasOldImage()) { + ToAttributeValues(record.GetSchema(), dynamodb["OldImage"], body.GetOldImage()); + ExtendJson(dynamodb["OldImage"], keys); + } + + if (!keysOnly && body.HasNewImage()) { + ToAttributeValues(record.GetSchema(), dynamodb["NewImage"], body.GetNewImage()); + ExtendJson(dynamodb["NewImage"], keys); + } + + switch (body.GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: + case NKikimrChangeExchange::TDataChange::kReset: + if (newAndOldImages) { + json["eventName"] = body.HasOldImage() ? "MODIFY" : "INSERT"; + } else { + json["eventName"] = "MODIFY"; + } + break; + case NKikimrChangeExchange::TDataChange::kErase: + json["eventName"] = "REMOVE"; + break; + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + } + } + +public: + using TJsonSerializer::TJsonSerializer; + +}; // TDynamoDBStreamsJsonSerializer + +class TDebeziumJsonSerializer: public TJsonSerializer { +protected: + void SerializeToJson(NJson::TJsonValue& json, const TChangeRecord& record) override { + Y_ABORT_UNLESS(record.GetKind() == TChangeRecord::EKind::CdcDataChange); + Y_ABORT_UNLESS(record.GetSchema()); + + const auto body = ParseBody(record.GetBody()); + auto& keyJson = json["key"]; + auto& valueJson = json["value"]; + + keyJson["payload"].SetType(NJson::JSON_MAP); + SerializeJsonValue(record.GetSchema(), keyJson["payload"], body.GetKey()); + + valueJson["payload"].SetType(NJson::JSON_MAP); + + if (body.HasOldImage()) { + SerializeJsonValue(record.GetSchema(), valueJson["payload"]["before"], body.GetOldImage()); + ExtendJson(valueJson["payload"]["before"], keyJson["payload"]); + } + + if (body.HasNewImage()) { + SerializeJsonValue(record.GetSchema(), valueJson["payload"]["after"], body.GetNewImage()); + ExtendJson(valueJson["payload"]["after"], keyJson["payload"]); + } + + if (record.GetSource() == TChangeRecord::ESource::InitialScan) { + valueJson["payload"]["op"] = "r"; // r = read + } else { + switch (body.GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: + case NKikimrChangeExchange::TDataChange::kReset: + if (Opts.StreamMode == TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages) { + valueJson["payload"]["op"] = body.HasOldImage() ? "u" : "c"; // c = create + } else { + valueJson["payload"]["op"] = "u"; // u = update + } + break; + case NKikimrChangeExchange::TDataChange::kErase: + valueJson["payload"]["op"] = "d"; // d = delete + break; + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + } + } + + valueJson["payload"]["source"] = NJson::TJsonMap({ + {"version", "1.0.0"}, + {"connector", "ydb"}, + {"ts_ms", record.GetApproximateCreationDateTime().MilliSeconds()}, + {"snapshot", record.GetSource() == TChangeRecord::ESource::InitialScan}, + {"step", record.GetStep()}, + {"txId", record.GetTxId()}, + // TODO: db & table + }); + } + + void FillDataChunk(NKikimrPQClient::TDataChunk& data, const TChangeRecord& record) override { + NJson::TJsonValue json; + SerializeToJson(json, record); + { + TStringStream str; + NJson::WriteJson(&str, &json["key"], JsonConfig); + auto& messageMeta = *data.AddMessageMeta(); + messageMeta.set_key("__key"); + messageMeta.set_value(str.Str()); + } + { + TStringStream str; + NJson::WriteJson(&str, &json["value"], JsonConfig); + data.SetData(str.Str()); + } + } + +public: + using TJsonSerializer::TJsonSerializer; + +}; // TDebeziumJsonSerializer + +IChangeRecordSerializer* CreateChangeRecordSerializer(const TChangeRecordSerializerOpts& opts) { + switch (opts.StreamFormat) { + case TUserTable::TCdcStream::EFormat::ECdcStreamFormatProto: + return new TProtoSerializer(opts); + case TUserTable::TCdcStream::EFormat::ECdcStreamFormatJson: + return new TYdbJsonSerializer(opts); + case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDynamoDBStreamsJson: + return new TDynamoDBStreamsJsonSerializer(opts); + case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDebeziumJson: + return new TDebeziumJsonSerializer(opts); + default: + Y_ABORT("Unsupported format"); + } +} + +TString TChangeRecord::GetPartitionKey() const { + if (PartitionKey) { + return *PartitionKey; + } + + Y_ABORT_UNLESS(Kind == EKind::CdcDataChange); + Y_ABORT_UNLESS(Schema); + + const auto body = TJsonSerializer::ParseBody(Body); + + NJson::TJsonValue key; + TJsonSerializer::SerializeJsonKey(Schema, key, body.GetKey()); + + PartitionKey.ConstructInPlace(MD5::Calc(WriteJson(key, false))); + return *PartitionKey; +} + +} diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.h b/ydb/core/tx/datashard/change_record_cdc_serializer.h new file mode 100644 index 0000000000..e7cac6a3f2 --- /dev/null +++ b/ydb/core/tx/datashard/change_record_cdc_serializer.h @@ -0,0 +1,32 @@ +#pragma once + +#include "datashard_user_table.h" + +namespace NKikimrClient { + class TPersQueuePartitionRequest_TCmdWrite; +} + +namespace NKikimr::NDataShard { + +class TChangeRecord; + +class IChangeRecordSerializer { +protected: + using TCmdWrite = NKikimrClient::TPersQueuePartitionRequest_TCmdWrite; + +public: + virtual ~IChangeRecordSerializer() = default; + virtual void Serialize(TCmdWrite& cmd, const TChangeRecord& record) = 0; +}; + +struct TChangeRecordSerializerOpts { + TUserTable::TCdcStream::EFormat StreamFormat; + TUserTable::TCdcStream::EMode StreamMode; + TString AwsRegion; + bool VirtualTimestamps = false; + ui64 ShardId = 0; +}; + +IChangeRecordSerializer* CreateChangeRecordSerializer(const TChangeRecordSerializerOpts& opts); + +} diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 7ac5240242..b42482229f 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -131,7 +131,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS } auto& proto = *records->Record.AddRecords(); - record.SerializeToProto(proto); + record.Serialize(proto); Adjust(proto); } diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 358a1517bc..b5df5c2783 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -1,5 +1,6 @@ #include "change_exchange.h" #include "change_exchange_impl.h" +#include "change_record_cdc_serializer.h" #include "change_sender_common_ops.h" #include "change_sender_monitoring.h" #include "datashard_user_table.h" @@ -7,7 +8,6 @@ #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/persqueue/writer/writer.h> -#include <ydb/core/protos/grpc_pq_old.pb.h> #include <ydb/services/lib/sharding/sharding.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -86,103 +86,19 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti } } - static NJson::TJsonWriterConfig DefaultJsonConfig() { - NJson::TJsonWriterConfig jsonConfig; - jsonConfig.ValidateUtf8 = false; - jsonConfig.WriteNanAsString = true; - return jsonConfig; - } - void Handle(TEvChangeExchange::TEvRecords::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); NKikimrClient::TPersQueueRequest request; - const auto awsJsonOpts = TChangeRecord::TAwsJsonOptions{ - .AwsRegion = Stream.AwsRegion.GetOrElse(AppData()->AwsCompatibilityConfig.GetAwsRegion()), - .StreamMode = Stream.Mode, - .ShardId = DataShard.TabletId, - }; - for (const auto& record : ev->Get()->Records) { if (record.GetSeqNo() <= MaxSeqNo) { continue; } auto& cmd = *request.MutablePartitionRequest()->AddCmdWrite(); - cmd.SetSeqNo(record.GetSeqNo()); cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId)); - cmd.SetCreateTimeMS(record.GetApproximateCreationDateTime().MilliSeconds()); cmd.SetIgnoreQuotaDeadline(true); - - NKikimrPQClient::TDataChunk data; - data.SetCodec(0 /* CODEC_RAW */); - - switch (Stream.Format) { - case NKikimrSchemeOp::ECdcStreamFormatProto: { - NKikimrChangeExchange::TChangeRecord protoRecord; - record.SerializeToProto(protoRecord); - data.SetData(protoRecord.SerializeAsString()); - cmd.SetData(data.SerializeAsString()); - break; - } - - case NKikimrSchemeOp::ECdcStreamFormatJson: - case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson: { - NJson::TJsonValue json; - if (Stream.Format == NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson) { - record.SerializeToDynamoDBStreamsJson(json, awsJsonOpts); - } else { - record.SerializeToYdbJson(json, Stream.VirtualTimestamps); - } - - TStringStream str; - WriteJson(&str, &json, DefaultJsonConfig()); - data.SetData(str.Str()); - - if (record.GetKind() == TChangeRecord::EKind::CdcDataChange) { - cmd.SetData(data.SerializeAsString()); - cmd.SetPartitionKey(record.GetPartitionKey()); - } else if (record.GetKind() == TChangeRecord::EKind::CdcHeartbeat) { - auto& heartbeat = *cmd.MutableHeartbeat(); - heartbeat.SetStep(record.GetStep()); - heartbeat.SetTxId(record.GetTxId()); - heartbeat.SetData(data.SerializeAsString()); - } else { - Y_FAIL_S("Unexpected cdc record" - << ": kind# " << record.GetKind()); - } - break; - } - - case NKikimrSchemeOp::ECdcStreamFormatDebeziumJson: { - NJson::TJsonValue keyJson; - NJson::TJsonValue valueJson; - record.SerializeToDebeziumJson(keyJson, valueJson, Stream.Mode); - - TStringStream keyStr; - WriteJson(&keyStr, &keyJson, DefaultJsonConfig()); - - TStringStream valueStr; - WriteJson(&valueStr, &valueJson, DefaultJsonConfig()); - - // Add key in the same way as Kafka integration does - auto messageMeta = data.AddMessageMeta(); - messageMeta->set_key("__key"); // Kafka integration stores kafka key in "__key" metadata - messageMeta->set_value(keyStr.Str()); - - // Add value - data.SetData(valueStr.Str()); - cmd.SetData(data.SerializeAsString()); - cmd.SetPartitionKey(record.GetPartitionKey()); - break; - } - - default: { - LOG_E("Unknown format" - << ": format# " << static_cast<int>(Stream.Format)); - return Leave(); - } - } + Serializer->Serialize(cmd, record); Pending.push_back(record.GetSeqNo()); } @@ -327,8 +243,14 @@ public: , DataShard(dataShard) , PartitionId(partitionId) , ShardId(shardId) - , Stream(stream) , SourceId(ToString(DataShard.TabletId)) + , Serializer(CreateChangeRecordSerializer({ + .StreamFormat = stream.Format, + .StreamMode = stream.Mode, + .AwsRegion = stream.AwsRegion.GetOrElse(AppData()->AwsCompatibilityConfig.GetAwsRegion()), + .VirtualTimestamps = stream.VirtualTimestamps, + .ShardId = DataShard.TabletId, + })) { } @@ -350,8 +272,8 @@ private: const TDataShardId DataShard; const ui32 PartitionId; const ui64 ShardId; - const TUserTable::TCdcStream Stream; const TString SourceId; + THolder<IChangeRecordSerializer> Serializer; mutable TMaybe<TString> LogPrefix; TActorId Writer; diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 9c191545b6..ff0be5f912 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -22,6 +22,7 @@ SRCS( change_exchange_split.cpp change_record.cpp change_record_body_serializer.cpp + change_record_cdc_serializer.cpp change_sender.cpp change_sender_async_index.cpp change_sender_cdc_stream.cpp |