diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-21 22:07:31 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-21 22:07:31 +0300 |
commit | 6152d04c21312ed439cce3ed7e8354619a0c9537 (patch) | |
tree | e1b8b6ac8e9221af1fc5b001d49eeed65ccac57c | |
parent | c53684a42ee8692a90906715a64a7cd561fe3361 (diff) | |
download | ydb-6152d04c21312ed439cce3ed7e8354619a0c9537.tar.gz |
Added tests for CHANGEFEED KIKIMR-14213
ref:ece98d67baf9aec96421a2b70c992b865e8c05ef
-rw-r--r-- | ydb/library/yql/sql/v1/sql.cpp | 6 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_ut.cpp | 75 |
2 files changed, 79 insertions, 2 deletions
diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp index 00358cf342..b3f12bb4e3 100644 --- a/ydb/library/yql/sql/v1/sql.cpp +++ b/ydb/library/yql/sql/v1/sql.cpp @@ -636,10 +636,12 @@ static bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node, } if (to_lower(id.Name) == "sink_type") { - if (to_lower(value) == "local") { + auto parsed = StringContent(ctx.Context(), ctx.Context().Pos(), value); + YQL_ENSURE(parsed.Defined()); + if (to_lower(parsed->Content) == "local") { settings.SinkSettings = TChangefeedSettings::TLocalSinkSettings(); } else { - ctx.Error() << "Unknown changefeed's sink type: " << to_upper(value); + ctx.Error() << "Unknown changefeed sink type: " << to_upper(parsed->Content); return false; } } else if (to_lower(id.Name) == "mode") { diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index 85b6e0dfcd..1fe963dc96 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -218,6 +218,16 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT(SqlToYql("USE plato; SELECT ASYNC FROM ASYNC").IsOk()); } + Y_UNIT_TEST(DisableKeywordNotReservedForNames) { + UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE DISABLE (DISABLE Uint32, PRIMARY KEY (DISABLE));").IsOk()); + UNIT_ASSERT(SqlToYql("USE plato; SELECT DISABLE FROM DISABLE").IsOk()); + } + + Y_UNIT_TEST(ChangefeedKeywordNotReservedForNames) { + UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE CHANGEFEED (CHANGEFEED Uint32, PRIMARY KEY (CHANGEFEED));").IsOk()); + UNIT_ASSERT(SqlToYql("USE plato; SELECT CHANGEFEED FROM CHANGEFEED").IsOk()); + } + Y_UNIT_TEST(Jubilee) { NYql::TAstParseResult res = SqlToYql("USE plato; INSERT INTO Arcadia (r2000000) VALUES (\"2M GET!!!\");"); UNIT_ASSERT(res.Root); @@ -1415,6 +1425,31 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); } + Y_UNIT_TEST(ChangefeedParseCorrect) { + auto res = SqlToYql(R"( USE plato; + CREATE TABLE tableName ( + Key Uint32, PRIMARY KEY (Key), + CHANGEFEED feedName WITH (MODE = 'KEYS_ONLY', FORMAT = 'json') + ); + )"); + UNIT_ASSERT_C(res.Root, Err2Str(res)); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("changefeed")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("mode")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("KEYS_ONLY")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("format")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("json")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + Y_UNIT_TEST(CloneForAsTableWorksWithCube) { UNIT_ASSERT(SqlToYql("SELECT * FROM AS_TABLE([<|k1:1, k2:1|>]) GROUP BY CUBE(k1, k2);").IsOk()); } @@ -1586,6 +1621,20 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table SET (TTL = Interval(\"PT3H\") ON column)").IsOk()); } + Y_UNIT_TEST(AlterTableAddChangefeedIsCorrect) { + UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table ADD CHANGEFEED feed WITH (MODE = 'UPDATES', FORMAT = 'json')").IsOk()); + } + + Y_UNIT_TEST(AlterTableAlterChangefeedIsCorrect) { + UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table ALTER CHANGEFEED feed DISABLE").IsOk()); + ExpectFailWithError("USE plato; ALTER TABLE table ALTER CHANGEFEED feed SET (FORMAT = 'proto');", + "<main>:1:66: Error: FORMAT alter is not supported\n"); + } + + Y_UNIT_TEST(AlterTableDropChangefeedIsCorrect) { + UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table DROP CHANGEFEED feed").IsOk()); + } + Y_UNIT_TEST(AlterTableSetPartitioningIsCorrect) { UNIT_ASSERT(SqlToYql("USE plato; ALTER TABLE table SET (AUTO_PARTITIONING_BY_SIZE = DISABLED)").IsOk()); } @@ -2956,6 +3005,32 @@ select FormatType($f()); "<main>:4:26: Error: Invalid TTL settings\n"); } + Y_UNIT_TEST(InvalidChangefeedSink) { + auto req = R"( + USE plato; + CREATE TABLE tableName ( + Key Uint32, PRIMARY KEY (Key), + CHANGEFEED feedName WITH (SINK_TYPE = "S3", MODE = "KEYS_ONLY", FORMAT = "json") + ); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:55: Error: Unknown changefeed sink type: S3\n"); + } + + Y_UNIT_TEST(InvalidChangefeedSettings) { + auto req = R"( + USE plato; + CREATE TABLE tableName ( + Key Uint32, PRIMARY KEY (Key), + CHANGEFEED feedName WITH (SINK_TYPE = "local", FOO = "bar") + ); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:70: Error: Unknown changefeed setting: FOO\n"); + } + Y_UNIT_TEST(ErrJoinWithGroupingSetsWithoutCorrelationName) { auto req = "USE plato;\n" "\n" |