aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIurii Kravchenko <xjie6@ydb.tech>2025-04-15 18:53:00 +0200
committerGitHub <noreply@github.com>2025-04-15 18:53:00 +0200
commit476779e1e222b46d41445045f2c60f81a25d7a5c (patch)
treecceaf597c0e29c9c2a6d77e52bbfb73203401c6f
parent9e56c53c26fef73e9f19922912092c6d9eecfee8 (diff)
downloadydb-476779e1e222b46d41445045f2c60f81a25d7a5c.tar.gz
Reject programs without projection (#16505) (#17154)
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp38
-rw-r--r--ydb/core/tx/columnshard/test_helper/shard_reader.h12
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp81
-rw-r--r--ydb/core/tx/program/program.cpp6
4 files changed, 136 insertions, 1 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index afbaffb3659..981159a1c02 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -3518,5 +3518,43 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
testHelper.ReadData("SELECT a, b FROM `/Root/ColumnTableTest` WHERE b = 2 LIMIT 2", "[[2u;2u]]");
}
+
+ Y_UNIT_TEST(SimpleRequestHasProjections) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+ TLocalHelper(kikimr).CreateTestOlapTable();
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 20);
+ auto client = kikimr.GetTableClient();
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+
+ {
+ auto it = client.StreamExecuteScanQuery(R"(
+ --!syntax_v1
+
+ SELECT 1
+ FROM `/Root/olapStore/olapTable`
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+
+ CompareYson(result, R"([[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1];[1]])");
+ }
+
+ {
+ auto it = client.StreamExecuteScanQuery(R"(
+ --!syntax_v1
+
+ SELECT count(*)
+ FROM `/Root/olapStore/olapTable`
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+
+ CompareYson(result, R"([[20u]])");
+ }
+ }
}
}
diff --git a/ydb/core/tx/columnshard/test_helper/shard_reader.h b/ydb/core/tx/columnshard/test_helper/shard_reader.h
index ebef456bbee..8f1d3afb1c8 100644
--- a/ydb/core/tx/columnshard/test_helper/shard_reader.h
+++ b/ydb/core/tx/columnshard/test_helper/shard_reader.h
@@ -35,6 +35,8 @@ private:
std::vector<std::shared_ptr<arrow::RecordBatch>> ResultBatches;
YDB_READONLY(ui32, IterationsCount, 0);
+ std::vector<Ydb::Issue::IssueMessage> Errors;
+
public:
ui64 GetReadStat(const TString& paramName) const {
AFL_VERIFY(IsCorrectlyFinished());
@@ -92,6 +94,10 @@ public:
return IsFinished() && *Finished == -1;
}
+ const std::vector<Ydb::Issue::IssueMessage>& GetErrors() const {
+ return Errors;
+ }
+
bool InitializeScanner() {
AFL_VERIFY(!ScanActorId);
const TActorId sender = Runtime.AllocateEdgeActor();
@@ -104,6 +110,9 @@ public:
ScanActorId = ActorIdFromProto(msg.GetScanActorId());
return true;
} else if (auto* evError = std::get<1>(event)) {
+ for (auto issue : evError->Record.GetIssues()) {
+ Errors.emplace_back(issue);
+ }
Finished = -1;
} else {
AFL_VERIFY(false);
@@ -136,6 +145,9 @@ public:
Finished = 1;
}
} else if (auto* evError = std::get<1>(event)) {
+ for (auto issue : evError->Record.GetIssues()) {
+ Errors.emplace_back(issue);
+ }
Finished = -1;
} else {
AFL_VERIFY(false);
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 46fda52afd4..9d3ee177a7b 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1354,6 +1354,83 @@ void TestSomePrograms(const TestTableDescription& table) {
}
}
+void TestReadWithProgramNoProjection(const TestTableDescription& table = {}) {
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+ auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ ui64 writeId = 0;
+ ui64 tableId = 1;
+ ui64 txId = 100;
+
+ auto planStep = SetupSchema(runtime, sender, tableId, table);
+
+ { // write some data
+ std::vector<ui64> writeIds;
+ bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({ 0, 100 }, table.Schema), table.Schema, true, &writeIds);
+ UNIT_ASSERT(ok);
+ planStep = ProposeCommit(runtime, sender, txId, writeIds);
+ PlanCommit(runtime, sender, planStep, txId);
+ }
+
+ std::vector<TString> programs;
+ programs.push_back("XXXYYYZZZ");
+ {
+ NKikimrSSA::TProgram ssa = MakeSelect(TAssignment::FUNC_CMP_EQUAL);
+ TString serialized;
+ UNIT_ASSERT(ssa.SerializeToString(&serialized));
+
+ NKikimrSSA::TOlapProgram program;
+ program.SetProgram(serialized);
+
+ programs.push_back("");
+ UNIT_ASSERT(program.SerializeToString(&programs.back()));
+
+ //remove projections
+ auto* commands = ssa.MutableCommand();
+ for(int i = commands->size() - 1; i >= 0; --i) {
+ if ((*commands)[i].HasProjection()) {
+ commands->DeleteSubrange(i, 1);
+ }
+ }
+
+ UNIT_ASSERT(ssa.SerializeToString(&serialized));
+ program.SetProgram(serialized);
+ programs.push_back("");
+ UNIT_ASSERT(program.SerializeToString(&programs.back()));
+ }
+
+ ui32 i = 0;
+ for (auto& programText : programs) {
+ TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetProgram(programText);
+ auto rb = reader.ReadAll();
+ switch(i) {
+ case 0:
+ UNIT_ASSERT(reader.IsError());
+ break;
+
+ case 1:
+ UNIT_ASSERT(!reader.IsError());
+ break;
+
+ case 2:
+ UNIT_ASSERT(reader.IsError());
+ UNIT_ASSERT(reader.GetErrors().back().Getmessage().Contains("program has no projections"));
+ break;
+ }
+ UNIT_ASSERT(reader.IsFinished());
+ ++i;
+ }
+}
+
struct TReadAggregateResult {
ui32 NumRows = 1;
@@ -1920,6 +1997,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TestSomePrograms(table);
}
+ Y_UNIT_TEST(ReadWithProgramNoProjection) {
+ TestReadWithProgramNoProjection();
+ }
+
Y_UNIT_TEST(ReadAggregate) {
auto schema = TTestSchema::YdbAllTypesSchema();
auto testBlob = MakeTestBlob({ 0, 100 }, schema);
diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp
index acc920cfe75..febf3fc8417 100644
--- a/ydb/core/tx/program/program.cpp
+++ b/ydb/core/tx/program/program.cpp
@@ -100,8 +100,8 @@ TConclusionStatus TProgramContainer::ParseProgram(const NArrow::NSSA::IColumnRes
using TId = NKikimrSSA::TProgram::TCommand;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("parse_proto_program", program.DebugString());
-// Cerr << program.DebugString() << Endl;
NArrow::NSSA::TProgramBuilder programBuilder(columnResolver, KernelsRegistry);
+ bool hasProjection = false;
for (auto& cmd : program.GetCommand()) {
switch (cmd.GetLineCase()) {
case TId::kAssign: {
@@ -123,6 +123,7 @@ TConclusionStatus TProgramContainer::ParseProgram(const NArrow::NSSA::IColumnRes
if (status.IsFail()) {
return status;
}
+ hasProjection = true;
break;
}
case TId::kGroupBy: {
@@ -136,6 +137,9 @@ TConclusionStatus TProgramContainer::ParseProgram(const NArrow::NSSA::IColumnRes
return TConclusionStatus::Fail("incorrect SSA line case");
}
}
+ if (!hasProjection) {
+ return TConclusionStatus::Fail("program has no projections");
+ }
auto programStatus = programBuilder.Finish();
if (programStatus.IsFail()) {
return programStatus;