diff options
author | Iurii Kravchenko <xjie6@ydb.tech> | 2025-04-15 18:53:00 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-15 18:53:00 +0200 |
commit | 476779e1e222b46d41445045f2c60f81a25d7a5c (patch) | |
tree | cceaf597c0e29c9c2a6d77e52bbfb73203401c6f | |
parent | 9e56c53c26fef73e9f19922912092c6d9eecfee8 (diff) | |
download | ydb-476779e1e222b46d41445045f2c60f81a25d7a5c.tar.gz |
Reject programs without projection (#16505) (#17154)
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 38 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/test_helper/shard_reader.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 81 | ||||
-rw-r--r-- | ydb/core/tx/program/program.cpp | 6 |
4 files changed, 136 insertions, 1 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index afbaffb3659..981159a1c02 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -3518,5 +3518,43 @@ Y_UNIT_TEST_SUITE(KqpOlap) { testHelper.ReadData("SELECT a, b FROM `/Root/ColumnTableTest` WHERE b = 2 LIMIT 2", "[[2u;2u]]"); } + + Y_UNIT_TEST(SimpleRequestHasProjections) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + TLocalHelper(kikimr).CreateTestOlapTable(); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 20); + auto client = kikimr.GetTableClient(); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + { + auto it = client.StreamExecuteScanQuery(R"( + --!syntax_v1 + + SELECT 1 + FROM `/Root/olapStore/olapTable` + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + + CompareYson(result, R"([[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1]])"); + } + + { + auto it = client.StreamExecuteScanQuery(R"( + --!syntax_v1 + + SELECT count(*) + FROM `/Root/olapStore/olapTable` + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + + CompareYson(result, R"([[20u]])"); + } + } } } diff --git a/ydb/core/tx/columnshard/test_helper/shard_reader.h b/ydb/core/tx/columnshard/test_helper/shard_reader.h index ebef456bbee..8f1d3afb1c8 100644 --- a/ydb/core/tx/columnshard/test_helper/shard_reader.h +++ b/ydb/core/tx/columnshard/test_helper/shard_reader.h @@ -35,6 +35,8 @@ private: std::vector<std::shared_ptr<arrow::RecordBatch>> ResultBatches; YDB_READONLY(ui32, IterationsCount, 0); + std::vector<Ydb::Issue::IssueMessage> Errors; + public: ui64 GetReadStat(const TString& paramName) const { AFL_VERIFY(IsCorrectlyFinished()); @@ -92,6 +94,10 @@ public: return IsFinished() && *Finished == -1; } + const std::vector<Ydb::Issue::IssueMessage>& GetErrors() const { + return Errors; + } + bool InitializeScanner() { AFL_VERIFY(!ScanActorId); const TActorId sender = Runtime.AllocateEdgeActor(); @@ -104,6 +110,9 @@ public: ScanActorId = ActorIdFromProto(msg.GetScanActorId()); return true; } else if (auto* evError = std::get<1>(event)) { + for (auto issue : evError->Record.GetIssues()) { + Errors.emplace_back(issue); + } Finished = -1; } else { AFL_VERIFY(false); @@ -136,6 +145,9 @@ public: Finished = 1; } } else if (auto* evError = std::get<1>(event)) { + for (auto issue : evError->Record.GetIssues()) { + Errors.emplace_back(issue); + } Finished = -1; } else { AFL_VERIFY(false); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 46fda52afd4..9d3ee177a7b 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -1354,6 +1354,83 @@ void TestSomePrograms(const TestTableDescription& table) { } } +void TestReadWithProgramNoProjection(const TestTableDescription& table = {}) { + TTestBasicRuntime runtime; + TTester::Setup(runtime); + auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>(); + + TActorId sender = runtime.AllocateEdgeActor(); + CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + runtime.DispatchEvents(options); + + ui64 writeId = 0; + ui64 tableId = 1; + ui64 txId = 100; + + auto planStep = SetupSchema(runtime, sender, tableId, table); + + { // write some data + std::vector<ui64> writeIds; + bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({ 0, 100 }, table.Schema), table.Schema, true, &writeIds); + UNIT_ASSERT(ok); + planStep = ProposeCommit(runtime, sender, txId, writeIds); + PlanCommit(runtime, sender, planStep, txId); + } + + std::vector<TString> programs; + programs.push_back("XXXYYYZZZ"); + { + NKikimrSSA::TProgram ssa = MakeSelect(TAssignment::FUNC_CMP_EQUAL); + TString serialized; + UNIT_ASSERT(ssa.SerializeToString(&serialized)); + + NKikimrSSA::TOlapProgram program; + program.SetProgram(serialized); + + programs.push_back(""); + UNIT_ASSERT(program.SerializeToString(&programs.back())); + + //remove projections + auto* commands = ssa.MutableCommand(); + for(int i = commands->size() - 1; i >= 0; --i) { + if ((*commands)[i].HasProjection()) { + commands->DeleteSubrange(i, 1); + } + } + + UNIT_ASSERT(ssa.SerializeToString(&serialized)); + program.SetProgram(serialized); + programs.push_back(""); + UNIT_ASSERT(program.SerializeToString(&programs.back())); + } + + ui32 i = 0; + for (auto& programText : programs) { + TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetProgram(programText); + auto rb = reader.ReadAll(); + switch(i) { + case 0: + UNIT_ASSERT(reader.IsError()); + break; + + case 1: + UNIT_ASSERT(!reader.IsError()); + break; + + case 2: + UNIT_ASSERT(reader.IsError()); + UNIT_ASSERT(reader.GetErrors().back().Getmessage().Contains("program has no projections")); + break; + } + UNIT_ASSERT(reader.IsFinished()); + ++i; + } +} + struct TReadAggregateResult { ui32 NumRows = 1; @@ -1920,6 +1997,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { TestSomePrograms(table); } + Y_UNIT_TEST(ReadWithProgramNoProjection) { + TestReadWithProgramNoProjection(); + } + Y_UNIT_TEST(ReadAggregate) { auto schema = TTestSchema::YdbAllTypesSchema(); auto testBlob = MakeTestBlob({ 0, 100 }, schema); diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp index acc920cfe75..febf3fc8417 100644 --- a/ydb/core/tx/program/program.cpp +++ b/ydb/core/tx/program/program.cpp @@ -100,8 +100,8 @@ TConclusionStatus TProgramContainer::ParseProgram(const NArrow::NSSA::IColumnRes using TId = NKikimrSSA::TProgram::TCommand; AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("parse_proto_program", program.DebugString()); -// Cerr << program.DebugString() << Endl; NArrow::NSSA::TProgramBuilder programBuilder(columnResolver, KernelsRegistry); + bool hasProjection = false; for (auto& cmd : program.GetCommand()) { switch (cmd.GetLineCase()) { case TId::kAssign: { @@ -123,6 +123,7 @@ TConclusionStatus TProgramContainer::ParseProgram(const NArrow::NSSA::IColumnRes if (status.IsFail()) { return status; } + hasProjection = true; break; } case TId::kGroupBy: { @@ -136,6 +137,9 @@ TConclusionStatus TProgramContainer::ParseProgram(const NArrow::NSSA::IColumnRes return TConclusionStatus::Fail("incorrect SSA line case"); } } + if (!hasProjection) { + return TConclusionStatus::Fail("program has no projections"); + } auto programStatus = programBuilder.Finish(); if (programStatus.IsFail()) { return programStatus; |