aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-21 22:07:31 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-21 22:07:31 +0300
commit6152d04c21312ed439cce3ed7e8354619a0c9537 (patch)
treee1b8b6ac8e9221af1fc5b001d49eeed65ccac57c
parentc53684a42ee8692a90906715a64a7cd561fe3361 (diff)
downloadydb-6152d04c21312ed439cce3ed7e8354619a0c9537.tar.gz
Added tests for CHANGEFEED KIKIMR-14213
ref:ece98d67baf9aec96421a2b70c992b865e8c05ef
-rw-r--r--ydb/library/yql/sql/v1/sql.cpp6
-rw-r--r--ydb/library/yql/sql/v1/sql_ut.cpp75
2 files changed, 79 insertions, 2 deletions
diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp
index 00358cf342..b3f12bb4e3 100644
--- a/ydb/library/yql/sql/v1/sql.cpp
+++ b/ydb/library/yql/sql/v1/sql.cpp
@@ -636,10 +636,12 @@ static bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node,
}
if (to_lower(id.Name) == "sink_type") {
- if (to_lower(value) == "local") {
+ auto parsed = StringContent(ctx.Context(), ctx.Context().Pos(), value);
+ YQL_ENSURE(parsed.Defined());
+ if (to_lower(parsed->Content) == "local") {
settings.SinkSettings = TChangefeedSettings::TLocalSinkSettings();
} else {
- ctx.Error() << "Unknown changefeed's sink type: " << to_upper(value);
+ ctx.Error() << "Unknown changefeed sink type: " << to_upper(parsed->Content);
return false;
}
} else if (to_lower(id.Name) == "mode") {
diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp
index 85b6e0dfcd..1fe963dc96 100644
--- a/ydb/library/yql/sql/v1/sql_ut.cpp
+++ b/ydb/library/yql/sql/v1/sql_ut.cpp
@@ -218,6 +218,16 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT(SqlToYql("USE plato; SELECT ASYNC FROM ASYNC").IsOk());
}
+ Y_UNIT_TEST(DisableKeywordNotReservedForNames) {
+ UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE DISABLE (DISABLE Uint32, PRIMARY KEY (DISABLE));").IsOk());
+ UNIT_ASSERT(SqlToYql("USE plato; SELECT DISABLE FROM DISABLE").IsOk());
+ }
+
+ Y_UNIT_TEST(ChangefeedKeywordNotReservedForNames) {
+ UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE CHANGEFEED (CHANGEFEED Uint32, PRIMARY KEY (CHANGEFEED));").IsOk());
+ UNIT_ASSERT(SqlToYql("USE plato; SELECT CHANGEFEED FROM CHANGEFEED").IsOk());
+ }
+
Y_UNIT_TEST(Jubilee) {
NYql::TAstParseResult res = SqlToYql("USE plato; INSERT INTO Arcadia (r2000000) VALUES (\"2M GET!!!\");");
UNIT_ASSERT(res.Root);
@@ -1415,6 +1425,31 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
}
+ Y_UNIT_TEST(ChangefeedParseCorrect) {
+ auto res = SqlToYql(R"( USE plato;
+ CREATE TABLE tableName (
+ Key Uint32, PRIMARY KEY (Key),
+ CHANGEFEED feedName WITH (MODE = 'KEYS_ONLY', FORMAT = 'json')
+ );
+ )");
+ UNIT_ASSERT_C(res.Root, Err2Str(res));
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("changefeed"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("mode"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("KEYS_ONLY"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("format"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("json"));
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0} };
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
+
Y_UNIT_TEST(CloneForAsTableWorksWithCube) {
UNIT_ASSERT(SqlToYql("SELECT * FROM AS_TABLE([<|k1:1, k2:1|>]) GROUP BY CUBE(k1, k2);").IsOk());
}
@@ -1586,6 +1621,20 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table SET (TTL = Interval(\"PT3H\") ON column)").IsOk());
}
+ Y_UNIT_TEST(AlterTableAddChangefeedIsCorrect) {
+ UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table ADD CHANGEFEED feed WITH (MODE = 'UPDATES', FORMAT = 'json')").IsOk());
+ }
+
+ Y_UNIT_TEST(AlterTableAlterChangefeedIsCorrect) {
+ UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table ALTER CHANGEFEED feed DISABLE").IsOk());
+ ExpectFailWithError("USE plato; ALTER TABLE table ALTER CHANGEFEED feed SET (FORMAT = 'proto');",
+ "<main>:1:66: Error: FORMAT alter is not supported\n");
+ }
+
+ Y_UNIT_TEST(AlterTableDropChangefeedIsCorrect) {
+ UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table DROP CHANGEFEED feed").IsOk());
+ }
+
Y_UNIT_TEST(AlterTableSetPartitioningIsCorrect) {
UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table SET (AUTO_PARTITIONING_BY_SIZE = DISABLED)").IsOk());
}
@@ -2956,6 +3005,32 @@ select FormatType($f());
"<main>:4:26: Error: Invalid TTL settings\n");
}
+ Y_UNIT_TEST(InvalidChangefeedSink) {
+ auto req = R"(
+ USE plato;
+ CREATE TABLE tableName (
+ Key Uint32, PRIMARY KEY (Key),
+ CHANGEFEED feedName WITH (SINK_TYPE = "S3", MODE = "KEYS_ONLY", FORMAT = "json")
+ );
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:55: Error: Unknown changefeed sink type: S3\n");
+ }
+
+ Y_UNIT_TEST(InvalidChangefeedSettings) {
+ auto req = R"(
+ USE plato;
+ CREATE TABLE tableName (
+ Key Uint32, PRIMARY KEY (Key),
+ CHANGEFEED feedName WITH (SINK_TYPE = "local", FOO = "bar")
+ );
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:70: Error: Unknown changefeed setting: FOO\n");
+ }
+
Y_UNIT_TEST(ErrJoinWithGroupingSetsWithoutCorrelationName) {
auto req = "USE plato;\n"
"\n"