aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-05-15 10:54:51 +0300
committerilnaz <ilnaz@ydb.tech>2023-05-15 10:54:51 +0300
commit90542cdcf93b2398f9abe97e8dc8cfe6787a663b (patch)
tree432c4592c1805e1b9a4222f79077af06c88226e5
parentb48461977a3888bf035dac67d2a2faefaf1f0fcd (diff)
downloadydb-90542cdcf93b2398f9abe97e8dc8cfe6787a663b.tar.gz
Forbid DocApiJson format on non-document tables
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp9
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp5
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp28
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp2
6 files changed, 44 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 541038f4e57..173761d5a93 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1224,6 +1224,9 @@ Y_UNIT_TEST_SUITE(Cdc) {
{"id_sort", "Utf8", true, false},
{"__RowData", "JsonDocument", false, false},
{"extra", "Bool", false, false},
+ })
+ .Attributes({
+ {"__document_api_version", "1"},
});
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 1ede1405371..af40822413f 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1048,6 +1048,7 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSQLRequest(const TString &sql,
request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
}
+ request->Record.SetRequestType("_document_api_request");
request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
request->Record.MutableRequest()->SetType(dml
? NKikimrKqp::QUERY_TYPE_SQL_DML
@@ -1143,6 +1144,12 @@ void CreateShardedTable(
}
}
+ for (const auto& [k, v] : opts.Attributes_) {
+ auto* attr = tx.MutableAlterUserAttributes()->AddUserAttributes();
+ attr->SetKey(k);
+ attr->SetValue(v);
+ }
+
desc->SetUniformPartitionsCount(opts.Shards_);
if (!opts.EnableOutOfOrder_)
@@ -1656,6 +1663,8 @@ ui64 AsyncAlterAddStream(
const TShardedTableOptions::TCdcStream& streamDesc)
{
auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpCreateCdcStream, workingDir);
+ request->Record.SetRequestType("_document_api_request");
+
auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableCreateCdcStream();
desc.SetTableName(tableName);
desc.MutableStreamDescription()->SetName(streamDesc.Name);
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 2839c518a63..4d8f97bc25a 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -437,6 +437,8 @@ struct TShardedTableOptions {
bool VirtualTimestamps = false;
};
+ using TAttributes = THashMap<TString, TString>;
+
#define TABLE_OPTION_IMPL(type, name, defaultValue) \
TSelf& name(type value) {\
name##_ = std::move(value); \
@@ -458,6 +460,7 @@ struct TShardedTableOptions {
TABLE_OPTION(std::optional<ui64>, ExecutorCacheSize, std::nullopt);
TABLE_OPTION(bool, Replicated, false);
TABLE_OPTION(std::optional<EReplicationConsistency>, ReplicationConsistency, std::nullopt);
+ TABLE_OPTION(TAttributes, Attributes, {});
#undef TABLE_OPTION
#undef TABLE_OPTION_IMPL
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 2370f7a703f..5d541b0a458 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -201,6 +201,11 @@ public:
}
break;
case NKikimrSchemeOp::ECdcStreamFormatDocApiJson:
+ if (tablePath.Base()->DocumentApiVersion < 1) {
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "DocApiJson format incompatible with non-document table");
+ return result;
+ }
break;
default:
result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder()
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 909846991a7..b61ac1c07ca 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -271,16 +271,34 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
ui64 txId = 100;
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
- Name: "Table"
+ Name: "RowTable"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "DocumentTable"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )", {NKikimrScheme::StatusAccepted}, AlterUserAttrs({{"__document_api_version", "1"}}));
+ env.TestWaitNotification(runtime, txId);
+
+ // non-document table
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "RowTable"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeNewAndOldImages
+ Format: ECdcStreamFormatDocApiJson
+ }
+ )", {NKikimrScheme::StatusInvalidParameter});
+
// invalid mode
TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
- TableName: "Table"
+ TableName: "DocumentTable"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeUpdate
@@ -290,7 +308,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
// invalid aws region
TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
- TableName: "Table"
+ TableName: "DocumentTable"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
@@ -301,7 +319,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
// ok
TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
- TableName: "Table"
+ TableName: "DocumentTable"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeNewAndOldImages
@@ -311,7 +329,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
)");
env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/DocumentTable/Stream"), {
NLs::PathExist,
NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewAndOldImages),
NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatDocApiJson),
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
index 7afe0741e5e..ed6640335de 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
@@ -76,7 +76,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
- )");
+ )", {NKikimrScheme::StatusAccepted}, AlterUserAttrs({{"__document_api_version", "1"}}));
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}