diff options
author | ilnaz <[email protected]> | 2023-05-12 23:14:49 +0300 |
---|---|---|
committer | ilnaz <[email protected]> | 2023-05-12 23:14:49 +0300 |
commit | 44241885851a00105e467af090e06be035ed015e (patch) | |
tree | 1c99a42d6e1dbec3bb5e23a207844ded14ac1c49 | |
parent | 4c8d0ad1db6cd949eba0a9ce79b36ee1348f65f8 (diff) |
Document table cdc records
19 files changed, 560 insertions, 104 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 895d6040db1..6f2009058cc 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -715,6 +715,7 @@ enum ECdcStreamFormat { ECdcStreamFormatInvalid = 0; ECdcStreamFormatProto = 1; ECdcStreamFormatJson = 2; + ECdcStreamFormatDocApiJson = 3; } message TCdcStreamDescription { @@ -726,6 +727,8 @@ message TCdcStreamDescription { optional ECdcStreamState State = 4; optional uint64 SchemaVersion = 5; repeated TUserAttribute UserAttributes = 8; + // AwsRegion used to mark records in DynamoDB-compatible mode (FormatDocApiJson) + optional string AwsRegion = 9; } message TCreateCdcStream { diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index cba5084194c..14b20389468 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -12,10 +12,11 @@ #include <ydb/library/binary_json/read.h> #include <util/stream/str.h> +#include <util/string/printf.h> namespace NKikimr::NDataShard { -void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const { +void TChangeRecord::SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const { record.SetOrder(Order); record.SetGroup(Group); record.SetStep(Step); @@ -35,6 +36,12 @@ void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) co } } +static auto ParseBody(const TString& protoBody) { + NKikimrChangeExchange::TChangeRecord::TDataChange body; + Y_VERIFY(body.ParseFromArray(protoBody.data(), protoBody.size())); + return body; +} + static NJson::TJsonValue StringToJson(TStringBuf in) { NJson::TJsonValue result; Y_VERIFY(NJson::ReadJsonTree(in, &result)); @@ -145,60 +152,185 @@ static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& valu } } -void TChangeRecord::SerializeTo(NJson::TJsonValue& json, bool virtualTimestamps) const { - switch (Kind) { - case EKind::CdcDataChange: { - Y_VERIFY(Schema); +void TChangeRecord::SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const { + Y_VERIFY(Kind == EKind::CdcDataChange); + Y_VERIFY(Schema); - NKikimrChangeExchange::TChangeRecord::TDataChange body; - Y_VERIFY(body.ParseFromArray(Body.data(), Body.size())); + const auto body = ParseBody(Body); + SerializeJsonKey(Schema, json["key"], body.GetKey()); - SerializeJsonKey(Schema, json["key"], body.GetKey()); + if (body.HasOldImage()) { + SerializeJsonValue(Schema, json["oldImage"], body.GetOldImage()); + } - if (body.HasOldImage()) { - SerializeJsonValue(Schema, json["oldImage"], body.GetOldImage()); - } + if (body.HasNewImage()) { + SerializeJsonValue(Schema, json["newImage"], body.GetNewImage()); + } - if (body.HasNewImage()) { - SerializeJsonValue(Schema, json["newImage"], body.GetNewImage()); + const auto hasAnyImage = body.HasOldImage() || body.HasNewImage(); + switch (body.GetRowOperationCase()) { + case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert: + json["update"].SetType(NJson::JSON_MAP); + if (!hasAnyImage) { + SerializeJsonValue(Schema, json["update"], body.GetUpsert()); } + break; + case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset: + json["reset"].SetType(NJson::JSON_MAP); + if (!hasAnyImage) { + SerializeJsonValue(Schema, json["reset"], body.GetReset()); + } + break; + case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase: + json["erase"].SetType(NJson::JSON_MAP); + break; + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + } - const auto hasAnyImage = body.HasOldImage() || body.HasNewImage(); - switch (body.GetRowOperationCase()) { - case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert: - json["update"].SetType(NJson::EJsonValueType::JSON_MAP); - if (!hasAnyImage) { - SerializeJsonValue(Schema, json["update"], body.GetUpsert()); - } - break; - case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset: - json["reset"].SetType(NJson::EJsonValueType::JSON_MAP); - if (!hasAnyImage) { - SerializeJsonValue(Schema, json["reset"], body.GetReset()); - } - break; - case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase: - json["erase"].SetType(NJson::EJsonValueType::JSON_MAP); + if (virtualTimestamps) { + for (auto v : {Step, TxId}) { + json["ts"].AppendValue(v); + } + } +} + +static void ExtendJson(NJson::TJsonValue& value, const NJson::TJsonValue& ext) { + Y_VERIFY(ext.GetType() == NJson::JSON_MAP); + for (const auto& [k, v] : ext.GetMapSafe()) { + value.InsertValue(k, v); + } +} + +static void ToAttributeValues(TUserTable::TCPtr schema, NJson::TJsonValue& value, + const NKikimrChangeExchange::TChangeRecord::TDataChange::TSerializedCells& in) +{ + TSerializedCellVec cells; + Y_VERIFY(TSerializedCellVec::TryParse(in.GetData(), cells)); + Y_VERIFY(in.TagsSize() == cells.GetCells().size()); + + for (ui32 i = 0; i < in.TagsSize(); ++i) { + const auto tag = in.GetTags(i); + const auto& cell = cells.GetCells().at(i); + + if (cell.IsNull()) { + continue; + } + + auto it = schema->Columns.find(tag); + Y_VERIFY(it != schema->Columns.end()); + + const auto& column = it->second; + const auto& name = column.Name; + const auto type = column.Type.GetTypeId(); + + if (name == "__Hash" || name == "__CreatedAt") { + continue; // hidden column + } else if (name.StartsWith("__Hash_")) { + bool indexed = false; + for (const auto& [_, index] : schema->Indexes) { + Y_VERIFY(index.KeyColumnIds.size() >= 1); + if (index.KeyColumnIds.at(0) == tag) { + indexed = true; break; - default: - Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + } } - - if (virtualTimestamps) { - for (auto v : {Step, TxId}) { - json["ts"].AppendValue(v); + if (indexed) { + continue; // index hash column + } + } else if (name == "__RowData") { + Y_VERIFY_DEBUG(type == NScheme::NTypeIds::JsonDocument); + const auto rowData = StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf())); + if (rowData.GetType() == NJson::JSON_MAP) { + auto map = rowData.GetMapSafe().find("M"); + if (map != rowData.GetMapSafe().end()) { + if (map->second.GetType() == NJson::JSON_MAP) { + ExtendJson(value, map->second); + } } } - - break; } - case EKind::AsyncIndex: { - Y_FAIL("Not supported"); + if (type == NScheme::NTypeIds::Bool) { + value.InsertValue(name, NJson::TJsonMap({{"BOOL", cell.AsValue<bool>()}})); + } else if (type == NScheme::NTypeIds::DyNumber) { + value.InsertValue(name, NJson::TJsonMap({{"N", DyNumberToString(cell.AsBuf())}})); + } else if (type == NScheme::NTypeIds::String) { + value.InsertValue(name, NJson::TJsonMap({{"B", Base64Encode(cell.AsBuf())}})); + } else if (type == NScheme::NTypeIds::Utf8) { + value.InsertValue(name, NJson::TJsonMap({{"S", cell.AsBuf()}})); } } } +void TChangeRecord::SerializeToDocApiJson(NJson::TJsonValue& json, const TDocApiJsonOptions& opts) const { + Y_VERIFY(Kind == EKind::CdcDataChange); + Y_VERIFY(Schema); + + json = NJson::TJsonMap({ + {"awsRegion", opts.AwsRegion}, + {"dynamodb", NJson::TJsonMap({ + {"ApproximateCreationDateTime", GetApproximateCreationDateTime().MilliSeconds()}, + {"SequenceNumber", Sprintf("%0*" PRIi64, 21 /* min length */, GetSeqNo())}, + })}, + {"eventID", Sprintf("%" PRIu64 "-%" PRIi64, opts.ShardId, GetSeqNo())}, + {"eventSource", "ydb:document-table"}, + {"eventVersion", "1.0"}, + }); + + auto& dynamodb = json["dynamodb"]; + const auto body = ParseBody(Body); + + bool keysOnly = false; + bool newAndOldImages = false; + switch (opts.StreamMode) { + case TUserTable::TCdcStream::EMode::ECdcStreamModeNewImage: + dynamodb["StreamViewType"] = "NEW_IMAGE"; + break; + case TUserTable::TCdcStream::EMode::ECdcStreamModeOldImage: + dynamodb["StreamViewType"] = "OLD_IMAGE"; + break; + case TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages: + dynamodb["StreamViewType"] = "NEW_AND_OLD_IMAGES"; + newAndOldImages = true; + break; + default: + dynamodb["StreamViewType"] = "KEYS_ONLY"; + keysOnly = true; + break; + } + + NJson::TJsonMap keys; + ToAttributeValues(Schema, keys, body.GetKey()); + dynamodb["Keys"] = keys; + + if (!keysOnly && body.HasOldImage()) { + ToAttributeValues(Schema, dynamodb["OldImage"], body.GetOldImage()); + ExtendJson(dynamodb["OldImage"], keys); + } + + if (!keysOnly && body.HasNewImage()) { + ToAttributeValues(Schema, dynamodb["NewImage"], body.GetNewImage()); + ExtendJson(dynamodb["NewImage"], keys); + } + + switch (body.GetRowOperationCase()) { + case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert: + case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset: + if (newAndOldImages) { + json["eventName"] = body.HasOldImage() ? "MODIFY" : "INSERT"; + } else { + json["eventName"] = "MODIFY"; + } + break; + case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase: + json["eventName"] = "REMOVE"; + break; + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + } +} + TConstArrayRef<TCell> TChangeRecord::GetKey() const { if (Key) { return *Key; @@ -207,8 +339,7 @@ TConstArrayRef<TCell> TChangeRecord::GetKey() const { switch (Kind) { case EKind::AsyncIndex: case EKind::CdcDataChange: { - NKikimrChangeExchange::TChangeRecord::TDataChange parsed; - Y_VERIFY(parsed.ParseFromArray(Body.data(), Body.size())); + const auto parsed = ParseBody(Body); TSerializedCellVec key; Y_VERIFY(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key)); @@ -235,9 +366,7 @@ TString TChangeRecord::GetPartitionKey() const { switch (Kind) { case EKind::CdcDataChange: { Y_VERIFY(Schema); - - NKikimrChangeExchange::TChangeRecord::TDataChange body; - Y_VERIFY(body.ParseFromArray(Body.data(), Body.size())); + const auto body = ParseBody(Body); NJson::TJsonValue key; SerializeJsonKey(Schema, key, body.GetKey()); @@ -255,6 +384,12 @@ TString TChangeRecord::GetPartitionKey() const { return *PartitionKey; } +TInstant TChangeRecord::GetApproximateCreationDateTime() const { + return GetGroup() + ? TInstant::FromValue(GetGroup()) + : TInstant::MilliSeconds(GetStep()); +} + TString TChangeRecord::ToString() const { TString result; TStringOutput out(result); diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index decfa6dce6d..4668e2717c4 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -27,6 +27,12 @@ public: CdcDataChange, }; + struct TDocApiJsonOptions { + TString AwsRegion; + NKikimrSchemeOp::ECdcStreamMode StreamMode; + ui64 ShardId; + }; + public: ui64 GetOrder() const { return Order; } ui64 GetGroup() const { return Group; } @@ -41,12 +47,14 @@ public: const TPathId& GetTableId() const { return TableId; } ui64 GetSchemaVersion() const { return SchemaVersion; } - void SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const; - void SerializeTo(NJson::TJsonValue& json, bool virtualTimestamps) const; + void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const; + void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const; + void SerializeToDocApiJson(NJson::TJsonValue& json, const TDocApiJsonOptions& opts) const; TConstArrayRef<TCell> GetKey() const; i64 GetSeqNo() const; TString GetPartitionKey() const; + TInstant GetApproximateCreationDateTime() const; TString ToString() const; void Out(IOutputStream& out) const; diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 402a84e8495..99a25a8b593 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -118,7 +118,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS } auto& proto = *records->Record.AddRecords(); - record.SerializeTo(proto); + record.SerializeToProto(proto); Adjust(proto); } diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index f488f7b65e1..e1df72d2733 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -87,14 +87,10 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti continue; } - const auto createdAt = record.GetGroup() - ? TInstant::FromValue(record.GetGroup()) - : TInstant::MilliSeconds(record.GetStep()); - auto& cmd = *request.MutablePartitionRequest()->AddCmdWrite(); cmd.SetSeqNo(record.GetSeqNo()); cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId)); - cmd.SetCreateTimeMS(createdAt.MilliSeconds()); + cmd.SetCreateTimeMS(record.GetApproximateCreationDateTime().MilliSeconds()); cmd.SetIgnoreQuotaDeadline(true); NKikimrPQClient::TDataChunk data; @@ -103,14 +99,23 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti switch (Stream.Format) { case NKikimrSchemeOp::ECdcStreamFormatProto: { NKikimrChangeExchange::TChangeRecord protoRecord; - record.SerializeTo(protoRecord); + record.SerializeToProto(protoRecord); data.SetData(protoRecord.SerializeAsString()); break; } - case NKikimrSchemeOp::ECdcStreamFormatJson: { + case NKikimrSchemeOp::ECdcStreamFormatJson: + case NKikimrSchemeOp::ECdcStreamFormatDocApiJson: { NJson::TJsonValue json; - record.SerializeTo(json, Stream.VirtualTimestamps); + if (Stream.Format == NKikimrSchemeOp::ECdcStreamFormatDocApiJson) { + record.SerializeToDocApiJson(json, TChangeRecord::TDocApiJsonOptions{ + .AwsRegion = Stream.AwsRegion, + .StreamMode = Stream.Mode, + .ShardId = DataShard.TabletId, + }); + } else { + record.SerializeToYdbJson(json, Stream.VirtualTimestamps); + } TStringStream str; NJson::TJsonWriterConfig jsonConfig; @@ -691,7 +696,8 @@ class TCdcChangeSenderMain return it->PartitionId; } - case NKikimrSchemeOp::ECdcStreamFormatJson: { + case NKikimrSchemeOp::ECdcStreamFormatJson: + case NKikimrSchemeOp::ECdcStreamFormatDocApiJson: { using namespace NKikimr::NDataStreams::V1; const auto hashKey = HexBytesToDecimal(record.GetPartitionKey() /* MD5 */); return ShardFromDecimal(hashKey, KeyDesc->Partitions.size()); diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index 217f079987f..d92ebe478dd 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp +++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -111,6 +111,16 @@ bool TUserTable::HasAsyncIndexes() const { return AsyncIndexCount > 0; } +static bool IsJsonCdcStream(TUserTable::TCdcStream::EFormat format) { + switch (format) { + case TUserTable::TCdcStream::EFormat::ECdcStreamFormatJson: + case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDocApiJson: + return true; + default: + return false; + } +} + void TUserTable::AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& streamDesc) { Y_VERIFY(streamDesc.HasPathId()); const auto streamPathId = PathIdFromPathId(streamDesc.GetPathId()); @@ -120,7 +130,7 @@ void TUserTable::AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& stre } CdcStreams.emplace(streamPathId, TCdcStream(streamDesc)); - JsonCdcStreamCount += ui32(streamDesc.GetFormat() == TCdcStream::EFormat::ECdcStreamFormatJson); + JsonCdcStreamCount += ui32(IsJsonCdcStream(streamDesc.GetFormat())); NKikimrSchemeOp::TTableDescription schema; GetSchema(schema); @@ -160,7 +170,7 @@ void TUserTable::DropCdcStream(const TPathId& streamPathId) { return; } - JsonCdcStreamCount -= ui32(it->second.Format == TCdcStream::EFormat::ECdcStreamFormatJson); + JsonCdcStreamCount -= ui32(IsJsonCdcStream(it->second.Format)); CdcStreams.erase(it); NKikimrSchemeOp::TTableDescription schema; @@ -289,7 +299,7 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr) for (const auto& streamDesc : descr.GetCdcStreams()) { Y_VERIFY(streamDesc.HasPathId()); CdcStreams.emplace(PathIdFromPathId(streamDesc.GetPathId()), TCdcStream(streamDesc)); - JsonCdcStreamCount += ui32(streamDesc.GetFormat() == TCdcStream::EFormat::ECdcStreamFormatJson); + JsonCdcStreamCount += ui32(IsJsonCdcStream(streamDesc.GetFormat())); } } diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index c1b31207525..631e5209255 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -295,6 +295,7 @@ struct TUserTable : public TThrRefBase { EFormat Format; EState State; bool VirtualTimestamps = false; + TString AwsRegion; TCdcStream() = default; @@ -304,6 +305,7 @@ struct TUserTable : public TThrRefBase { , Format(streamDesc.GetFormat()) , State(streamDesc.GetState()) , VirtualTimestamps(streamDesc.GetVirtualTimestamps()) + , AwsRegion(streamDesc.GetAwsRegion()) { } }; diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 6be8efdac38..541038f4e57 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -2,6 +2,7 @@ #include <library/cpp/digest/md5/md5.h> #include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_writer.h> #include <ydb/core/base/path.h> #include <ydb/core/persqueue/events/global.h> @@ -839,9 +840,51 @@ Y_UNIT_TEST_SUITE(Cdc) { return MD5::Calc(root.at("key").GetStringRobust()); } + static bool AreJsonsEqual(const TString& actual, const TString& expected) { + NJson::TJsonValue actualJson; + UNIT_ASSERT(NJson::ReadJsonTree(actual, &actualJson)); + NJson::TJsonValue expectedJson; + UNIT_ASSERT(NJson::ReadJsonTree(expected, &expectedJson)); + + class TScanner: public NJson::IScanCallback { + NJson::TJsonValue& Actual; + bool Success = true; + + public: + explicit TScanner(NJson::TJsonValue& actual) + : Actual(actual) + {} + + bool Do(const TString& path, NJson::TJsonValue*, NJson::TJsonValue& expectedValue) override { + if (expectedValue.GetStringRobust() != "***") { + return true; + } + + NJson::TJsonValue actualValue; + if (!Actual.GetValueByPath(path, actualValue)) { + Success = false; + return false; + } + + expectedValue = actualValue; + return true; + } + + bool IsSuccess() const { + return Success; + } + }; + + TScanner scanner(actualJson); + expectedJson.Scan(scanner); + + UNIT_ASSERT(scanner.IsSuccess()); + return actualJson == expectedJson; + } + struct PqRunner { static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, - const TVector<TString>& queries, const TVector<TString>& records, bool strict = true) + const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true) { TTestPqEnv env(tableDesc, streamDesc); @@ -875,11 +918,9 @@ Y_UNIT_TEST_SUITE(Cdc) { pStream = data->GetPartitionStream(); for (const auto& item : data->GetMessages()) { const auto& record = records.at(reads++); - if (strict) { - UNIT_ASSERT_VALUES_EQUAL(item.GetData(), record); + UNIT_ASSERT(AreJsonsEqual(item.GetData(), record)); + if (checkKey) { UNIT_ASSERT_VALUES_EQUAL(item.GetPartitionKey(), CalcPartitionKey(record)); - } else { - UNIT_ASSERT_STRING_CONTAINS(item.GetData(), record); } } } else if (auto* create = std::get_if<TReadSessionEvent::TCreatePartitionStreamEvent>(&*ev)) { @@ -930,7 +971,7 @@ Y_UNIT_TEST_SUITE(Cdc) { struct YdsRunner { static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, - const TVector<TString>& queries, const TVector<TString>& records, bool strict = true) + const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true) { TTestYdsEnv env(tableDesc, streamDesc); @@ -981,11 +1022,9 @@ Y_UNIT_TEST_SUITE(Cdc) { for (ui32 i = 0; i < records.size(); ++i) { const auto& actual = res.GetResult().records().at(i); const auto& expected = records.at(i); - if (strict) { - UNIT_ASSERT_VALUES_EQUAL(actual.data(), expected); + UNIT_ASSERT(AreJsonsEqual(actual.data(), expected)); + if (checkKey) { UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), CalcPartitionKey(expected)); - } else { - UNIT_ASSERT_STRING_CONTAINS(actual.data(), expected); } } } @@ -1014,7 +1053,7 @@ Y_UNIT_TEST_SUITE(Cdc) { struct TopicRunner { static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, - const TVector<TString>& queries, const TVector<TString>& records, bool strict = true) + const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true) { TTestTopicEnv env(tableDesc, streamDesc); @@ -1047,12 +1086,8 @@ Y_UNIT_TEST_SUITE(Cdc) { pStream = data->GetPartitionSession(); for (const auto& item : data->GetMessages()) { const auto& record = records.at(reads++); - if (strict) { - UNIT_ASSERT_VALUES_EQUAL(item.GetData(), record); - // TODO: check here partition key - } else { - UNIT_ASSERT_STRING_CONTAINS(item.GetData(), record); - } + UNIT_ASSERT(AreJsonsEqual(item.GetData(), record)); + Y_UNUSED(checkKey); } } else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) { pStream = create->GetPartitionSession(); @@ -1175,10 +1210,135 @@ Y_UNIT_TEST_SUITE(Cdc) { (2, 20), (3, 30); )"}, { - R"({"update":{},"key":[1],"ts":[)", - R"({"update":{},"key":[2],"ts":[)", - R"({"update":{},"key":[3],"ts":[)", - }, false /* non-strict because of variadic timestamps */); + R"({"update":{},"key":[1],"ts":"***"})", + R"({"update":{},"key":[2],"ts":"***"})", + R"({"update":{},"key":[3],"ts":"***"})", + }); + } + + TShardedTableOptions DocApiTable() { + return TShardedTableOptions() + .Columns({ + {"__Hash", "Uint64", true, false}, + {"id_shard", "Utf8", true, false}, + {"id_sort", "Utf8", true, false}, + {"__RowData", "JsonDocument", false, false}, + {"extra", "Bool", false, false}, + }); + } + + Y_UNIT_TEST_TRIPLET(DocApi, PqRunner, YdsRunner, TopicRunner) { + TRunner::Read(DocApiTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDocApiJson), {R"( + UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES ( + 1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}') + ); + )"}, { + WriteJson(NJson::TJsonMap({ + {"awsRegion", ""}, + {"dynamodb", NJson::TJsonMap({ + {"ApproximateCreationDateTime", "***"}, + {"Keys", NJson::TJsonMap({ + {"id_shard", NJson::TJsonMap({{"S", "10"}})}, + {"id_sort", NJson::TJsonMap({{"S", "100"}})}, + })}, + {"SequenceNumber", "000000000000000000001"}, + {"StreamViewType", "KEYS_ONLY"}, + })}, + {"eventID", "***"}, + {"eventName", "MODIFY"}, + {"eventSource", "ydb:document-table"}, + {"eventVersion", "1.0"}, + }), false), + }, false /* do not check key */); + + TRunner::Read(DocApiTable(), NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDocApiJson), {R"( + UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData, extra) VALUES ( + 1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}'), true + ); + )", R"( + UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData, extra) VALUES ( + 1, "10", "100", JsonDocument('{"M":{"color":{"S":"yellow"},"weight":{"N":"5.4"}}}'), false + ); + )", R"( + DELETE FROM `/Root/Table` WHERE __Hash = 1; + )"}, { + WriteJson(NJson::TJsonMap({ + {"awsRegion", ""}, + {"dynamodb", NJson::TJsonMap({ + {"ApproximateCreationDateTime", "***"}, + {"Keys", NJson::TJsonMap({ + {"id_shard", NJson::TJsonMap({{"S", "10"}})}, + {"id_sort", NJson::TJsonMap({{"S", "100"}})}, + })}, + {"NewImage", NJson::TJsonMap({ + {"id_shard", NJson::TJsonMap({{"S", "10"}})}, + {"id_sort", NJson::TJsonMap({{"S", "100"}})}, + {"color", NJson::TJsonMap({{"S", "pink"}})}, + {"weight", NJson::TJsonMap({{"N", "4.5"}})}, + {"extra", NJson::TJsonMap({{"BOOL", true}})}, + })}, + {"SequenceNumber", "000000000000000000001"}, + {"StreamViewType", "NEW_AND_OLD_IMAGES"}, + })}, + {"eventID", "***"}, + {"eventName", "INSERT"}, + {"eventSource", "ydb:document-table"}, + {"eventVersion", "1.0"}, + }), false), + WriteJson(NJson::TJsonMap({ + {"awsRegion", ""}, + {"dynamodb", NJson::TJsonMap({ + {"ApproximateCreationDateTime", "***"}, + {"Keys", NJson::TJsonMap({ + {"id_shard", NJson::TJsonMap({{"S", "10"}})}, + {"id_sort", NJson::TJsonMap({{"S", "100"}})}, + })}, + {"OldImage", NJson::TJsonMap({ + {"id_shard", NJson::TJsonMap({{"S", "10"}})}, + {"id_sort", NJson::TJsonMap({{"S", "100"}})}, + {"color", NJson::TJsonMap({{"S", "pink"}})}, + {"weight", NJson::TJsonMap({{"N", "4.5"}})}, + {"extra", NJson::TJsonMap({{"BOOL", true}})}, + })}, + {"NewImage", NJson::TJsonMap({ + {"id_shard", NJson::TJsonMap({{"S", "10"}})}, + {"id_sort", NJson::TJsonMap({{"S", "100"}})}, + {"color", NJson::TJsonMap({{"S", "yellow"}})}, + {"weight", NJson::TJsonMap({{"N", "5.4"}})}, + {"extra", NJson::TJsonMap({{"BOOL", false}})}, + })}, + {"SequenceNumber", "000000000000000000002"}, + {"StreamViewType", "NEW_AND_OLD_IMAGES"}, + })}, + {"eventID", "***"}, + {"eventName", "MODIFY"}, + {"eventSource", "ydb:document-table"}, + {"eventVersion", "1.0"}, + }), false), + WriteJson(NJson::TJsonMap({ + {"awsRegion", ""}, + {"dynamodb", NJson::TJsonMap({ + {"ApproximateCreationDateTime", "***"}, + {"Keys", NJson::TJsonMap({ + {"id_shard", NJson::TJsonMap({{"S", "10"}})}, + {"id_sort", NJson::TJsonMap({{"S", "100"}})}, + })}, + {"OldImage", NJson::TJsonMap({ + {"id_shard", NJson::TJsonMap({{"S", "10"}})}, + {"id_sort", NJson::TJsonMap({{"S", "100"}})}, + {"color", NJson::TJsonMap({{"S", "yellow"}})}, + {"weight", NJson::TJsonMap({{"N", "5.4"}})}, + {"extra", NJson::TJsonMap({{"BOOL", false}})}, + })}, + {"SequenceNumber", "000000000000000000003"}, + {"StreamViewType", "NEW_AND_OLD_IMAGES"}, + })}, + {"eventID", "***"}, + {"eventName", "REMOVE"}, + {"eventSource", "ydb:document-table"}, + {"eventVersion", "1.0"}, + }), false), + }, false /* do not check key */); } Y_UNIT_TEST_TRIPLET(NaN, PqRunner, YdsRunner, TopicRunner) { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index ae94c7e39dc..065cb8be550 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -2804,6 +2804,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { auto mode = rowset.GetValue<Schema::CdcStream::Mode>(); auto format = rowset.GetValue<Schema::CdcStream::Format>(); auto vt = rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false); + auto awsRegion = rowset.GetValue<Schema::CdcStream::AwsRegion>(); auto state = rowset.GetValue<Schema::CdcStream::State>(); Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); @@ -2814,7 +2815,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { << ", path type: " << NKikimrSchemeOp::EPathType_Name(path->PathType)); Y_VERIFY(!Self->CdcStreams.contains(pathId)); - Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, state); + Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, awsRegion, state); Self->IncrementPathDbRefCount(pathId); if (state == NKikimrSchemeOp::ECdcStreamStateScan) { @@ -2847,6 +2848,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { auto mode = rowset.GetValue<Schema::CdcStreamAlterData::Mode>(); auto format = rowset.GetValue<Schema::CdcStreamAlterData::Format>(); auto vt = rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false); + auto awsRegion = rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>(); auto state = rowset.GetValue<Schema::CdcStreamAlterData::State>(); Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); @@ -2858,14 +2860,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { if (!Self->CdcStreams.contains(pathId)) { Y_VERIFY(alterVersion == 1); - Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt); + Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt, awsRegion); Self->IncrementPathDbRefCount(pathId); } auto stream = Self->CdcStreams.at(pathId); Y_VERIFY(stream->AlterData == nullptr); Y_VERIFY(stream->AlterVersion < alterVersion); - stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, state); + stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, awsRegion, state); Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found" << ", cdc stream pathId: " << pathId diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 4fd6b490db0..2370f7a703f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -174,11 +174,17 @@ public: switch (streamDesc.GetMode()) { case NKikimrSchemeOp::ECdcStreamModeKeysOnly: - case NKikimrSchemeOp::ECdcStreamModeUpdate: case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: break; + case NKikimrSchemeOp::ECdcStreamModeUpdate: + if (streamDesc.GetFormat() == NKikimrSchemeOp::ECdcStreamFormatDocApiJson) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "DocApiJson format incompatible with specified stream mode"); + return result; + } + break; default: result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() << "Invalid stream mode: " << static_cast<ui32>(streamDesc.GetMode())); @@ -188,6 +194,13 @@ public: switch (streamDesc.GetFormat()) { case NKikimrSchemeOp::ECdcStreamFormatProto: case NKikimrSchemeOp::ECdcStreamFormatJson: + if (!streamDesc.GetAwsRegion().empty()) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "AwsRegion option incompatible with specified stream format"); + return result; + } + break; + case NKikimrSchemeOp::ECdcStreamFormatDocApiJson: break; default: result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 8787e0d877d..675394eed80 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1546,6 +1546,7 @@ void TSchemeShard::PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId) NIceDb::TUpdate<Schema::CdcStream::Mode>(alterData->Mode), NIceDb::TUpdate<Schema::CdcStream::Format>(alterData->Format), NIceDb::TUpdate<Schema::CdcStream::VirtualTimestamps>(alterData->VirtualTimestamps), + NIceDb::TUpdate<Schema::CdcStream::AwsRegion>(alterData->AwsRegion), NIceDb::TUpdate<Schema::CdcStream::State>(alterData->State) ); @@ -1570,6 +1571,7 @@ void TSchemeShard::PersistCdcStreamAlterData(NIceDb::TNiceDb& db, const TPathId& NIceDb::TUpdate<Schema::CdcStreamAlterData::Mode>(alterData->Mode), NIceDb::TUpdate<Schema::CdcStreamAlterData::Format>(alterData->Format), NIceDb::TUpdate<Schema::CdcStreamAlterData::VirtualTimestamps>(alterData->VirtualTimestamps), + NIceDb::TUpdate<Schema::CdcStreamAlterData::AwsRegion>(alterData->AwsRegion), NIceDb::TUpdate<Schema::CdcStreamAlterData::State>(alterData->State) ); } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 418df0824ff..f0478f79f87 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2336,11 +2336,12 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { static constexpr ui32 MaxInProgressShards = 10; - TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, EState state) + TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TString& awsRegion, EState state) : AlterVersion(version) , Mode(mode) , Format(format) , VirtualTimestamps(vt) + , AwsRegion(awsRegion) , State(state) {} @@ -2354,12 +2355,12 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { return result; } - static TPtr New(EMode mode, EFormat format, bool vt) { - return new TCdcStreamInfo(0, mode, format, vt, EState::ECdcStreamStateInvalid); + static TPtr New(EMode mode, EFormat format, bool vt, const TString& awsRegion) { + return new TCdcStreamInfo(0, mode, format, vt, awsRegion, EState::ECdcStreamStateInvalid); } static TPtr Create(const NKikimrSchemeOp::TCdcStreamDescription& desc) { - TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps()); + TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps(), desc.GetAwsRegion()); TPtr alterData = result->CreateNextVersion(); alterData->State = EState::ECdcStreamStateReady; if (desc.HasState()) { @@ -2373,6 +2374,7 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { EMode Mode; EFormat Format; bool VirtualTimestamps; + TString AwsRegion; EState State; TCdcStreamInfo::TPtr AlterData = nullptr; diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 124092d7061..18d77bb4758 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1162,6 +1162,7 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name, desc.SetMode(info->Mode); desc.SetFormat(info->Format); desc.SetVirtualTimestamps(info->VirtualTimestamps); + desc.SetAwsRegion(info->AwsRegion); PathIdFromPathId(pathId, desc.MutablePathId()); desc.SetState(info->State); desc.SetSchemaVersion(info->AlterVersion); diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 9f69cbe17c1..330ee8c2962 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1542,9 +1542,10 @@ struct Schema : NIceDb::Schema { struct Mode : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamMode; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamModeInvalid; }; struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; }; struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {}; + struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {}; using TKey = TableKey<OwnerPathId, LocalPathId>; - using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps>; + using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps, AwsRegion>; }; struct CdcStreamAlterData : Table<96> { @@ -1555,9 +1556,10 @@ struct Schema : NIceDb::Schema { struct Mode : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamMode; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamModeInvalid; }; struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; }; struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {}; + struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {}; using TKey = TableKey<OwnerPathId, LocalPathId>; - using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps>; + using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps, AwsRegion>; }; struct CdcStreamScanShardStatus : Table<103> { diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 95b5c025c6a..909846991a7 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -265,6 +265,60 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { env.TestWaitNotification(runtime, txId); } + Y_UNIT_TEST(DocApi) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + // invalid mode + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeUpdate + Format: ECdcStreamFormatDocApiJson + } + )", {NKikimrScheme::StatusInvalidParameter}); + + // invalid aws region + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + AwsRegion: "foo" + } + )", {NKikimrScheme::StatusInvalidParameter}); + + // ok + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeNewAndOldImages + Format: ECdcStreamFormatDocApiJson + AwsRegion: "ru-central1" + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewAndOldImages), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatDocApiJson), + NLs::StreamAwsRegion("ru-central1"), + }); + } + Y_UNIT_TEST(Negative) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp index 800927f877a..7afe0741e5e 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp @@ -66,6 +66,43 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream({}, true); } + Y_UNIT_TEST(CreateStreamWithAwsRegion) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeNewAndOldImages + Format: ECdcStreamFormatDocApiJson + AwsRegion: "ru-central1" + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewAndOldImages), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatDocApiJson), + NLs::StreamAwsRegion("ru-central1"), + }); + } + }); + } + Y_UNIT_TEST(DisableStream) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 75a374c57d4..4aa4cab0215 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -801,6 +801,12 @@ TCheckFunc StreamVirtualTimestamps(bool value) { }; } +TCheckFunc StreamAwsRegion(const TString& value) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetAwsRegion(), value); + }; +} + TCheckFunc RetentionPeriod(const TDuration& value) { return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { UNIT_ASSERT_VALUES_EQUAL(value.Seconds(), record.GetPathDescription().GetPersQueueGroup() diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index ed99b31869e..9738370f94f 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -133,6 +133,7 @@ namespace NLs { TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format); TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state); TCheckFunc StreamVirtualTimestamps(bool value); + TCheckFunc StreamAwsRegion(const TString& value); TCheckFunc RetentionPeriod(const TDuration& value); TCheckFunc HasBackupInFly(ui64 txId); diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 487ed9dad6d..183ae4f4852 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6128,11 +6128,6 @@ ], "ColumnsAdded": [ { - "ColumnId": 7, - "ColumnName": "VirtualTimestamps", - "ColumnType": "Bool" - }, - { "ColumnId": 1, "ColumnName": "OwnerPathId", "ColumnType": "Uint64" @@ -6161,19 +6156,30 @@ "ColumnId": 6, "ColumnName": "Format", "ColumnType": "Uint32" + }, + { + "ColumnId": 7, + "ColumnName": "VirtualTimestamps", + "ColumnType": "Bool" + }, + { + "ColumnId": 8, + "ColumnName": "AwsRegion", + "ColumnType": "Utf8" } ], "ColumnsDropped": [], "ColumnFamilies": { "0": { "Columns": [ - 7, 1, 2, 3, 4, 5, - 6 + 6, + 7, + 8 ], "RoomID": 0, "Codec": 0, @@ -6200,11 +6206,6 @@ ], "ColumnsAdded": [ { - "ColumnId": 7, - "ColumnName": "VirtualTimestamps", - "ColumnType": "Bool" - }, - { "ColumnId": 1, "ColumnName": "OwnerPathId", "ColumnType": "Uint64" @@ -6233,19 +6234,30 @@ "ColumnId": 6, "ColumnName": "Format", "ColumnType": "Uint32" + }, + { + "ColumnId": 7, + "ColumnName": "VirtualTimestamps", + "ColumnType": "Bool" + }, + { + "ColumnId": 8, + "ColumnName": "AwsRegion", + "ColumnType": "Utf8" } ], "ColumnsDropped": [], "ColumnFamilies": { "0": { "Columns": [ - 7, 1, 2, 3, 4, 5, - 6 + 6, + 7, + 8 ], "RoomID": 0, "Codec": 0, |