aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-05-16 00:24:16 +0300
committerilnaz <ilnaz@ydb.tech>2023-05-16 00:24:16 +0300
commit809153ed08e63d3a0ced63eb09123e9fd9702a0e (patch)
tree7ea1dfb00d1ace37d575a43c7949524423c0434c
parent230c8266f61ee8df2941560e54f4f396fcbe5d3f (diff)
downloadydb-809153ed08e63d3a0ced63eb09123e9fd9702a0e.tar.gz
DOCUMENT_TABLE_JSON format
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp2
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp62
-rw-r--r--ydb/core/ydb_convert/table_description.cpp6
-rw-r--r--ydb/public/api/protos/ydb_table.proto3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table_enum.h1
6 files changed, 68 insertions, 12 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index cf375c146c4..cfe0bd90c83 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1291,6 +1291,8 @@ public:
if (to_lower(format) == "json") {
add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
+ } else if (to_lower(format) == "document_table_json") {
+ add_changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON);
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << "Unknown changefeed format: " << format));
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index b3a0bbfc7fd..19699941468 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -2655,6 +2655,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
switch (format) {
case EChangefeedFormat::Json:
return "JSON";
+ case EChangefeedFormat::DocumentTableJson:
+ return "DOCUMENT_TABLE_JSON";
case EChangefeedFormat::Unknown:
UNIT_ASSERT(false);
return "";
@@ -2667,19 +2669,24 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto session = db.CreateSession().GetValueSync().GetSession();
{
- auto query = R"(
- --!syntax_v1
- CREATE TABLE `/Root/table` (
- Key Uint64,
- Value String,
- PRIMARY KEY (Key)
- );
- )";
+ auto builder = TTableBuilder()
+ .AddNullableColumn("Key", EPrimitiveType::Uint64)
+ .AddNullableColumn("Value", EPrimitiveType::String)
+ .SetPrimaryKeyColumn("Key");
- auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ if (format == EChangefeedFormat::DocumentTableJson) {
+ builder.AddAttribute("__document_api_version", "1");
+ }
+
+ auto result = session.CreateTable("/Root/table", builder.Build()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
+ auto execOpts = TExecSchemeQuerySettings();
+ if (format == EChangefeedFormat::DocumentTableJson) {
+ execOpts.RequestType("_document_api_request");
+ }
+
{
auto query = Sprintf(R"(
--!syntax_v1
@@ -2688,7 +2695,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
);
)", ModeToString(mode), FormatToString(format));
- const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ const auto result = session.ExecuteSchemeQuery(query, execOpts).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
auto describeResult = session.DescribeTable("/Root/table").GetValueSync();
@@ -2706,7 +2713,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
ALTER TABLE `/Root/table` DROP CHANGEFEED `feed`;
)";
- const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ const auto result = session.ExecuteSchemeQuery(query, execOpts).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}
@@ -2718,8 +2725,16 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
for (auto format : GetEnumAllValues<EChangefeedFormat>()) {
- if (format == EChangefeedFormat::Unknown) {
+ switch (format) {
+ case EChangefeedFormat::Unknown:
continue;
+ case EChangefeedFormat::DocumentTableJson:
+ if (mode == EChangefeedMode::Updates) {
+ continue;
+ }
+ break;
+ default:
+ break;
}
AddChangefeed(mode, format);
@@ -2827,6 +2842,29 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
}
+
+ {
+ auto result = session.CreateTable("/Root/document-table", TTableBuilder()
+ .AddNullableColumn("Key", EPrimitiveType::Uint64)
+ .AddNullableColumn("Value", EPrimitiveType::String)
+ .SetPrimaryKeyColumn("Key")
+ .AddAttribute("__document_api_version", "1")
+ .Build()
+ ).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/document-table` ADD CHANGEFEED `feed` WITH (
+ MODE = 'UPDATES', FORMAT = 'DOCUMENT_TABLE_JSON'
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query, TExecSchemeQuerySettings().RequestType("_document_api_request")).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
}
Y_UNIT_TEST(DropChangefeedNegative) {
diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp
index 7f742899446..85738793a09 100644
--- a/ydb/core/ydb_convert/table_description.cpp
+++ b/ydb/core/ydb_convert/table_description.cpp
@@ -466,6 +466,9 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
break;
+ case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDocApiJson:
+ changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON);
+ break;
default:
break;
}
@@ -508,6 +511,9 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
case Ydb::Table::ChangefeedFormat::FORMAT_JSON:
out.SetFormat(NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson);
break;
+ case Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON:
+ out.SetFormat(NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDocApiJson);
+ break;
default:
status = Ydb::StatusIds::BAD_REQUEST;
error = "Invalid changefeed format";
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index 0f69b2d0fa1..ed7b6dcaefa 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -137,7 +137,10 @@ message ChangefeedMode {
message ChangefeedFormat {
enum Format {
FORMAT_UNSPECIFIED = 0;
+ // Change record in JSON format for common (row oriented) tables
FORMAT_JSON = 1;
+ // Change record in JSON format for document (DynamoDB-compatible) tables
+ FORMAT_DOCUMENT_TABLE_JSON = 2;
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 516c8d805f8..34e708203fc 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -4332,6 +4332,9 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
case Ydb::Table::ChangefeedFormat::FORMAT_JSON:
format = EChangefeedFormat::Json;
break;
+ case Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON:
+ format = EChangefeedFormat::DocumentTableJson;
+ break;
default:
format = EChangefeedFormat::Unknown;
break;
@@ -4395,6 +4398,9 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
case EChangefeedFormat::Json:
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
break;
+ case EChangefeedFormat::DocumentTableJson:
+ proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DOCUMENT_TABLE_JSON);
+ break;
case EChangefeedFormat::Unknown:
break;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h
index 7a96e160cb0..805c6c0b8cf 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h
@@ -41,6 +41,7 @@ enum class EChangefeedMode {
enum class EChangefeedFormat {
Json /* "JSON" */,
+ DocumentTableJson /* "DOCUMENT_TABLE_JSON" */,
Unknown = std::numeric_limits<int>::max()
};