diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-05-16 00:24:16 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-05-16 00:24:16 +0300 |
commit | 809153ed08e63d3a0ced63eb09123e9fd9702a0e (patch) | |
tree | 7ea1dfb00d1ace37d575a43c7949524423c0434c | |
parent | 230c8266f61ee8df2941560e54f4f396fcbe5d3f (diff) | |
download | ydb-809153ed08e63d3a0ced63eb09123e9fd9702a0e.tar.gz |
DOCUMENT_TABLE_JSON format
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 62 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.cpp | 6 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_table.proto | 3 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.cpp | 6 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table_enum.h | 1 |
6 files changed, 68 insertions, 12 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index cf375c146c4..cfe0bd90c83 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1291,6 +1291,8 @@ public: if (to_lower(format) == "json") { add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON); + } else if (to_lower(format) == "document_table_json") { + add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON); } else { ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), TStringBuilder() << "Unknown changefeed format: " << format)); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index b3a0bbfc7fd..19699941468 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -2655,6 +2655,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) { switch (format) { case EChangefeedFormat::Json: return "JSON"; + case EChangefeedFormat::DocumentTableJson: + return "DOCUMENT_TABLE_JSON"; case EChangefeedFormat::Unknown: UNIT_ASSERT(false); return ""; @@ -2667,19 +2669,24 @@ Y_UNIT_TEST_SUITE(KqpScheme) { auto session = db.CreateSession().GetValueSync().GetSession(); { - auto query = R"( - --!syntax_v1 - CREATE TABLE `/Root/table` ( - Key Uint64, - Value String, - PRIMARY KEY (Key) - ); - )"; + auto builder = TTableBuilder() + .AddNullableColumn("Key", EPrimitiveType::Uint64) + .AddNullableColumn("Value", EPrimitiveType::String) + .SetPrimaryKeyColumn("Key"); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + if (format == EChangefeedFormat::DocumentTableJson) { + builder.AddAttribute("__document_api_version", "1"); + } + + auto result = session.CreateTable("/Root/table", builder.Build()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } + auto execOpts = TExecSchemeQuerySettings(); + if (format == EChangefeedFormat::DocumentTableJson) { + execOpts.RequestType("_document_api_request"); + } + { auto query = Sprintf(R"( --!syntax_v1 @@ -2688,7 +2695,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { ); )", ModeToString(mode), FormatToString(format)); - const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + const auto result = session.ExecuteSchemeQuery(query, execOpts).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); auto describeResult = session.DescribeTable("/Root/table").GetValueSync(); @@ -2706,7 +2713,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { ALTER TABLE `/Root/table` DROP CHANGEFEED `feed`; )"; - const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + const auto result = session.ExecuteSchemeQuery(query, execOpts).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } } @@ -2718,8 +2725,16 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } for (auto format : GetEnumAllValues<EChangefeedFormat>()) { - if (format == EChangefeedFormat::Unknown) { + switch (format) { + case EChangefeedFormat::Unknown: continue; + case EChangefeedFormat::DocumentTableJson: + if (mode == EChangefeedMode::Updates) { + continue; + } + break; + default: + break; } AddChangefeed(mode, format); @@ -2827,6 +2842,29 @@ Y_UNIT_TEST_SUITE(KqpScheme) { const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); } + + { + auto result = session.CreateTable("/Root/document-table", TTableBuilder() + .AddNullableColumn("Key", EPrimitiveType::Uint64) + .AddNullableColumn("Value", EPrimitiveType::String) + .SetPrimaryKeyColumn("Key") + .AddAttribute("__document_api_version", "1") + .Build() + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/document-table` ADD CHANGEFEED `feed` WITH ( + MODE = 'UPDATES', FORMAT = 'DOCUMENT_TABLE_JSON' + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query, TExecSchemeQuerySettings().RequestType("_document_api_request")).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } } Y_UNIT_TEST(DropChangefeedNegative) { diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 7f742899446..85738793a09 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -466,6 +466,9 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson: changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON); break; + case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDocApiJson: + changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON); + break; default: break; } @@ -508,6 +511,9 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, case Ydb::Table::ChangefeedFormat::FORMAT_JSON: out.SetFormat(NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson); break; + case Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON: + out.SetFormat(NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDocApiJson); + break; default: status = Ydb::StatusIds::BAD_REQUEST; error = "Invalid changefeed format"; diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index 0f69b2d0fa1..ed7b6dcaefa 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -137,7 +137,10 @@ message ChangefeedMode { message ChangefeedFormat { enum Format { FORMAT_UNSPECIFIED = 0; + // Change record in JSON format for common (row oriented) tables FORMAT_JSON = 1; + // Change record in JSON format for document (DynamoDB-compatible) tables + FORMAT_DOCUMENT_TABLE_JSON = 2; } } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 516c8d805f8..34e708203fc 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -4332,6 +4332,9 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { case Ydb::Table::ChangefeedFormat::FORMAT_JSON: format = EChangefeedFormat::Json; break; + case Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON: + format = EChangefeedFormat::DocumentTableJson; + break; default: format = EChangefeedFormat::Unknown; break; @@ -4395,6 +4398,9 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const { case EChangefeedFormat::Json: proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON); break; + case EChangefeedFormat::DocumentTableJson: + proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON); + break; case EChangefeedFormat::Unknown: break; } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h index 7a96e160cb0..805c6c0b8cf 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h @@ -41,6 +41,7 @@ enum class EChangefeedMode { enum class EChangefeedFormat { Json /* "JSON" */, + DocumentTableJson /* "DOCUMENT_TABLE_JSON" */, Unknown = std::numeric_limits<int>::max() }; |