diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-05-15 10:54:51 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-05-15 10:54:51 +0300 |
commit | 90542cdcf93b2398f9abe97e8dc8cfe6787a663b (patch) | |
tree | 432c4592c1805e1b9a4222f79077af06c88226e5 | |
parent | b48461977a3888bf035dac67d2a2faefaf1f0fcd (diff) | |
download | ydb-90542cdcf93b2398f9abe97e8dc8cfe6787a663b.tar.gz |
Forbid DocApiJson format on non-document tables
6 files changed, 44 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 541038f4e57..173761d5a93 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1224,6 +1224,9 @@ Y_UNIT_TEST_SUITE(Cdc) { {"id_sort", "Utf8", true, false}, {"__RowData", "JsonDocument", false, false}, {"extra", "Bool", false, false}, + }) + .Attributes({ + {"__document_api_version", "1"}, }); } diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 1ede1405371..af40822413f 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1048,6 +1048,7 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSQLRequest(const TString &sql, request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); } + request->Record.SetRequestType("_document_api_request"); request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); request->Record.MutableRequest()->SetType(dml ? NKikimrKqp::QUERY_TYPE_SQL_DML @@ -1143,6 +1144,12 @@ void CreateShardedTable( } } + for (const auto& [k, v] : opts.Attributes_) { + auto* attr = tx.MutableAlterUserAttributes()->AddUserAttributes(); + attr->SetKey(k); + attr->SetValue(v); + } + desc->SetUniformPartitionsCount(opts.Shards_); if (!opts.EnableOutOfOrder_) @@ -1656,6 +1663,8 @@ ui64 AsyncAlterAddStream( const TShardedTableOptions::TCdcStream& streamDesc) { auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpCreateCdcStream, workingDir); + request->Record.SetRequestType("_document_api_request"); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableCreateCdcStream(); desc.SetTableName(tableName); desc.MutableStreamDescription()->SetName(streamDesc.Name); diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 2839c518a63..4d8f97bc25a 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -437,6 +437,8 @@ struct TShardedTableOptions { bool VirtualTimestamps = false; }; + using TAttributes = THashMap<TString, TString>; + #define TABLE_OPTION_IMPL(type, name, defaultValue) \ TSelf& name(type value) {\ name##_ = std::move(value); \ @@ -458,6 +460,7 @@ struct TShardedTableOptions { TABLE_OPTION(std::optional<ui64>, ExecutorCacheSize, std::nullopt); TABLE_OPTION(bool, Replicated, false); TABLE_OPTION(std::optional<EReplicationConsistency>, ReplicationConsistency, std::nullopt); + TABLE_OPTION(TAttributes, Attributes, {}); #undef TABLE_OPTION #undef TABLE_OPTION_IMPL diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 2370f7a703f..5d541b0a458 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -201,6 +201,11 @@ public: } break; case NKikimrSchemeOp::ECdcStreamFormatDocApiJson: + if (tablePath.Base()->DocumentApiVersion < 1) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "DocApiJson format incompatible with non-document table"); + return result; + } break; default: result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 909846991a7..b61ac1c07ca 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -271,16 +271,34 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { ui64 txId = 100; TestCreateTable(runtime, ++txId, "/MyRoot", R"( - Name: "Table" + Name: "RowTable" Columns { Name: "key" Type: "Uint64" } Columns { Name: "value" Type: "Uint64" } KeyColumnNames: ["key"] )"); env.TestWaitNotification(runtime, txId); + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "DocumentTable" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )", {NKikimrScheme::StatusAccepted}, AlterUserAttrs({{"__document_api_version", "1"}})); + env.TestWaitNotification(runtime, txId); + + // non-document table + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "RowTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeNewAndOldImages + Format: ECdcStreamFormatDocApiJson + } + )", {NKikimrScheme::StatusInvalidParameter}); + // invalid mode TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" + TableName: "DocumentTable" StreamDescription { Name: "Stream" Mode: ECdcStreamModeUpdate @@ -290,7 +308,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { // invalid aws region TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" + TableName: "DocumentTable" StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly @@ -301,7 +319,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { // ok TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" + TableName: "DocumentTable" StreamDescription { Name: "Stream" Mode: ECdcStreamModeNewAndOldImages @@ -311,7 +329,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/DocumentTable/Stream"), { NLs::PathExist, NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewAndOldImages), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatDocApiJson), diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp index 7afe0741e5e..ed6640335de 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp @@ -76,7 +76,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { Columns { Name: "key" Type: "Uint64" } Columns { Name: "value" Type: "Uint64" } KeyColumnNames: ["key"] - )"); + )", {NKikimrScheme::StatusAccepted}, AlterUserAttrs({{"__document_api_version", "1"}})); t.TestEnv->TestWaitNotification(runtime, t.TxId); } |