summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ilnaz <[email protected]> 2023-05-12 23:14:49 +0300
committer ilnaz <[email protected]> 2023-05-12 23:14:49 +0300
commit 44241885851a00105e467af090e06be035ed015e (patch)
tree 1c99a42d6e1dbec3bb5e23a207844ded14ac1c49
parent 4c8d0ad1db6cd949eba0a9ce79b36ee1348f65f8 (diff)
Document table cdc records
-rw-r--r-- ydb/core/protos/flat_scheme_op.proto 3
-rw-r--r-- ydb/core/tx/datashard/change_record.cpp 223
-rw-r--r-- ydb/core/tx/datashard/change_record.h 12
-rw-r--r-- ydb/core/tx/datashard/change_sender_async_index.cpp 2
-rw-r--r-- ydb/core/tx/datashard/change_sender_cdc_stream.cpp 24
-rw-r--r-- ydb/core/tx/datashard/datashard_user_table.cpp 16
-rw-r--r-- ydb/core/tx/datashard/datashard_user_table.h 2
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_change_exchange.cpp 202
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__init.cpp 8
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp 15
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_impl.cpp 2
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_info_types.h 10
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_path_describer.cpp 1
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_schema.h 6
-rw-r--r-- ydb/core/tx/schemeshard/ut_cdc_stream.cpp 54
-rw-r--r-- ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp 37
-rw-r--r-- ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp 6
-rw-r--r-- ydb/core/tx/schemeshard/ut_helpers/ls_checks.h 1
-rw-r--r-- ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema 40
19 files changed, 560 insertions, 104 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 895d6040db1..6f2009058cc 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -715,6 +715,7 @@ enum ECdcStreamFormat {
ECdcStreamFormatInvalid = 0;
ECdcStreamFormatProto = 1;
ECdcStreamFormatJson = 2;
+ ECdcStreamFormatDocApiJson = 3;
}
message TCdcStreamDescription {
@@ -726,6 +727,8 @@ message TCdcStreamDescription {
optional ECdcStreamState State = 4;
optional uint64 SchemaVersion = 5;
repeated TUserAttribute UserAttributes = 8;
+ // AwsRegion used to mark records in DynamoDB-compatible mode (FormatDocApiJson)
+ optional string AwsRegion = 9;
}
message TCreateCdcStream {
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index cba5084194c..14b20389468 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -12,10 +12,11 @@
#include <ydb/library/binary_json/read.h>
#include <util/stream/str.h>
+#include <util/string/printf.h>
namespace NKikimr::NDataShard {
-void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const {
+void TChangeRecord::SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const {
record.SetOrder(Order);
record.SetGroup(Group);
record.SetStep(Step);
@@ -35,6 +36,12 @@ void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) co
}
}
+static auto ParseBody(const TString& protoBody) {
+ NKikimrChangeExchange::TChangeRecord::TDataChange body;
+ Y_VERIFY(body.ParseFromArray(protoBody.data(), protoBody.size()));
+ return body;
+}
+
static NJson::TJsonValue StringToJson(TStringBuf in) {
NJson::TJsonValue result;
Y_VERIFY(NJson::ReadJsonTree(in, &result));
@@ -145,60 +152,185 @@ static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& valu
}
}
-void TChangeRecord::SerializeTo(NJson::TJsonValue& json, bool virtualTimestamps) const {
- switch (Kind) {
- case EKind::CdcDataChange: {
- Y_VERIFY(Schema);
+void TChangeRecord::SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const {
+ Y_VERIFY(Kind == EKind::CdcDataChange);
+ Y_VERIFY(Schema);
- NKikimrChangeExchange::TChangeRecord::TDataChange body;
- Y_VERIFY(body.ParseFromArray(Body.data(), Body.size()));
+ const auto body = ParseBody(Body);
+ SerializeJsonKey(Schema, json["key"], body.GetKey());
- SerializeJsonKey(Schema, json["key"], body.GetKey());
+ if (body.HasOldImage()) {
+ SerializeJsonValue(Schema, json["oldImage"], body.GetOldImage());
+ }
- if (body.HasOldImage()) {
- SerializeJsonValue(Schema, json["oldImage"], body.GetOldImage());
- }
+ if (body.HasNewImage()) {
+ SerializeJsonValue(Schema, json["newImage"], body.GetNewImage());
+ }
- if (body.HasNewImage()) {
- SerializeJsonValue(Schema, json["newImage"], body.GetNewImage());
+ const auto hasAnyImage = body.HasOldImage() || body.HasNewImage();
+ switch (body.GetRowOperationCase()) {
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert:
+ json["update"].SetType(NJson::JSON_MAP);
+ if (!hasAnyImage) {
+ SerializeJsonValue(Schema, json["update"], body.GetUpsert());
}
+ break;
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset:
+ json["reset"].SetType(NJson::JSON_MAP);
+ if (!hasAnyImage) {
+ SerializeJsonValue(Schema, json["reset"], body.GetReset());
+ }
+ break;
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase:
+ json["erase"].SetType(NJson::JSON_MAP);
+ break;
+ default:
+ Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
+ }
- const auto hasAnyImage = body.HasOldImage() || body.HasNewImage();
- switch (body.GetRowOperationCase()) {
- case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert:
- json["update"].SetType(NJson::EJsonValueType::JSON_MAP);
- if (!hasAnyImage) {
- SerializeJsonValue(Schema, json["update"], body.GetUpsert());
- }
- break;
- case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset:
- json["reset"].SetType(NJson::EJsonValueType::JSON_MAP);
- if (!hasAnyImage) {
- SerializeJsonValue(Schema, json["reset"], body.GetReset());
- }
- break;
- case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase:
- json["erase"].SetType(NJson::EJsonValueType::JSON_MAP);
+ if (virtualTimestamps) {
+ for (auto v : {Step, TxId}) {
+ json["ts"].AppendValue(v);
+ }
+ }
+}
+
+static void ExtendJson(NJson::TJsonValue& value, const NJson::TJsonValue& ext) {
+ Y_VERIFY(ext.GetType() == NJson::JSON_MAP);
+ for (const auto& [k, v] : ext.GetMapSafe()) {
+ value.InsertValue(k, v);
+ }
+}
+
+static void ToAttributeValues(TUserTable::TCPtr schema, NJson::TJsonValue& value,
+ const NKikimrChangeExchange::TChangeRecord::TDataChange::TSerializedCells& in)
+{
+ TSerializedCellVec cells;
+ Y_VERIFY(TSerializedCellVec::TryParse(in.GetData(), cells));
+ Y_VERIFY(in.TagsSize() == cells.GetCells().size());
+
+ for (ui32 i = 0; i < in.TagsSize(); ++i) {
+ const auto tag = in.GetTags(i);
+ const auto& cell = cells.GetCells().at(i);
+
+ if (cell.IsNull()) {
+ continue;
+ }
+
+ auto it = schema->Columns.find(tag);
+ Y_VERIFY(it != schema->Columns.end());
+
+ const auto& column = it->second;
+ const auto& name = column.Name;
+ const auto type = column.Type.GetTypeId();
+
+ if (name == "__Hash" || name == "__CreatedAt") {
+ continue; // hidden column
+ } else if (name.StartsWith("__Hash_")) {
+ bool indexed = false;
+ for (const auto& [_, index] : schema->Indexes) {
+ Y_VERIFY(index.KeyColumnIds.size() >= 1);
+ if (index.KeyColumnIds.at(0) == tag) {
+ indexed = true;
break;
- default:
- Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
+ }
}
-
- if (virtualTimestamps) {
- for (auto v : {Step, TxId}) {
- json["ts"].AppendValue(v);
+ if (indexed) {
+ continue; // index hash column
+ }
+ } else if (name == "__RowData") {
+ Y_VERIFY_DEBUG(type == NScheme::NTypeIds::JsonDocument);
+ const auto rowData = StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf()));
+ if (rowData.GetType() == NJson::JSON_MAP) {
+ auto map = rowData.GetMapSafe().find("M");
+ if (map != rowData.GetMapSafe().end()) {
+ if (map->second.GetType() == NJson::JSON_MAP) {
+ ExtendJson(value, map->second);
+ }
}
}
-
- break;
}
- case EKind::AsyncIndex: {
- Y_FAIL("Not supported");
+ if (type == NScheme::NTypeIds::Bool) {
+ value.InsertValue(name, NJson::TJsonMap({{"BOOL", cell.AsValue<bool>()}}));
+ } else if (type == NScheme::NTypeIds::DyNumber) {
+ value.InsertValue(name, NJson::TJsonMap({{"N", DyNumberToString(cell.AsBuf())}}));
+ } else if (type == NScheme::NTypeIds::String) {
+ value.InsertValue(name, NJson::TJsonMap({{"B", Base64Encode(cell.AsBuf())}}));
+ } else if (type == NScheme::NTypeIds::Utf8) {
+ value.InsertValue(name, NJson::TJsonMap({{"S", cell.AsBuf()}}));
}
}
}
+void TChangeRecord::SerializeToDocApiJson(NJson::TJsonValue& json, const TDocApiJsonOptions& opts) const {
+ Y_VERIFY(Kind == EKind::CdcDataChange);
+ Y_VERIFY(Schema);
+
+ json = NJson::TJsonMap({
+ {"awsRegion", opts.AwsRegion},
+ {"dynamodb", NJson::TJsonMap({
+ {"ApproximateCreationDateTime", GetApproximateCreationDateTime().MilliSeconds()},
+ {"SequenceNumber", Sprintf("%0*" PRIi64, 21 /* min length */, GetSeqNo())},
+ })},
+ {"eventID", Sprintf("%" PRIu64 "-%" PRIi64, opts.ShardId, GetSeqNo())},
+ {"eventSource", "ydb:document-table"},
+ {"eventVersion", "1.0"},
+ });
+
+ auto& dynamodb = json["dynamodb"];
+ const auto body = ParseBody(Body);
+
+ bool keysOnly = false;
+ bool newAndOldImages = false;
+ switch (opts.StreamMode) {
+ case TUserTable::TCdcStream::EMode::ECdcStreamModeNewImage:
+ dynamodb["StreamViewType"] = "NEW_IMAGE";
+ break;
+ case TUserTable::TCdcStream::EMode::ECdcStreamModeOldImage:
+ dynamodb["StreamViewType"] = "OLD_IMAGE";
+ break;
+ case TUserTable::TCdcStream::EMode::ECdcStreamModeNewAndOldImages:
+ dynamodb["StreamViewType"] = "NEW_AND_OLD_IMAGES";
+ newAndOldImages = true;
+ break;
+ default:
+ dynamodb["StreamViewType"] = "KEYS_ONLY";
+ keysOnly = true;
+ break;
+ }
+
+ NJson::TJsonMap keys;
+ ToAttributeValues(Schema, keys, body.GetKey());
+ dynamodb["Keys"] = keys;
+
+ if (!keysOnly && body.HasOldImage()) {
+ ToAttributeValues(Schema, dynamodb["OldImage"], body.GetOldImage());
+ ExtendJson(dynamodb["OldImage"], keys);
+ }
+
+ if (!keysOnly && body.HasNewImage()) {
+ ToAttributeValues(Schema, dynamodb["NewImage"], body.GetNewImage());
+ ExtendJson(dynamodb["NewImage"], keys);
+ }
+
+ switch (body.GetRowOperationCase()) {
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert:
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset:
+ if (newAndOldImages) {
+ json["eventName"] = body.HasOldImage() ? "MODIFY" : "INSERT";
+ } else {
+ json["eventName"] = "MODIFY";
+ }
+ break;
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase:
+ json["eventName"] = "REMOVE";
+ break;
+ default:
+ Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
+ }
+}
+
TConstArrayRef<TCell> TChangeRecord::GetKey() const {
if (Key) {
return *Key;
@@ -207,8 +339,7 @@ TConstArrayRef<TCell> TChangeRecord::GetKey() const {
switch (Kind) {
case EKind::AsyncIndex:
case EKind::CdcDataChange: {
- NKikimrChangeExchange::TChangeRecord::TDataChange parsed;
- Y_VERIFY(parsed.ParseFromArray(Body.data(), Body.size()));
+ const auto parsed = ParseBody(Body);
TSerializedCellVec key;
Y_VERIFY(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key));
@@ -235,9 +366,7 @@ TString TChangeRecord::GetPartitionKey() const {
switch (Kind) {
case EKind::CdcDataChange: {
Y_VERIFY(Schema);
-
- NKikimrChangeExchange::TChangeRecord::TDataChange body;
- Y_VERIFY(body.ParseFromArray(Body.data(), Body.size()));
+ const auto body = ParseBody(Body);
NJson::TJsonValue key;
SerializeJsonKey(Schema, key, body.GetKey());
@@ -255,6 +384,12 @@ TString TChangeRecord::GetPartitionKey() const {
return *PartitionKey;
}
+TInstant TChangeRecord::GetApproximateCreationDateTime() const {
+ return GetGroup()
+ ? TInstant::FromValue(GetGroup())
+ : TInstant::MilliSeconds(GetStep());
+}
+
TString TChangeRecord::ToString() const {
TString result;
TStringOutput out(result);
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index decfa6dce6d..4668e2717c4 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -27,6 +27,12 @@ public:
CdcDataChange,
};
+ struct TDocApiJsonOptions {
+ TString AwsRegion;
+ NKikimrSchemeOp::ECdcStreamMode StreamMode;
+ ui64 ShardId;
+ };
+
public:
ui64 GetOrder() const { return Order; }
ui64 GetGroup() const { return Group; }
@@ -41,12 +47,14 @@ public:
const TPathId& GetTableId() const { return TableId; }
ui64 GetSchemaVersion() const { return SchemaVersion; }
- void SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const;
- void SerializeTo(NJson::TJsonValue& json, bool virtualTimestamps) const;
+ void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const;
+ void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const;
+ void SerializeToDocApiJson(NJson::TJsonValue& json, const TDocApiJsonOptions& opts) const;
TConstArrayRef<TCell> GetKey() const;
i64 GetSeqNo() const;
TString GetPartitionKey() const;
+ TInstant GetApproximateCreationDateTime() const;
TString ToString() const;
void Out(IOutputStream& out) const;
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 402a84e8495..99a25a8b593 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -118,7 +118,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS
}
auto& proto = *records->Record.AddRecords();
- record.SerializeTo(proto);
+ record.SerializeToProto(proto);
Adjust(proto);
}
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index f488f7b65e1..e1df72d2733 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -87,14 +87,10 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
continue;
}
- const auto createdAt = record.GetGroup()
- ? TInstant::FromValue(record.GetGroup())
- : TInstant::MilliSeconds(record.GetStep());
-
auto& cmd = *request.MutablePartitionRequest()->AddCmdWrite();
cmd.SetSeqNo(record.GetSeqNo());
cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId));
- cmd.SetCreateTimeMS(createdAt.MilliSeconds());
+ cmd.SetCreateTimeMS(record.GetApproximateCreationDateTime().MilliSeconds());
cmd.SetIgnoreQuotaDeadline(true);
NKikimrPQClient::TDataChunk data;
@@ -103,14 +99,23 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
switch (Stream.Format) {
case NKikimrSchemeOp::ECdcStreamFormatProto: {
NKikimrChangeExchange::TChangeRecord protoRecord;
- record.SerializeTo(protoRecord);
+ record.SerializeToProto(protoRecord);
data.SetData(protoRecord.SerializeAsString());
break;
}
- case NKikimrSchemeOp::ECdcStreamFormatJson: {
+ case NKikimrSchemeOp::ECdcStreamFormatJson:
+ case NKikimrSchemeOp::ECdcStreamFormatDocApiJson: {
NJson::TJsonValue json;
- record.SerializeTo(json, Stream.VirtualTimestamps);
+ if (Stream.Format == NKikimrSchemeOp::ECdcStreamFormatDocApiJson) {
+ record.SerializeToDocApiJson(json, TChangeRecord::TDocApiJsonOptions{
+ .AwsRegion = Stream.AwsRegion,
+ .StreamMode = Stream.Mode,
+ .ShardId = DataShard.TabletId,
+ });
+ } else {
+ record.SerializeToYdbJson(json, Stream.VirtualTimestamps);
+ }
TStringStream str;
NJson::TJsonWriterConfig jsonConfig;
@@ -691,7 +696,8 @@ class TCdcChangeSenderMain
return it->PartitionId;
}
- case NKikimrSchemeOp::ECdcStreamFormatJson: {
+ case NKikimrSchemeOp::ECdcStreamFormatJson:
+ case NKikimrSchemeOp::ECdcStreamFormatDocApiJson: {
using namespace NKikimr::NDataStreams::V1;
const auto hashKey = HexBytesToDecimal(record.GetPartitionKey() /* MD5 */);
return ShardFromDecimal(hashKey, KeyDesc->Partitions.size());
diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp
index 217f079987f..d92ebe478dd 100644
--- a/ydb/core/tx/datashard/datashard_user_table.cpp
+++ b/ydb/core/tx/datashard/datashard_user_table.cpp
@@ -111,6 +111,16 @@ bool TUserTable::HasAsyncIndexes() const {
return AsyncIndexCount > 0;
}
+static bool IsJsonCdcStream(TUserTable::TCdcStream::EFormat format) {
+ switch (format) {
+ case TUserTable::TCdcStream::EFormat::ECdcStreamFormatJson:
+ case TUserTable::TCdcStream::EFormat::ECdcStreamFormatDocApiJson:
+ return true;
+ default:
+ return false;
+ }
+}
+
void TUserTable::AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& streamDesc) {
Y_VERIFY(streamDesc.HasPathId());
const auto streamPathId = PathIdFromPathId(streamDesc.GetPathId());
@@ -120,7 +130,7 @@ void TUserTable::AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& stre
}
CdcStreams.emplace(streamPathId, TCdcStream(streamDesc));
- JsonCdcStreamCount += ui32(streamDesc.GetFormat() == TCdcStream::EFormat::ECdcStreamFormatJson);
+ JsonCdcStreamCount += ui32(IsJsonCdcStream(streamDesc.GetFormat()));
NKikimrSchemeOp::TTableDescription schema;
GetSchema(schema);
@@ -160,7 +170,7 @@ void TUserTable::DropCdcStream(const TPathId& streamPathId) {
return;
}
- JsonCdcStreamCount -= ui32(it->second.Format == TCdcStream::EFormat::ECdcStreamFormatJson);
+ JsonCdcStreamCount -= ui32(IsJsonCdcStream(it->second.Format));
CdcStreams.erase(it);
NKikimrSchemeOp::TTableDescription schema;
@@ -289,7 +299,7 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr)
for (const auto& streamDesc : descr.GetCdcStreams()) {
Y_VERIFY(streamDesc.HasPathId());
CdcStreams.emplace(PathIdFromPathId(streamDesc.GetPathId()), TCdcStream(streamDesc));
- JsonCdcStreamCount += ui32(streamDesc.GetFormat() == TCdcStream::EFormat::ECdcStreamFormatJson);
+ JsonCdcStreamCount += ui32(IsJsonCdcStream(streamDesc.GetFormat()));
}
}
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index c1b31207525..631e5209255 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -295,6 +295,7 @@ struct TUserTable : public TThrRefBase {
EFormat Format;
EState State;
bool VirtualTimestamps = false;
+ TString AwsRegion;
TCdcStream() = default;
@@ -304,6 +305,7 @@ struct TUserTable : public TThrRefBase {
, Format(streamDesc.GetFormat())
, State(streamDesc.GetState())
, VirtualTimestamps(streamDesc.GetVirtualTimestamps())
+ , AwsRegion(streamDesc.GetAwsRegion())
{
}
};
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 6be8efdac38..541038f4e57 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -2,6 +2,7 @@
#include <library/cpp/digest/md5/md5.h>
#include <library/cpp/json/json_reader.h>
+#include <library/cpp/json/json_writer.h>
#include <ydb/core/base/path.h>
#include <ydb/core/persqueue/events/global.h>
@@ -839,9 +840,51 @@ Y_UNIT_TEST_SUITE(Cdc) {
return MD5::Calc(root.at("key").GetStringRobust());
}
+ static bool AreJsonsEqual(const TString& actual, const TString& expected) {
+ NJson::TJsonValue actualJson;
+ UNIT_ASSERT(NJson::ReadJsonTree(actual, &actualJson));
+ NJson::TJsonValue expectedJson;
+ UNIT_ASSERT(NJson::ReadJsonTree(expected, &expectedJson));
+
+ class TScanner: public NJson::IScanCallback {
+ NJson::TJsonValue& Actual;
+ bool Success = true;
+
+ public:
+ explicit TScanner(NJson::TJsonValue& actual)
+ : Actual(actual)
+ {}
+
+ bool Do(const TString& path, NJson::TJsonValue*, NJson::TJsonValue& expectedValue) override {
+ if (expectedValue.GetStringRobust() != "***") {
+ return true;
+ }
+
+ NJson::TJsonValue actualValue;
+ if (!Actual.GetValueByPath(path, actualValue)) {
+ Success = false;
+ return false;
+ }
+
+ expectedValue = actualValue;
+ return true;
+ }
+
+ bool IsSuccess() const {
+ return Success;
+ }
+ };
+
+ TScanner scanner(actualJson);
+ expectedJson.Scan(scanner);
+
+ UNIT_ASSERT(scanner.IsSuccess());
+ return actualJson == expectedJson;
+ }
+
struct PqRunner {
static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
- const TVector<TString>& queries, const TVector<TString>& records, bool strict = true)
+ const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true)
{
TTestPqEnv env(tableDesc, streamDesc);
@@ -875,11 +918,9 @@ Y_UNIT_TEST_SUITE(Cdc) {
pStream = data->GetPartitionStream();
for (const auto& item : data->GetMessages()) {
const auto& record = records.at(reads++);
- if (strict) {
- UNIT_ASSERT_VALUES_EQUAL(item.GetData(), record);
+ UNIT_ASSERT(AreJsonsEqual(item.GetData(), record));
+ if (checkKey) {
UNIT_ASSERT_VALUES_EQUAL(item.GetPartitionKey(), CalcPartitionKey(record));
- } else {
- UNIT_ASSERT_STRING_CONTAINS(item.GetData(), record);
}
}
} else if (auto* create = std::get_if<TReadSessionEvent::TCreatePartitionStreamEvent>(&*ev)) {
@@ -930,7 +971,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
struct YdsRunner {
static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
- const TVector<TString>& queries, const TVector<TString>& records, bool strict = true)
+ const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true)
{
TTestYdsEnv env(tableDesc, streamDesc);
@@ -981,11 +1022,9 @@ Y_UNIT_TEST_SUITE(Cdc) {
for (ui32 i = 0; i < records.size(); ++i) {
const auto& actual = res.GetResult().records().at(i);
const auto& expected = records.at(i);
- if (strict) {
- UNIT_ASSERT_VALUES_EQUAL(actual.data(), expected);
+ UNIT_ASSERT(AreJsonsEqual(actual.data(), expected));
+ if (checkKey) {
UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), CalcPartitionKey(expected));
- } else {
- UNIT_ASSERT_STRING_CONTAINS(actual.data(), expected);
}
}
}
@@ -1014,7 +1053,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
struct TopicRunner {
static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
- const TVector<TString>& queries, const TVector<TString>& records, bool strict = true)
+ const TVector<TString>& queries, const TVector<TString>& records, bool checkKey = true)
{
TTestTopicEnv env(tableDesc, streamDesc);
@@ -1047,12 +1086,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
pStream = data->GetPartitionSession();
for (const auto& item : data->GetMessages()) {
const auto& record = records.at(reads++);
- if (strict) {
- UNIT_ASSERT_VALUES_EQUAL(item.GetData(), record);
- // TODO: check here partition key
- } else {
- UNIT_ASSERT_STRING_CONTAINS(item.GetData(), record);
- }
+ UNIT_ASSERT(AreJsonsEqual(item.GetData(), record));
+ Y_UNUSED(checkKey);
}
} else if (auto* create = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*ev)) {
pStream = create->GetPartitionSession();
@@ -1175,10 +1210,135 @@ Y_UNIT_TEST_SUITE(Cdc) {
(2, 20),
(3, 30);
)"}, {
- R"({"update":{},"key":[1],"ts":[)",
- R"({"update":{},"key":[2],"ts":[)",
- R"({"update":{},"key":[3],"ts":[)",
- }, false /* non-strict because of variadic timestamps */);
+ R"({"update":{},"key":[1],"ts":"***"})",
+ R"({"update":{},"key":[2],"ts":"***"})",
+ R"({"update":{},"key":[3],"ts":"***"})",
+ });
+ }
+
+ TShardedTableOptions DocApiTable() {
+ return TShardedTableOptions()
+ .Columns({
+ {"__Hash", "Uint64", true, false},
+ {"id_shard", "Utf8", true, false},
+ {"id_sort", "Utf8", true, false},
+ {"__RowData", "JsonDocument", false, false},
+ {"extra", "Bool", false, false},
+ });
+ }
+
+ Y_UNIT_TEST_TRIPLET(DocApi, PqRunner, YdsRunner, TopicRunner) {
+ TRunner::Read(DocApiTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDocApiJson), {R"(
+ UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES (
+ 1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}')
+ );
+ )"}, {
+ WriteJson(NJson::TJsonMap({
+ {"awsRegion", ""},
+ {"dynamodb", NJson::TJsonMap({
+ {"ApproximateCreationDateTime", "***"},
+ {"Keys", NJson::TJsonMap({
+ {"id_shard", NJson::TJsonMap({{"S", "10"}})},
+ {"id_sort", NJson::TJsonMap({{"S", "100"}})},
+ })},
+ {"SequenceNumber", "000000000000000000001"},
+ {"StreamViewType", "KEYS_ONLY"},
+ })},
+ {"eventID", "***"},
+ {"eventName", "MODIFY"},
+ {"eventSource", "ydb:document-table"},
+ {"eventVersion", "1.0"},
+ }), false),
+ }, false /* do not check key */);
+
+ TRunner::Read(DocApiTable(), NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatDocApiJson), {R"(
+ UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData, extra) VALUES (
+ 1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}'), true
+ );
+ )", R"(
+ UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData, extra) VALUES (
+ 1, "10", "100", JsonDocument('{"M":{"color":{"S":"yellow"},"weight":{"N":"5.4"}}}'), false
+ );
+ )", R"(
+ DELETE FROM `/Root/Table` WHERE __Hash = 1;
+ )"}, {
+ WriteJson(NJson::TJsonMap({
+ {"awsRegion", ""},
+ {"dynamodb", NJson::TJsonMap({
+ {"ApproximateCreationDateTime", "***"},
+ {"Keys", NJson::TJsonMap({
+ {"id_shard", NJson::TJsonMap({{"S", "10"}})},
+ {"id_sort", NJson::TJsonMap({{"S", "100"}})},
+ })},
+ {"NewImage", NJson::TJsonMap({
+ {"id_shard", NJson::TJsonMap({{"S", "10"}})},
+ {"id_sort", NJson::TJsonMap({{"S", "100"}})},
+ {"color", NJson::TJsonMap({{"S", "pink"}})},
+ {"weight", NJson::TJsonMap({{"N", "4.5"}})},
+ {"extra", NJson::TJsonMap({{"BOOL", true}})},
+ })},
+ {"SequenceNumber", "000000000000000000001"},
+ {"StreamViewType", "NEW_AND_OLD_IMAGES"},
+ })},
+ {"eventID", "***"},
+ {"eventName", "INSERT"},
+ {"eventSource", "ydb:document-table"},
+ {"eventVersion", "1.0"},
+ }), false),
+ WriteJson(NJson::TJsonMap({
+ {"awsRegion", ""},
+ {"dynamodb", NJson::TJsonMap({
+ {"ApproximateCreationDateTime", "***"},
+ {"Keys", NJson::TJsonMap({
+ {"id_shard", NJson::TJsonMap({{"S", "10"}})},
+ {"id_sort", NJson::TJsonMap({{"S", "100"}})},
+ })},
+ {"OldImage", NJson::TJsonMap({
+ {"id_shard", NJson::TJsonMap({{"S", "10"}})},
+ {"id_sort", NJson::TJsonMap({{"S", "100"}})},
+ {"color", NJson::TJsonMap({{"S", "pink"}})},
+ {"weight", NJson::TJsonMap({{"N", "4.5"}})},
+ {"extra", NJson::TJsonMap({{"BOOL", true}})},
+ })},
+ {"NewImage", NJson::TJsonMap({
+ {"id_shard", NJson::TJsonMap({{"S", "10"}})},
+ {"id_sort", NJson::TJsonMap({{"S", "100"}})},
+ {"color", NJson::TJsonMap({{"S", "yellow"}})},
+ {"weight", NJson::TJsonMap({{"N", "5.4"}})},
+ {"extra", NJson::TJsonMap({{"BOOL", false}})},
+ })},
+ {"SequenceNumber", "000000000000000000002"},
+ {"StreamViewType", "NEW_AND_OLD_IMAGES"},
+ })},
+ {"eventID", "***"},
+ {"eventName", "MODIFY"},
+ {"eventSource", "ydb:document-table"},
+ {"eventVersion", "1.0"},
+ }), false),
+ WriteJson(NJson::TJsonMap({
+ {"awsRegion", ""},
+ {"dynamodb", NJson::TJsonMap({
+ {"ApproximateCreationDateTime", "***"},
+ {"Keys", NJson::TJsonMap({
+ {"id_shard", NJson::TJsonMap({{"S", "10"}})},
+ {"id_sort", NJson::TJsonMap({{"S", "100"}})},
+ })},
+ {"OldImage", NJson::TJsonMap({
+ {"id_shard", NJson::TJsonMap({{"S", "10"}})},
+ {"id_sort", NJson::TJsonMap({{"S", "100"}})},
+ {"color", NJson::TJsonMap({{"S", "yellow"}})},
+ {"weight", NJson::TJsonMap({{"N", "5.4"}})},
+ {"extra", NJson::TJsonMap({{"BOOL", false}})},
+ })},
+ {"SequenceNumber", "000000000000000000003"},
+ {"StreamViewType", "NEW_AND_OLD_IMAGES"},
+ })},
+ {"eventID", "***"},
+ {"eventName", "REMOVE"},
+ {"eventSource", "ydb:document-table"},
+ {"eventVersion", "1.0"},
+ }), false),
+ }, false /* do not check key */);
}
Y_UNIT_TEST_TRIPLET(NaN, PqRunner, YdsRunner, TopicRunner) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index ae94c7e39dc..065cb8be550 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -2804,6 +2804,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
auto mode = rowset.GetValue<Schema::CdcStream::Mode>();
auto format = rowset.GetValue<Schema::CdcStream::Format>();
auto vt = rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false);
+ auto awsRegion = rowset.GetValue<Schema::CdcStream::AwsRegion>();
auto state = rowset.GetValue<Schema::CdcStream::State>();
Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId);
@@ -2814,7 +2815,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ", path type: " << NKikimrSchemeOp::EPathType_Name(path->PathType));
Y_VERIFY(!Self->CdcStreams.contains(pathId));
- Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, state);
+ Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, awsRegion, state);
Self->IncrementPathDbRefCount(pathId);
if (state == NKikimrSchemeOp::ECdcStreamStateScan) {
@@ -2847,6 +2848,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
auto mode = rowset.GetValue<Schema::CdcStreamAlterData::Mode>();
auto format = rowset.GetValue<Schema::CdcStreamAlterData::Format>();
auto vt = rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false);
+ auto awsRegion = rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>();
auto state = rowset.GetValue<Schema::CdcStreamAlterData::State>();
Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId);
@@ -2858,14 +2860,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
if (!Self->CdcStreams.contains(pathId)) {
Y_VERIFY(alterVersion == 1);
- Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt);
+ Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt, awsRegion);
Self->IncrementPathDbRefCount(pathId);
}
auto stream = Self->CdcStreams.at(pathId);
Y_VERIFY(stream->AlterData == nullptr);
Y_VERIFY(stream->AlterVersion < alterVersion);
- stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, state);
+ stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, awsRegion, state);
Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found"
<< ", cdc stream pathId: " << pathId
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 4fd6b490db0..2370f7a703f 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -174,11 +174,17 @@ public:
switch (streamDesc.GetMode()) {
case NKikimrSchemeOp::ECdcStreamModeKeysOnly:
- case NKikimrSchemeOp::ECdcStreamModeUpdate:
case NKikimrSchemeOp::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages:
break;
+ case NKikimrSchemeOp::ECdcStreamModeUpdate:
+ if (streamDesc.GetFormat() == NKikimrSchemeOp::ECdcStreamFormatDocApiJson) {
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "DocApiJson format incompatible with specified stream mode");
+ return result;
+ }
+ break;
default:
result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder()
<< "Invalid stream mode: " << static_cast<ui32>(streamDesc.GetMode()));
@@ -188,6 +194,13 @@ public:
switch (streamDesc.GetFormat()) {
case NKikimrSchemeOp::ECdcStreamFormatProto:
case NKikimrSchemeOp::ECdcStreamFormatJson:
+ if (!streamDesc.GetAwsRegion().empty()) {
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "AwsRegion option incompatible with specified stream format");
+ return result;
+ }
+ break;
+ case NKikimrSchemeOp::ECdcStreamFormatDocApiJson:
break;
default:
result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder()
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 8787e0d877d..675394eed80 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1546,6 +1546,7 @@ void TSchemeShard::PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId)
NIceDb::TUpdate<Schema::CdcStream::Mode>(alterData->Mode),
NIceDb::TUpdate<Schema::CdcStream::Format>(alterData->Format),
NIceDb::TUpdate<Schema::CdcStream::VirtualTimestamps>(alterData->VirtualTimestamps),
+ NIceDb::TUpdate<Schema::CdcStream::AwsRegion>(alterData->AwsRegion),
NIceDb::TUpdate<Schema::CdcStream::State>(alterData->State)
);
@@ -1570,6 +1571,7 @@ void TSchemeShard::PersistCdcStreamAlterData(NIceDb::TNiceDb& db, const TPathId&
NIceDb::TUpdate<Schema::CdcStreamAlterData::Mode>(alterData->Mode),
NIceDb::TUpdate<Schema::CdcStreamAlterData::Format>(alterData->Format),
NIceDb::TUpdate<Schema::CdcStreamAlterData::VirtualTimestamps>(alterData->VirtualTimestamps),
+ NIceDb::TUpdate<Schema::CdcStreamAlterData::AwsRegion>(alterData->AwsRegion),
NIceDb::TUpdate<Schema::CdcStreamAlterData::State>(alterData->State)
);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 418df0824ff..f0478f79f87 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -2336,11 +2336,12 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
static constexpr ui32 MaxInProgressShards = 10;
- TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, EState state)
+ TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TString& awsRegion, EState state)
: AlterVersion(version)
, Mode(mode)
, Format(format)
, VirtualTimestamps(vt)
+ , AwsRegion(awsRegion)
, State(state)
{}
@@ -2354,12 +2355,12 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
return result;
}
- static TPtr New(EMode mode, EFormat format, bool vt) {
- return new TCdcStreamInfo(0, mode, format, vt, EState::ECdcStreamStateInvalid);
+ static TPtr New(EMode mode, EFormat format, bool vt, const TString& awsRegion) {
+ return new TCdcStreamInfo(0, mode, format, vt, awsRegion, EState::ECdcStreamStateInvalid);
}
static TPtr Create(const NKikimrSchemeOp::TCdcStreamDescription& desc) {
- TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps());
+ TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps(), desc.GetAwsRegion());
TPtr alterData = result->CreateNextVersion();
alterData->State = EState::ECdcStreamStateReady;
if (desc.HasState()) {
@@ -2373,6 +2374,7 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
EMode Mode;
EFormat Format;
bool VirtualTimestamps;
+ TString AwsRegion;
EState State;
TCdcStreamInfo::TPtr AlterData = nullptr;
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 124092d7061..18d77bb4758 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -1162,6 +1162,7 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name,
desc.SetMode(info->Mode);
desc.SetFormat(info->Format);
desc.SetVirtualTimestamps(info->VirtualTimestamps);
+ desc.SetAwsRegion(info->AwsRegion);
PathIdFromPathId(pathId, desc.MutablePathId());
desc.SetState(info->State);
desc.SetSchemaVersion(info->AlterVersion);
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 9f69cbe17c1..330ee8c2962 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1542,9 +1542,10 @@ struct Schema : NIceDb::Schema {
struct Mode : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamMode; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamModeInvalid; };
struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; };
struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {};
+ struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {};
using TKey = TableKey<OwnerPathId, LocalPathId>;
- using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps>;
+ using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps, AwsRegion>;
};
struct CdcStreamAlterData : Table<96> {
@@ -1555,9 +1556,10 @@ struct Schema : NIceDb::Schema {
struct Mode : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamMode; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamModeInvalid; };
struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; };
struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {};
+ struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {};
using TKey = TableKey<OwnerPathId, LocalPathId>;
- using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps>;
+ using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps, AwsRegion>;
};
struct CdcStreamScanShardStatus : Table<103> {
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 95b5c025c6a..909846991a7 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -265,6 +265,60 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
env.TestWaitNotification(runtime, txId);
}
+ Y_UNIT_TEST(DocApi) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ // invalid mode
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeUpdate
+ Format: ECdcStreamFormatDocApiJson
+ }
+ )", {NKikimrScheme::StatusInvalidParameter});
+
+ // invalid aws region
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ AwsRegion: "foo"
+ }
+ )", {NKikimrScheme::StatusInvalidParameter});
+
+ // ok
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeNewAndOldImages
+ Format: ECdcStreamFormatDocApiJson
+ AwsRegion: "ru-central1"
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewAndOldImages),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatDocApiJson),
+ NLs::StreamAwsRegion("ru-central1"),
+ });
+ }
+
Y_UNIT_TEST(Negative) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
index 800927f877a..7afe0741e5e 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
@@ -66,6 +66,43 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
CreateStream({}, true);
}
+ Y_UNIT_TEST(CreateStreamWithAwsRegion) {
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ }
+
+ TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeNewAndOldImages
+ Format: ECdcStreamFormatDocApiJson
+ AwsRegion: "ru-central1"
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewAndOldImages),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatDocApiJson),
+ NLs::StreamAwsRegion("ru-central1"),
+ });
+ }
+ });
+ }
+
Y_UNIT_TEST(DisableStream) {
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
index 75a374c57d4..4aa4cab0215 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
@@ -801,6 +801,12 @@ TCheckFunc StreamVirtualTimestamps(bool value) {
};
}
+TCheckFunc StreamAwsRegion(const TString& value) {
+ return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetAwsRegion(), value);
+ };
+}
+
TCheckFunc RetentionPeriod(const TDuration& value) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(value.Seconds(), record.GetPathDescription().GetPersQueueGroup()
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
index ed99b31869e..9738370f94f 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
@@ -133,6 +133,7 @@ namespace NLs {
TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format);
TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state);
TCheckFunc StreamVirtualTimestamps(bool value);
+ TCheckFunc StreamAwsRegion(const TString& value);
TCheckFunc RetentionPeriod(const TDuration& value);
TCheckFunc HasBackupInFly(ui64 txId);
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index 487ed9dad6d..183ae4f4852 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -6128,11 +6128,6 @@
],
"ColumnsAdded": [
{
- "ColumnId": 7,
- "ColumnName": "VirtualTimestamps",
- "ColumnType": "Bool"
- },
- {
"ColumnId": 1,
"ColumnName": "OwnerPathId",
"ColumnType": "Uint64"
@@ -6161,19 +6156,30 @@
"ColumnId": 6,
"ColumnName": "Format",
"ColumnType": "Uint32"
+ },
+ {
+ "ColumnId": 7,
+ "ColumnName": "VirtualTimestamps",
+ "ColumnType": "Bool"
+ },
+ {
+ "ColumnId": 8,
+ "ColumnName": "AwsRegion",
+ "ColumnType": "Utf8"
}
],
"ColumnsDropped": [],
"ColumnFamilies": {
"0": {
"Columns": [
- 7,
1,
2,
3,
4,
5,
- 6
+ 6,
+ 7,
+ 8
],
"RoomID": 0,
"Codec": 0,
@@ -6200,11 +6206,6 @@
],
"ColumnsAdded": [
{
- "ColumnId": 7,
- "ColumnName": "VirtualTimestamps",
- "ColumnType": "Bool"
- },
- {
"ColumnId": 1,
"ColumnName": "OwnerPathId",
"ColumnType": "Uint64"
@@ -6233,19 +6234,30 @@
"ColumnId": 6,
"ColumnName": "Format",
"ColumnType": "Uint32"
+ },
+ {
+ "ColumnId": 7,
+ "ColumnName": "VirtualTimestamps",
+ "ColumnType": "Bool"
+ },
+ {
+ "ColumnId": 8,
+ "ColumnName": "AwsRegion",
+ "ColumnType": "Utf8"
}
],
"ColumnsDropped": [],
"ColumnFamilies": {
"0": {
"Columns": [
- 7,
1,
2,
3,
4,
5,
- 6
+ 6,
+ 7,
+ 8
],
"RoomID": 0,
"Codec": 0,