aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-05-26 19:12:41 +0300
committernsofya <nsofya@yandex-team.com>2023-05-26 19:12:41 +0300
commite0c56556489d46590343c4276a5444e7ca85d7c7 (patch)
tree55e7a613ebdf053ad6e5361aa6691c646f68cc9d
parente54492a5b591e75a7c69443cce17cbc3881a9e9e (diff)
downloadydb-e0c56556489d46590343c4276a5444e7ca85d7c7.tar.gz
Group all program related stuff
Подвигала код, работаюший с программой, объединила в один класс общий, чтобы потом парсинг ядер в него же прятать
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_base.cpp63
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_base.h2
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp25
-rw-r--r--ydb/core/tx/columnshard/columnshard__stats_scan.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/description.cpp428
-rw-r--r--ydb/core/tx/columnshard/engines/reader/description.h25
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/program.cpp519
-rw-r--r--ydb/core/tx/columnshard/engines/reader/program.h44
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.h20
18 files changed, 632 insertions, 555 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index cff05627660..117aefeba55 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -77,12 +77,12 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}
Y_VERIFY(read.PKRangesFilter.Add(fromPredicate, toPredicate, &indexInfo));
- bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read,
+ bool parseResult = ParseProgram(record.GetOlapProgramType(), record.GetOlapProgram(), read,
TIndexColumnResolver(indexInfo));
std::shared_ptr<NOlap::TReadMetadata> metadata;
if (parseResult) {
- metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache,
+ metadata = PrepareReadMetadata(read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache,
ErrorDescription, false);
}
diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp
index 3a69c0d33f3..6ebc47ddc79 100644
--- a/ydb/core/tx/columnshard/columnshard__read_base.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp
@@ -1,19 +1,16 @@
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/columnshard__read_base.h>
#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
-#include <ydb/core/formats/arrow/ssa_program_optimizer.h>
namespace NKikimr::NColumnShard {
std::shared_ptr<NOlap::TReadMetadata>
-TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDescription& read,
+TTxReadBase::PrepareReadMetadata(const NOlap::TReadDescription& read,
const std::unique_ptr<NOlap::TInsertTable>& insertTable,
const std::unique_ptr<NOlap::IColumnEngine>& index,
const TBatchCache& batchCache,
TString& error, const bool isReverse) const {
- Y_UNUSED(ctx);
-
if (!insertTable || !index) {
return nullptr;
}
@@ -24,7 +21,8 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDes
}
NOlap::TDataStorageAccessor dataAccessor(insertTable, index, batchCache);
- auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), read.GetSnapshot(), isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC);
+ auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), read.GetSnapshot(),
+ isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC, read.GetProgram());
if (!readMetadata->Init(read, dataAccessor, error)) {
return nullptr;
@@ -32,59 +30,18 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDes
return readMetadata;
}
-bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapProgramType programType,
- TString serializedProgram, NOlap::TReadDescription& read, const NOlap::IColumnResolver& columnResolver)
-{
+bool TTxReadBase::ParseProgram(NKikimrSchemeOp::EOlapProgramType programType,
+ TString serializedProgram, NOlap::TReadDescription& read, const NOlap::IColumnResolver& columnResolver) {
if (serializedProgram.empty()) {
return true;
}
-
- NKikimrSSA::TProgram program;
- NKikimrSSA::TOlapProgram olapProgram;
-
- switch (programType) {
- case NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS:
- if (!olapProgram.ParseFromString(serializedProgram)) {
- ErrorDescription = TStringBuilder() << "Can't parse TOlapProgram at " << Self->TabletID();
- return false;
- }
-
- if (!program.ParseFromString(olapProgram.GetProgram())) {
- ErrorDescription = TStringBuilder() << "Can't parse TProgram at " << Self->TabletID();
- return false;
- }
-
- break;
- default:
- ErrorDescription = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType;
- return false;
- }
-
- if (ctx.LoggerSettings() &&
- ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD))
- {
- TString out;
- ::google::protobuf::TextFormat::PrintToString(program, &out);
- LOG_S_DEBUG("Process program: " << Endl << out);
- }
-
- if (olapProgram.HasParameters()) {
- Y_VERIFY(olapProgram.HasParametersSchema(), "Parameters are present, but there is no schema.");
-
- auto schema = NArrow::DeserializeSchema(olapProgram.GetParametersSchema());
- read.ProgramParameters = NArrow::DeserializeBatch(olapProgram.GetParameters(), schema);
- }
-
- auto ssaProgram = read.AddProgram(columnResolver, program);
- if (!ssaProgram) {
- ErrorDescription = TStringBuilder() << "Wrong olap program";
+ NOlap::TProgramContainer ssaProgram;
+ TString error;
+ if (!ssaProgram.Init(columnResolver, programType, serializedProgram, error)) {
+ ErrorDescription = TStringBuilder() << "Can't parse SsaProgram at " << Self->TabletID() << " / " << error;
return false;
}
- if (!ssaProgram->Steps.empty() && Self->TablesManager.HasPrimaryIndex()) {
- NSsa::OptimizeProgram(*ssaProgram);
- }
-
- read.Program = ssaProgram;
+ read.SetProgram(std::move(ssaProgram));
return true;
}
diff --git a/ydb/core/tx/columnshard/columnshard__read_base.h b/ydb/core/tx/columnshard/columnshard__read_base.h
index fe62c418452..148971bb22e 100644
--- a/ydb/core/tx/columnshard/columnshard__read_base.h
+++ b/ydb/core/tx/columnshard/columnshard__read_base.h
@@ -12,7 +12,6 @@ protected:
{}
std::shared_ptr<NOlap::TReadMetadata> PrepareReadMetadata(
- const TActorContext& ctx,
const NOlap::TReadDescription& readDescription,
const std::unique_ptr<NOlap::TInsertTable>& insertTable,
const std::unique_ptr<NOlap::IColumnEngine>& index,
@@ -21,7 +20,6 @@ protected:
protected:
bool ParseProgram(
- const TActorContext& ctx,
NKikimrSchemeOp::EOlapProgramType programType,
TString serializedProgram,
NOlap::TReadDescription& read,
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 9de12ac7228..8d1f53df638 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -36,7 +36,7 @@ public:
TTxType GetTxType() const override { return TXTYPE_START_SCAN; }
private:
- std::shared_ptr<NOlap::TReadMetadataBase> CreateReadMetadata(const TActorContext& ctx, NOlap::TReadDescription& read,
+ std::shared_ptr<NOlap::TReadMetadataBase> CreateReadMetadata(NOlap::TReadDescription& read,
bool isIndexStats, bool isReverse, ui64 limit);
private:
@@ -761,10 +761,8 @@ static bool FillPredicatesFromRange(NOlap::TReadDescription& read, const ::NKiki
std::shared_ptr<NOlap::TReadStatsMetadata>
PrepareStatsReadMetadata(ui64 tabletId, const NOlap::TReadDescription& read, const std::unique_ptr<NOlap::IColumnEngine>& index, TString& error, const bool isReverse) {
THashSet<ui32> readColumnIds(read.ColumnIds.begin(), read.ColumnIds.end());
- if (read.Program) {
- for (auto& [id, name] : read.Program->SourceColumns) {
- readColumnIds.insert(id);
- }
+ for (auto& [id, name] : read.GetProgram().GetSourceColumns()) {
+ readColumnIds.insert(id);
}
for (ui32 colId : readColumnIds) {
@@ -774,12 +772,13 @@ PrepareStatsReadMetadata(ui64 tabletId, const NOlap::TReadDescription& read, con
}
}
- auto out = std::make_shared<NOlap::TReadStatsMetadata>(tabletId, isReverse ? NOlap::TReadStatsMetadata::ESorting::DESC : NOlap::TReadStatsMetadata::ESorting::ASC);
+ auto out = std::make_shared<NOlap::TReadStatsMetadata>(tabletId,
+ isReverse ? NOlap::TReadStatsMetadata::ESorting::DESC : NOlap::TReadStatsMetadata::ESorting::ASC,
+ read.GetProgram());
out->SetPKRangesFilter(read.PKRangesFilter);
out->ReadColumnIds.assign(readColumnIds.begin(), readColumnIds.end());
out->ResultColumnIds = read.ColumnIds;
- out->Program = read.Program;
if (!index) {
return out;
@@ -805,14 +804,14 @@ PrepareStatsReadMetadata(ui64 tabletId, const NOlap::TReadDescription& read, con
return out;
}
-std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TActorContext& ctx, NOlap::TReadDescription& read,
+std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(NOlap::TReadDescription& read,
bool indexStats, bool isReverse, ui64 itemsLimit)
{
std::shared_ptr<NOlap::TReadMetadataBase> metadata;
if (indexStats) {
metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->TablesManager.GetPrimaryIndex(), ErrorDescription, isReverse);
} else {
- metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache,
+ metadata = PrepareReadMetadata(read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache,
ErrorDescription, isReverse);
}
@@ -868,10 +867,10 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
if (!isIndexStats) {
TIndexColumnResolver columnResolver(*indexInfo);
- parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver);
+ parseResult = ParseProgram(record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver);
} else {
TStatsColumnResolver columnResolver;
- parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver);
+ parseResult = ParseProgram(record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver);
}
if (!parseResult) {
@@ -879,7 +878,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}
if (!record.RangesSize()) {
- auto range = CreateReadMetadata(ctx, read, isIndexStats, record.GetReverse(), itemsLimit);
+ auto range = CreateReadMetadata(read, isIndexStats, record.GetReverse(), itemsLimit);
if (range) {
if (!isIndexStats) {
Self->MapExternBlobs(ctx, static_cast<NOlap::TReadMetadata&>(*range));
@@ -902,7 +901,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}
}
{
- auto newRange = CreateReadMetadata(ctx, read, isIndexStats, record.GetReverse(), itemsLimit);
+ auto newRange = CreateReadMetadata(read, isIndexStats, record.GetReverse(), itemsLimit);
if (!newRange) {
ReadMetadataRanges.clear();
return true;
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
index ddd5f2871b9..ffe33890fc0 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
@@ -20,11 +20,9 @@ NKikimr::NOlap::TPartialReadResult TStatsIterator::GetBatch() {
.LastReadKey = std::move(lastKey)
};
- if (ReadMetadata->Program) {
- auto status = ApplyProgram(out.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext());
- if (!status.ok()) {
- out.ErrorString = status.message();
- }
+ auto status = ReadMetadata->GetProgram().ApplyProgram(out.ResultBatch);
+ if (!status.ok()) {
+ out.ErrorString = status.message();
}
return out;
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index cb95e0b13d4..223a581a97f 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -433,12 +433,11 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco
});
}
}
-
- if (ReadMetadata->Program) {
+
+ if (ReadMetadata->GetProgram().HasProgram()) {
MergeTooSmallBatches(out);
-
for (auto& result : out) {
- auto status = ApplyProgram(result.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext());
+ auto status = ReadMetadata->GetProgram().ApplyProgram(result.ResultBatch);
if (!status.ok()) {
result.ErrorString = status.message();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index a47869de0cd..099c560c72e 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -40,6 +40,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index 4e5ec184914..4c3c3a73226 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -41,6 +41,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index 4e5ec184914..4c3c3a73226 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -41,6 +41,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index a47869de0cd..099c560c72e 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -40,6 +40,7 @@ target_sources(columnshard-engines-reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp
)
generate_enum_serilization(columnshard-engines-reader
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h
diff --git a/ydb/core/tx/columnshard/engines/reader/description.cpp b/ydb/core/tx/columnshard/engines/reader/description.cpp
index 761304aad10..e11e6c49da1 100644
--- a/ydb/core/tx/columnshard/engines/reader/description.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/description.cpp
@@ -1,432 +1,4 @@
#include "description.h"
namespace NKikimr::NOlap {
-
-namespace {
-
-using EOperation = NArrow::EOperation;
-using EAggregate = NArrow::EAggregate;
-using TAssign = NSsa::TAssign;
-using TAggregateAssign = NSsa::TAggregateAssign;
-
-struct TContext {
- const IColumnResolver& ColumnResolver;
- mutable THashMap<ui32, TString> Sources;
- mutable THashMap<TString, std::shared_ptr<arrow::Scalar>> Constants;
-
- explicit TContext(const IColumnResolver& columnResolver)
- : ColumnResolver(columnResolver)
- {}
-
- std::string GetName(const NKikimrSSA::TProgram::TColumn& column) const {
- ui32 columnId = column.GetId();
- TString name = ColumnResolver.GetColumnName(columnId, false);
- if (name.Empty()) {
- return GenerateName(column);
- } else {
- Sources[columnId] = name;
- }
- return std::string(name.data(), name.size());
- }
-
- std::string GenerateName(const NKikimrSSA::TProgram::TColumn& column) const {
- TString name;
- if (column.HasName()) {
- name = column.GetName();
- } else {
- name = ToString(column.GetId());
- }
- return std::string(name.data(), name.size());
- }
-};
-
-TAssign MakeFunction(const TContext& info, const std::string& name,
- const NKikimrSSA::TProgram::TAssignment::TFunction& func) {
- using TId = NKikimrSSA::TProgram::TAssignment;
-
- std::vector<std::string> arguments;
- for (auto& col : func.GetArguments()) {
- arguments.push_back(info.GetName(col));
- }
-
- auto mkCastOptions = [](std::shared_ptr<arrow::DataType> dataType) {
- // TODO: support CAST with OrDefault/OrNull logic (second argument is default value)
- auto castOpts = std::make_shared<arrow::compute::CastOptions>(false);
- castOpts->to_type = dataType;
- return castOpts;
- };
-
- auto mkLikeOptions = [&](bool ignoreCase) {
- if (arguments.size() != 2 || !info.Constants.contains(arguments[1])) {
- return std::shared_ptr<arrow::compute::MatchSubstringOptions>();
- }
- auto patternScalar = info.Constants[arguments[1]];
- if (!arrow::is_base_binary_like(patternScalar->type->id())) {
- return std::shared_ptr<arrow::compute::MatchSubstringOptions>();
- }
- arguments.resize(1);
- auto& pattern = static_cast<arrow::BaseBinaryScalar&>(*patternScalar).value;
- return std::make_shared<arrow::compute::MatchSubstringOptions>(pattern->ToString(), ignoreCase);
- };
-
- switch (func.GetId()) {
- case TId::FUNC_CMP_EQUAL:
- return TAssign(name, EOperation::Equal, std::move(arguments));
- case TId::FUNC_CMP_NOT_EQUAL:
- return TAssign(name, EOperation::NotEqual, std::move(arguments));
- case TId::FUNC_CMP_LESS:
- return TAssign(name, EOperation::Less, std::move(arguments));
- case TId::FUNC_CMP_LESS_EQUAL:
- return TAssign(name, EOperation::LessEqual, std::move(arguments));
- case TId::FUNC_CMP_GREATER:
- return TAssign(name, EOperation::Greater, std::move(arguments));
- case TId::FUNC_CMP_GREATER_EQUAL:
- return TAssign(name, EOperation::GreaterEqual, std::move(arguments));
- case TId::FUNC_IS_NULL:
- return TAssign(name, EOperation::IsNull, std::move(arguments));
- case TId::FUNC_STR_LENGTH:
- return TAssign(name, EOperation::BinaryLength, std::move(arguments));
- case TId::FUNC_STR_MATCH: {
- if (auto opts = mkLikeOptions(false)) {
- return TAssign(name, EOperation::MatchSubstring, std::move(arguments), opts);
- }
- break;
- }
- case TId::FUNC_STR_MATCH_LIKE: {
- if (auto opts = mkLikeOptions(false)) {
- return TAssign(name, EOperation::MatchLike, std::move(arguments), opts);
- }
- break;
- }
- case TId::FUNC_STR_STARTS_WITH: {
- if (auto opts = mkLikeOptions(false)) {
- return TAssign(name, EOperation::StartsWith, std::move(arguments), opts);
- }
- break;
- }
- case TId::FUNC_STR_ENDS_WITH: {
- if (auto opts = mkLikeOptions(false)) {
- return TAssign(name, EOperation::EndsWith, std::move(arguments), opts);
- }
- break;
- }
- case TId::FUNC_STR_MATCH_IGNORE_CASE: {
- if (auto opts = mkLikeOptions(true)) {
- return TAssign(name, EOperation::MatchSubstring, std::move(arguments), opts);
- }
- break;
- }
- case TId::FUNC_STR_STARTS_WITH_IGNORE_CASE: {
- if (auto opts = mkLikeOptions(true)) {
- return TAssign(name, EOperation::StartsWith, std::move(arguments), opts);
- }
- break;
- }
- case TId::FUNC_STR_ENDS_WITH_IGNORE_CASE: {
- if (auto opts = mkLikeOptions(true)) {
- return TAssign(name, EOperation::EndsWith, std::move(arguments), opts);
- }
- break;
- }
- case TId::FUNC_BINARY_NOT:
- return TAssign(name, EOperation::Invert, std::move(arguments));
- case TId::FUNC_BINARY_AND:
- return TAssign(name, EOperation::And, std::move(arguments));
- case TId::FUNC_BINARY_OR:
- return TAssign(name, EOperation::Or, std::move(arguments));
- case TId::FUNC_BINARY_XOR:
- return TAssign(name, EOperation::Xor, std::move(arguments));
- case TId::FUNC_MATH_ADD:
- return TAssign(name, EOperation::Add, std::move(arguments));
- case TId::FUNC_MATH_SUBTRACT:
- return TAssign(name, EOperation::Subtract, std::move(arguments));
- case TId::FUNC_MATH_MULTIPLY:
- return TAssign(name, EOperation::Multiply, std::move(arguments));
- case TId::FUNC_MATH_DIVIDE:
- return TAssign(name, EOperation::Divide, std::move(arguments));
- case TId::FUNC_CAST_TO_INT8:
- return TAssign(name, EOperation::CastInt8, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::Int8Type>()));
- case TId::FUNC_CAST_TO_INT16:
- return TAssign(name, EOperation::CastInt16, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::Int16Type>()));
- case TId::FUNC_CAST_TO_INT32:
- return TAssign(name, EOperation::CastInt32, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::Int32Type>()));
- case TId::FUNC_CAST_TO_INT64:
- return TAssign(name, EOperation::CastInt64, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::Int64Type>()));
- case TId::FUNC_CAST_TO_UINT8:
- return TAssign(name, EOperation::CastUInt8, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::UInt8Type>()));
- case TId::FUNC_CAST_TO_UINT16:
- return TAssign(name, EOperation::CastUInt16, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::UInt16Type>()));
- case TId::FUNC_CAST_TO_UINT32:
- return TAssign(name, EOperation::CastUInt32, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::UInt32Type>()));
- case TId::FUNC_CAST_TO_UINT64:
- return TAssign(name, EOperation::CastUInt64, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::UInt64Type>()));
- case TId::FUNC_CAST_TO_FLOAT:
- return TAssign(name, EOperation::CastFloat, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::FloatType>()));
- case TId::FUNC_CAST_TO_DOUBLE:
- return TAssign(name, EOperation::CastDouble, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::DoubleType>()));
- case TId::FUNC_CAST_TO_TIMESTAMP:
- return TAssign(name, EOperation::CastTimestamp, std::move(arguments),
- mkCastOptions(std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO)));
- case TId::FUNC_CAST_TO_BINARY:
- case TId::FUNC_CAST_TO_FIXED_SIZE_BINARY:
- case TId::FUNC_UNSPECIFIED:
- break;
- }
- return TAssign(name, EOperation::Unspecified, std::move(arguments));
-}
-
-NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant) {
- using TId = NKikimrSSA::TProgram::TConstant;
-
- switch (constant.GetValueCase()) {
- case TId::kBool:
- return TAssign(name, constant.GetBool());
- case TId::kInt32:
- return TAssign(name, constant.GetInt32());
- case TId::kUint32:
- return TAssign(name, constant.GetUint32());
- case TId::kInt64:
- return TAssign(name, constant.GetInt64());
- case TId::kUint64:
- return TAssign(name, constant.GetUint64());
- case TId::kFloat:
- return TAssign(name, constant.GetFloat());
- case TId::kDouble:
- return TAssign(name, constant.GetDouble());
- case TId::kBytes:
- {
- TString str = constant.GetBytes();
- return TAssign(name, std::string(str.data(), str.size()));
- }
- case TId::kText:
- {
- TString str = constant.GetText();
- return TAssign(name, std::string(str.data(), str.size()));
- }
- case TId::VALUE_NOT_SET:
- break;
- }
- return TAssign(name, EOperation::Unspecified, {});
-}
-
-NSsa::TAggregateAssign MakeAggregate(const TContext& info, const std::string& name,
- const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func)
-{
- using TId = NKikimrSSA::TProgram::TAggregateAssignment;
-
- if (func.ArgumentsSize() == 1) {
- std::string argument = info.GetName(func.GetArguments()[0]);
-
- switch (func.GetId()) {
- case TId::AGG_SOME:
- return TAggregateAssign(name, EAggregate::Some, std::move(argument));
- case TId::AGG_COUNT:
- return TAggregateAssign(name, EAggregate::Count, std::move(argument));
- case TId::AGG_MIN:
- return TAggregateAssign(name, EAggregate::Min, std::move(argument));
- case TId::AGG_MAX:
- return TAggregateAssign(name, EAggregate::Max, std::move(argument));
- case TId::AGG_SUM:
- return TAggregateAssign(name, EAggregate::Sum, std::move(argument));
-#if 0 // TODO
- case TId::AGG_AVG:
- return TAggregateAssign(name, EAggregate::Avg, std::move(argument));
-#endif
- case TId::AGG_UNSPECIFIED:
- break;
- }
- } else if (func.ArgumentsSize() == 0 && func.GetId() == TId::AGG_COUNT) {
- // COUNT(*) case
- return TAggregateAssign(name, EAggregate::Count);
- }
- return TAggregateAssign(name); // !ok()
-}
-
-NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter,
- const std::shared_ptr<arrow::RecordBatch>& parameterValues)
-{
- auto parameterName = parameter.GetName();
- auto column = parameterValues->GetColumnByName(parameterName);
-#if 0
- Y_VERIFY(
- column,
- "No parameter %s in serialized parameters.", parameterName.c_str()
- );
- Y_VERIFY(
- column->length() == 1,
- "Incorrect values count in parameter array"
- );
-#else
- if (!column || column->length() != 1) {
- return TAssign(name, NArrow::EOperation::Unspecified, {});
- }
-#endif
- return TAssign(name, *column->GetScalar(0));
-}
-
-bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign,
- const std::shared_ptr<arrow::RecordBatch>& parameterValues)
-{
- using TId = NKikimrSSA::TProgram::TAssignment;
-
- std::string columnName = info.GetName(assign.GetColumn());
-
- switch (assign.GetExpressionCase()) {
- case TId::kFunction:
- {
- auto func = MakeFunction(info, columnName, assign.GetFunction());
- if (!func.IsOk()) {
- return false;
- }
- step.Assignes.emplace_back(std::move(func));
- break;
- }
- case TId::kConstant:
- {
- auto cnst = MakeConstant(columnName, assign.GetConstant());
- if (!cnst.IsConstant()) {
- return false;
- }
- info.Constants[columnName] = cnst.GetConstant();
- step.Assignes.emplace_back(std::move(cnst));
- break;
- }
- case TId::kParameter:
- {
- auto param = MaterializeParameter(columnName, assign.GetParameter(), parameterValues);
- if (!param.IsConstant()) {
- return false;
- }
- step.Assignes.emplace_back(std::move(param));
- break;
- }
- case TId::kExternalFunction:
- case TId::kNull:
- case TId::EXPRESSION_NOT_SET:
- return false;
- }
- return true;
-}
-
-bool ExtractFilter(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) {
- auto& column = filter.GetPredicate();
- if (!column.HasId() && !column.HasName()) {
- return false;
- }
- // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name.
- step.Filters.push_back(info.GetName(column));
- return true;
-}
-
-bool ExtractProjection(const TContext& info, NSsa::TProgramStep& step,
- const NKikimrSSA::TProgram::TProjection& projection) {
- step.Projection.reserve(projection.ColumnsSize());
- for (auto& col : projection.GetColumns()) {
- // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name.
- step.Projection.push_back(info.GetName(col));
- }
- return true;
-}
-
-bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy) {
- if (!groupBy.AggregatesSize()) {
- return false;
- }
-
- step.GroupBy.reserve(groupBy.AggregatesSize());
- step.GroupByKeys.reserve(groupBy.KeyColumnsSize());
- for (auto& agg : groupBy.GetAggregates()) {
- auto& resColumn = agg.GetColumn();
- TString columnName = info.GenerateName(resColumn);
-
- auto func = MakeAggregate(info, columnName, agg.GetFunction());
- if (!func.IsOk()) {
- return false;
- }
- step.GroupBy.push_back(std::move(func));
- }
- for (auto& key : groupBy.GetKeyColumns()) {
- step.GroupByKeys.push_back(info.GetName(key));
- }
-
- return true;
-}
-
-}
-
-using EOperation = NArrow::EOperation;
-using TPredicate = NOlap::TPredicate;
-
-std::shared_ptr<NSsa::TProgram> TReadDescription::AddProgram(const IColumnResolver& columnResolver,
- const NKikimrSSA::TProgram& program)
-{
- using TId = NKikimrSSA::TProgram::TCommand;
-
- auto ssaProgram = std::make_shared<NSsa::TProgram>();
- TContext info(columnResolver);
- auto step = std::make_shared<NSsa::TProgramStep>();
- for (auto& cmd : program.GetCommand()) {
- switch (cmd.GetLineCase()) {
- case TId::kAssign:
- if (!ExtractAssign(info, *step, cmd.GetAssign(), ProgramParameters)) {
- return nullptr;
- }
- break;
- case TId::kFilter:
- if (!ExtractFilter(info, *step, cmd.GetFilter())) {
- return nullptr;
- }
- break;
- case TId::kProjection:
- if (!ExtractProjection(info, *step, cmd.GetProjection())) {
- return nullptr;
- }
- ssaProgram->Steps.push_back(step);
- step = std::make_shared<NSsa::TProgramStep>();
- break;
- case TId::kGroupBy:
- if (!ExtractGroupBy(info, *step, cmd.GetGroupBy())) {
- return nullptr;
- }
- ssaProgram->Steps.push_back(step);
- step = std::make_shared<NSsa::TProgramStep>();
- break;
- case TId::LINE_NOT_SET:
- return nullptr;
- }
- }
-
- // final step without final projection
- if (!step->Empty()) {
- ssaProgram->Steps.push_back(step);
- }
-
- ssaProgram->SourceColumns = std::move(info.Sources);
-
- // Query 'SELECT count(*) FROM table' needs a column
- if (ssaProgram->SourceColumns.empty()) {
- auto& ydbSchema = columnResolver.GetSchema();
-
- Y_VERIFY(!ydbSchema.KeyColumns.empty());
- ui32 key = ydbSchema.KeyColumns[0];
-
- auto it = ydbSchema.Columns.find(key);
- Y_VERIFY(it != ydbSchema.Columns.end());
-
- ssaProgram->SourceColumns[key] = it->second.Name;
- }
-
- return ssaProgram;
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h
index b2928673e2a..b581055388f 100644
--- a/ydb/core/tx/columnshard/engines/reader/description.h
+++ b/ydb/core/tx/columnshard/engines/reader/description.h
@@ -1,22 +1,14 @@
#pragma once
+#include "program.h"
#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
-#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
-#include <ydb/core/formats/arrow/program.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
namespace NKikimr::NOlap {
-class IColumnResolver {
-public:
- virtual ~IColumnResolver() = default;
- virtual TString GetColumnName(ui32 id, bool required = true) const = 0;
- virtual const NTable::TScheme::TTableSchema& GetSchema() const = 0;
-};
-
// Describes read/scan request
struct TReadDescription {
private:
TSnapshot Snapshot;
+ TProgramContainer Program;
public:
// Table
ui64 PathId = 0;
@@ -27,25 +19,26 @@ public:
// operations with potentially different columns. We have to remove columns to support -Inf (Null) and +Inf.
NOlap::TPKRangesFilter PKRangesFilter;
- // SSA Program
- std::shared_ptr<NSsa::TProgram> Program;
- std::shared_ptr<arrow::RecordBatch> ProgramParameters; // TODO
-
// List of columns
std::vector<ui32> ColumnIds;
std::vector<TString> ColumnNames;
-
- std::shared_ptr<NSsa::TProgram> AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program);
TReadDescription(const TSnapshot& snapshot, const bool isReverse)
: Snapshot(snapshot)
, PKRangesFilter(isReverse) {
+ }
+ void SetProgram(TProgramContainer&& value) {
+ Program = std::move(value);
}
const TSnapshot& GetSnapshot() const {
return Snapshot;
}
+
+ const TProgramContainer& GetProgram() const {
+ return Program;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index 24a29917d86..7389d88286c 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -20,9 +20,9 @@ bool TAssembleFilter::DoExecuteImpl() {
FilteredBatch = nullptr;
return true;
}
- if (ReadMetadata->Program) {
+ auto earlyFilter = ReadMetadata->GetProgram().BuildEarlyFilter(batch);
+ if (earlyFilter) {
if (AllowEarlyFilter) {
- auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program));
Filter = std::make_shared<NArrow::TColumnFilter>(Filter->CombineSequentialAnd(*earlyFilter));
if (!earlyFilter->Apply(batch)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount);
@@ -30,7 +30,7 @@ bool TAssembleFilter::DoExecuteImpl() {
return true;
}
} else if (BatchesOrderPolicy->NeedNotAppliedEarlyFilter()) {
- EarlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program));
+ EarlyFilter = earlyFilter;
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index ff24281f35c..0afae0b72b9 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -111,9 +111,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
if (NotIndexedBatch) {
RecordBatches.emplace_back(NotIndexedBatch);
}
- if (Owner->GetReadMetadata()->Program) {
- NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program));
- }
+ NotIndexedBatchFutureFilter = Owner->GetReadMetadata()->GetProgram().BuildEarlyFilter(batch);
DuplicationsAvailableFlag = true;
}
CheckReady();
diff --git a/ydb/core/tx/columnshard/engines/reader/program.cpp b/ydb/core/tx/columnshard/engines/reader/program.cpp
new file mode 100644
index 00000000000..1d693164cf0
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/program.cpp
@@ -0,0 +1,519 @@
+#include "program.h"
+
+#include <ydb/core/formats/arrow/ssa_program_optimizer.h>
+#include <ydb/core/tx/columnshard/engines/filter.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/cast.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_scalar.h>
+#include <google/protobuf/text_format.h>
+
+namespace NKikimr::NOlap {
+
+namespace {
+
+using EOperation = NArrow::EOperation;
+using EAggregate = NArrow::EAggregate;
+using TAssign = NSsa::TAssign;
+using TAggregateAssign = NSsa::TAggregateAssign;
+
+struct TContext {
+ const IColumnResolver& ColumnResolver;
+ mutable THashMap<ui32, TString> Sources;
+ mutable THashMap<TString, std::shared_ptr<arrow::Scalar>> Constants;
+
+ explicit TContext(const IColumnResolver& columnResolver)
+ : ColumnResolver(columnResolver)
+ {}
+
+ std::string GetName(const NKikimrSSA::TProgram::TColumn& column) const {
+ ui32 columnId = column.GetId();
+ TString name = ColumnResolver.GetColumnName(columnId, false);
+ if (name.Empty()) {
+ return GenerateName(column);
+ } else {
+ Sources[columnId] = name;
+ }
+ return std::string(name.data(), name.size());
+ }
+
+ std::string GenerateName(const NKikimrSSA::TProgram::TColumn& column) const {
+ TString name;
+ if (column.HasName()) {
+ name = column.GetName();
+ } else {
+ name = ToString(column.GetId());
+ }
+ return std::string(name.data(), name.size());
+ }
+};
+
+TAssign MakeFunction(const TContext& info, const std::string& name,
+ const NKikimrSSA::TProgram::TAssignment::TFunction& func) {
+ using TId = NKikimrSSA::TProgram::TAssignment;
+
+ std::vector<std::string> arguments;
+ for (auto& col : func.GetArguments()) {
+ arguments.push_back(info.GetName(col));
+ }
+
+ auto mkCastOptions = [](std::shared_ptr<arrow::DataType> dataType) {
+ // TODO: support CAST with OrDefault/OrNull logic (second argument is default value)
+ auto castOpts = std::make_shared<arrow::compute::CastOptions>(false);
+ castOpts->to_type = dataType;
+ return castOpts;
+ };
+
+ auto mkLikeOptions = [&](bool ignoreCase) {
+ if (arguments.size() != 2 || !info.Constants.contains(arguments[1])) {
+ return std::shared_ptr<arrow::compute::MatchSubstringOptions>();
+ }
+ auto patternScalar = info.Constants[arguments[1]];
+ if (!arrow::is_base_binary_like(patternScalar->type->id())) {
+ return std::shared_ptr<arrow::compute::MatchSubstringOptions>();
+ }
+ arguments.resize(1);
+ auto& pattern = static_cast<arrow::BaseBinaryScalar&>(*patternScalar).value;
+ return std::make_shared<arrow::compute::MatchSubstringOptions>(pattern->ToString(), ignoreCase);
+ };
+
+ switch (func.GetId()) {
+ case TId::FUNC_CMP_EQUAL:
+ return TAssign(name, EOperation::Equal, std::move(arguments));
+ case TId::FUNC_CMP_NOT_EQUAL:
+ return TAssign(name, EOperation::NotEqual, std::move(arguments));
+ case TId::FUNC_CMP_LESS:
+ return TAssign(name, EOperation::Less, std::move(arguments));
+ case TId::FUNC_CMP_LESS_EQUAL:
+ return TAssign(name, EOperation::LessEqual, std::move(arguments));
+ case TId::FUNC_CMP_GREATER:
+ return TAssign(name, EOperation::Greater, std::move(arguments));
+ case TId::FUNC_CMP_GREATER_EQUAL:
+ return TAssign(name, EOperation::GreaterEqual, std::move(arguments));
+ case TId::FUNC_IS_NULL:
+ return TAssign(name, EOperation::IsNull, std::move(arguments));
+ case TId::FUNC_STR_LENGTH:
+ return TAssign(name, EOperation::BinaryLength, std::move(arguments));
+ case TId::FUNC_STR_MATCH: {
+ if (auto opts = mkLikeOptions(false)) {
+ return TAssign(name, EOperation::MatchSubstring, std::move(arguments), opts);
+ }
+ break;
+ }
+ case TId::FUNC_STR_MATCH_LIKE: {
+ if (auto opts = mkLikeOptions(false)) {
+ return TAssign(name, EOperation::MatchLike, std::move(arguments), opts);
+ }
+ break;
+ }
+ case TId::FUNC_STR_STARTS_WITH: {
+ if (auto opts = mkLikeOptions(false)) {
+ return TAssign(name, EOperation::StartsWith, std::move(arguments), opts);
+ }
+ break;
+ }
+ case TId::FUNC_STR_ENDS_WITH: {
+ if (auto opts = mkLikeOptions(false)) {
+ return TAssign(name, EOperation::EndsWith, std::move(arguments), opts);
+ }
+ break;
+ }
+ case TId::FUNC_STR_MATCH_IGNORE_CASE: {
+ if (auto opts = mkLikeOptions(true)) {
+ return TAssign(name, EOperation::MatchSubstring, std::move(arguments), opts);
+ }
+ break;
+ }
+ case TId::FUNC_STR_STARTS_WITH_IGNORE_CASE: {
+ if (auto opts = mkLikeOptions(true)) {
+ return TAssign(name, EOperation::StartsWith, std::move(arguments), opts);
+ }
+ break;
+ }
+ case TId::FUNC_STR_ENDS_WITH_IGNORE_CASE: {
+ if (auto opts = mkLikeOptions(true)) {
+ return TAssign(name, EOperation::EndsWith, std::move(arguments), opts);
+ }
+ break;
+ }
+ case TId::FUNC_BINARY_NOT:
+ return TAssign(name, EOperation::Invert, std::move(arguments));
+ case TId::FUNC_BINARY_AND:
+ return TAssign(name, EOperation::And, std::move(arguments));
+ case TId::FUNC_BINARY_OR:
+ return TAssign(name, EOperation::Or, std::move(arguments));
+ case TId::FUNC_BINARY_XOR:
+ return TAssign(name, EOperation::Xor, std::move(arguments));
+ case TId::FUNC_MATH_ADD:
+ return TAssign(name, EOperation::Add, std::move(arguments));
+ case TId::FUNC_MATH_SUBTRACT:
+ return TAssign(name, EOperation::Subtract, std::move(arguments));
+ case TId::FUNC_MATH_MULTIPLY:
+ return TAssign(name, EOperation::Multiply, std::move(arguments));
+ case TId::FUNC_MATH_DIVIDE:
+ return TAssign(name, EOperation::Divide, std::move(arguments));
+ case TId::FUNC_CAST_TO_INT8:
+ return TAssign(name, EOperation::CastInt8, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::Int8Type>()));
+ case TId::FUNC_CAST_TO_INT16:
+ return TAssign(name, EOperation::CastInt16, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::Int16Type>()));
+ case TId::FUNC_CAST_TO_INT32:
+ return TAssign(name, EOperation::CastInt32, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::Int32Type>()));
+ case TId::FUNC_CAST_TO_INT64:
+ return TAssign(name, EOperation::CastInt64, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::Int64Type>()));
+ case TId::FUNC_CAST_TO_UINT8:
+ return TAssign(name, EOperation::CastUInt8, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::UInt8Type>()));
+ case TId::FUNC_CAST_TO_UINT16:
+ return TAssign(name, EOperation::CastUInt16, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::UInt16Type>()));
+ case TId::FUNC_CAST_TO_UINT32:
+ return TAssign(name, EOperation::CastUInt32, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::UInt32Type>()));
+ case TId::FUNC_CAST_TO_UINT64:
+ return TAssign(name, EOperation::CastUInt64, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::UInt64Type>()));
+ case TId::FUNC_CAST_TO_FLOAT:
+ return TAssign(name, EOperation::CastFloat, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::FloatType>()));
+ case TId::FUNC_CAST_TO_DOUBLE:
+ return TAssign(name, EOperation::CastDouble, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::DoubleType>()));
+ case TId::FUNC_CAST_TO_TIMESTAMP:
+ return TAssign(name, EOperation::CastTimestamp, std::move(arguments),
+ mkCastOptions(std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO)));
+ case TId::FUNC_CAST_TO_BINARY:
+ case TId::FUNC_CAST_TO_FIXED_SIZE_BINARY:
+ case TId::FUNC_UNSPECIFIED:
+ break;
+ }
+ return TAssign(name, EOperation::Unspecified, std::move(arguments));
+}
+
+NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant) {
+ using TId = NKikimrSSA::TProgram::TConstant;
+
+ switch (constant.GetValueCase()) {
+ case TId::kBool:
+ return TAssign(name, constant.GetBool());
+ case TId::kInt32:
+ return TAssign(name, constant.GetInt32());
+ case TId::kUint32:
+ return TAssign(name, constant.GetUint32());
+ case TId::kInt64:
+ return TAssign(name, constant.GetInt64());
+ case TId::kUint64:
+ return TAssign(name, constant.GetUint64());
+ case TId::kFloat:
+ return TAssign(name, constant.GetFloat());
+ case TId::kDouble:
+ return TAssign(name, constant.GetDouble());
+ case TId::kBytes:
+ {
+ TString str = constant.GetBytes();
+ return TAssign(name, std::string(str.data(), str.size()));
+ }
+ case TId::kText:
+ {
+ TString str = constant.GetText();
+ return TAssign(name, std::string(str.data(), str.size()));
+ }
+ case TId::VALUE_NOT_SET:
+ break;
+ }
+ return TAssign(name, EOperation::Unspecified, {});
+}
+
+NSsa::TAggregateAssign MakeAggregate(const TContext& info, const std::string& name,
+ const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func)
+{
+ using TId = NKikimrSSA::TProgram::TAggregateAssignment;
+
+ if (func.ArgumentsSize() == 1) {
+ std::string argument = info.GetName(func.GetArguments()[0]);
+
+ switch (func.GetId()) {
+ case TId::AGG_SOME:
+ return TAggregateAssign(name, EAggregate::Some, std::move(argument));
+ case TId::AGG_COUNT:
+ return TAggregateAssign(name, EAggregate::Count, std::move(argument));
+ case TId::AGG_MIN:
+ return TAggregateAssign(name, EAggregate::Min, std::move(argument));
+ case TId::AGG_MAX:
+ return TAggregateAssign(name, EAggregate::Max, std::move(argument));
+ case TId::AGG_SUM:
+ return TAggregateAssign(name, EAggregate::Sum, std::move(argument));
+#if 0 // TODO
+ case TId::AGG_AVG:
+ return TAggregateAssign(name, EAggregate::Avg, std::move(argument));
+#endif
+ case TId::AGG_UNSPECIFIED:
+ break;
+ }
+ } else if (func.ArgumentsSize() == 0 && func.GetId() == TId::AGG_COUNT) {
+ // COUNT(*) case
+ return TAggregateAssign(name, EAggregate::Count);
+ }
+ return TAggregateAssign(name); // !ok()
+}
+
+NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter,
+ const std::shared_ptr<arrow::RecordBatch>& parameterValues)
+{
+ auto parameterName = parameter.GetName();
+ auto column = parameterValues->GetColumnByName(parameterName);
+#if 0
+ Y_VERIFY(
+ column,
+ "No parameter %s in serialized parameters.", parameterName.c_str()
+ );
+ Y_VERIFY(
+ column->length() == 1,
+ "Incorrect values count in parameter array"
+ );
+#else
+ if (!column || column->length() != 1) {
+ return TAssign(name, NArrow::EOperation::Unspecified, {});
+ }
+#endif
+ return TAssign(name, *column->GetScalar(0));
+}
+
+bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign,
+ const std::shared_ptr<arrow::RecordBatch>& parameterValues)
+{
+ using TId = NKikimrSSA::TProgram::TAssignment;
+
+ std::string columnName = info.GetName(assign.GetColumn());
+
+ switch (assign.GetExpressionCase()) {
+ case TId::kFunction:
+ {
+ auto func = MakeFunction(info, columnName, assign.GetFunction());
+ if (!func.IsOk()) {
+ return false;
+ }
+ step.Assignes.emplace_back(std::move(func));
+ break;
+ }
+ case TId::kConstant:
+ {
+ auto cnst = MakeConstant(columnName, assign.GetConstant());
+ if (!cnst.IsConstant()) {
+ return false;
+ }
+ info.Constants[columnName] = cnst.GetConstant();
+ step.Assignes.emplace_back(std::move(cnst));
+ break;
+ }
+ case TId::kParameter:
+ {
+ auto param = MaterializeParameter(columnName, assign.GetParameter(), parameterValues);
+ if (!param.IsConstant()) {
+ return false;
+ }
+ step.Assignes.emplace_back(std::move(param));
+ break;
+ }
+ case TId::kExternalFunction:
+ case TId::kNull:
+ case TId::EXPRESSION_NOT_SET:
+ return false;
+ }
+ return true;
+}
+
+bool ExtractFilter(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) {
+ auto& column = filter.GetPredicate();
+ if (!column.HasId() && !column.HasName()) {
+ return false;
+ }
+ // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name.
+ step.Filters.push_back(info.GetName(column));
+ return true;
+}
+
+bool ExtractProjection(const TContext& info, NSsa::TProgramStep& step,
+ const NKikimrSSA::TProgram::TProjection& projection) {
+ step.Projection.reserve(projection.ColumnsSize());
+ for (auto& col : projection.GetColumns()) {
+ // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name.
+ step.Projection.push_back(info.GetName(col));
+ }
+ return true;
+}
+
+bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy) {
+ if (!groupBy.AggregatesSize()) {
+ return false;
+ }
+
+ step.GroupBy.reserve(groupBy.AggregatesSize());
+ step.GroupByKeys.reserve(groupBy.KeyColumnsSize());
+ for (auto& agg : groupBy.GetAggregates()) {
+ auto& resColumn = agg.GetColumn();
+ TString columnName = info.GenerateName(resColumn);
+
+ auto func = MakeAggregate(info, columnName, agg.GetFunction());
+ if (!func.IsOk()) {
+ return false;
+ }
+ step.GroupBy.push_back(std::move(func));
+ }
+ for (auto& key : groupBy.GetKeyColumns()) {
+ step.GroupByKeys.push_back(info.GetName(key));
+ }
+
+ return true;
+}
+
+}
+
+const THashMap<ui32, TString>& TProgramContainer::GetSourceColumns() const {
+ if (!Program) {
+ return Default<THashMap<ui32, TString>>();
+ }
+ return Program->SourceColumns;
+}
+
+bool TProgramContainer::HasProgram() const {
+ return !!Program;
+}
+
+std::shared_ptr<NArrow::TColumnFilter> TProgramContainer::BuildEarlyFilter(std::shared_ptr<arrow::RecordBatch> batch) const {
+ if (Program) {
+ return std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Program));
+ }
+ return nullptr;
+}
+
+std::set<std::string> TProgramContainer::GetEarlyFilterColumns() const {
+ if (Program) {
+ return Program->GetEarlyFilterColumns();
+ }
+ return Default<std::set<std::string>>();
+}
+
+bool TProgramContainer::HasEarlyFilterOnly() const {
+ if (!Program) {
+ return true;
+ }
+ for (ui32 i = 1; i < Program->Steps.size(); ++i) {
+ if (Program->Steps[i]->Filters.size()) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool TProgramContainer::Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error) {
+ Y_VERIFY(serializedProgram);
+
+ NKikimrSSA::TProgram programProto;
+ NKikimrSSA::TOlapProgram olapProgramProto;
+
+ switch (programType) {
+ case NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS:
+ if (!olapProgramProto.ParseFromString(serializedProgram)) {
+ error = TStringBuilder() << "Can't parse TOlapProgram";
+ return false;
+ }
+
+ if (!programProto.ParseFromString(olapProgramProto.GetProgram())) {
+ error = TStringBuilder() << "Can't parse TProgram";
+ return false;
+ }
+
+ break;
+ default:
+ error = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType;
+ return false;
+ }
+
+ if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
+ TString out;
+ ::google::protobuf::TextFormat::PrintToString(programProto, &out);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", out);
+ }
+
+ if (olapProgramProto.HasParameters()) {
+ Y_VERIFY(olapProgramProto.HasParametersSchema(), "Parameters are present, but there is no schema.");
+
+ auto schema = NArrow::DeserializeSchema(olapProgramProto.GetParametersSchema());
+ ProgramParameters = NArrow::DeserializeBatch(olapProgramProto.GetParameters(), schema);
+ }
+
+ NOlap::TProgramContainer ssaProgram;
+ if (!ParseProgram(columnResolver, programProto)) {
+ error = TStringBuilder() << "Wrong olap program";
+ return false;
+ }
+ return true;
+}
+
+bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program) {
+ using TId = NKikimrSSA::TProgram::TCommand;
+
+ auto ssaProgram = std::make_shared<NSsa::TProgram>();
+ TContext info(columnResolver);
+ auto step = std::make_shared<NSsa::TProgramStep>();
+ for (auto& cmd : program.GetCommand()) {
+ switch (cmd.GetLineCase()) {
+ case TId::kAssign:
+ if (!ExtractAssign(info, *step, cmd.GetAssign(), ProgramParameters)) {
+ return false;
+ }
+ break;
+ case TId::kFilter:
+ if (!ExtractFilter(info, *step, cmd.GetFilter())) {
+ return false;
+ }
+ break;
+ case TId::kProjection:
+ if (!ExtractProjection(info, *step, cmd.GetProjection())) {
+ return false;
+ }
+ ssaProgram->Steps.push_back(step);
+ step = std::make_shared<NSsa::TProgramStep>();
+ break;
+ case TId::kGroupBy:
+ if (!ExtractGroupBy(info, *step, cmd.GetGroupBy())) {
+ return false;
+ }
+ ssaProgram->Steps.push_back(step);
+ step = std::make_shared<NSsa::TProgramStep>();
+ break;
+ case TId::LINE_NOT_SET:
+ return false;
+ }
+ }
+
+ // final step without final projection
+ if (!step->Empty()) {
+ ssaProgram->Steps.push_back(step);
+ }
+
+ ssaProgram->SourceColumns = std::move(info.Sources);
+
+ // Query 'SELECT count(*) FROM table' needs a column
+ if (ssaProgram->SourceColumns.empty()) {
+ auto& ydbSchema = columnResolver.GetSchema();
+
+ Y_VERIFY(!ydbSchema.KeyColumns.empty());
+ ui32 key = ydbSchema.KeyColumns[0];
+
+ auto it = ydbSchema.Columns.find(key);
+ Y_VERIFY(it != ydbSchema.Columns.end());
+
+ ssaProgram->SourceColumns[key] = it->second.Name;
+ }
+
+ if (!ssaProgram->Steps.empty()) {
+ NSsa::OptimizeProgram(*ssaProgram);
+ }
+ Program = ssaProgram;
+ return true;
+ }
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/program.h b/ydb/core/tx/columnshard/engines/reader/program.h
new file mode 100644
index 00000000000..494018def88
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/program.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/protos/ssa.pb.h>
+#include <ydb/core/formats/arrow/program.h>
+#include <ydb/core/formats/arrow/custom_registry.h>
+#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+
+namespace NKikimr::NOlap {
+class IColumnResolver {
+public:
+ virtual ~IColumnResolver() = default;
+ virtual TString GetColumnName(ui32 id, bool required = true) const = 0;
+ virtual const NTable::TScheme::TTableSchema& GetSchema() const = 0;
+};
+
+class TProgramContainer {
+private:
+ std::shared_ptr<NSsa::TProgram> Program;
+ std::shared_ptr<arrow::RecordBatch> ProgramParameters; // TODO
+public:
+ bool Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error);
+
+ inline arrow::Status ApplyProgram(std::shared_ptr<arrow::RecordBatch>& batch) const {
+ if (Program) {
+ return Program->ApplyTo(batch, NArrow::GetCustomExecContext());
+ }
+ return arrow::Status::OK();
+ }
+
+ const THashMap<ui32, TString>& GetSourceColumns() const;
+ bool HasProgram() const;
+
+ std::shared_ptr<NArrow::TColumnFilter> BuildEarlyFilter(std::shared_ptr<arrow::RecordBatch> batch) const;
+ std::set<std::string> GetEarlyFilterColumns() const;
+
+ bool HasEarlyFilterOnly() const;
+private:
+ bool ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program);
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index 626eb64a46d..8ecf5fb4907 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -100,11 +100,8 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto
columnIds.insert(columnId);
}
- Program = readDescription.Program;
- if (Program) {
- for (auto& [id, name] : Program->SourceColumns) {
- columnIds.insert(id);
- }
+ for (auto& [id, name] : GetProgram().GetSourceColumns()) {
+ columnIds.insert(id);
}
SelectInfo = dataAccessor.Select(readDescription, columnIds);
@@ -126,13 +123,11 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
}
}
- if (Program) {
- for (auto&& i : Program->GetEarlyFilterColumns()) {
- auto id = indexInfo.GetColumnIdOptional(i);
- if (id) {
- result.emplace(*id);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
- }
+ for (auto&& i : GetProgram().GetEarlyFilterColumns()) {
+ auto id = indexInfo.GetColumnIdOptional(i);
+ if (id) {
+ result.emplace(*id);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
}
}
if (Snapshot.GetPlanStep()) {
@@ -217,12 +212,9 @@ NIndexedReader::IOrderPolicy::TPtr TReadMetadata::BuildSortingPolicy() const {
}
++idx;
}
- if (Program) {
- for (ui32 i = 1; i < Program->Steps.size(); ++i) {
- if (Program->Steps[i]->Filters.size()) {
- return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
- }
- }
+
+ if (!GetProgram().HasEarlyFilterOnly()) {
+ return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this());
}
return std::make_shared<NIndexedReader::TPKSortingWithLimit>(this->shared_from_this());
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 52dac3522d9..77a31505227 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -77,6 +77,7 @@ public:
private:
const ESorting Sorting = ESorting::ASC; // Sorting inside returned batches
std::optional<TPKRangesFilter> PKRangesFilter;
+ TProgramContainer Program;
public:
using TConstPtr = std::shared_ptr<const TReadMetadataBase>;
@@ -91,16 +92,15 @@ public:
return *PKRangesFilter;
}
- TReadMetadataBase(const ESorting sorting)
+ TReadMetadataBase(const ESorting sorting, const TProgramContainer& ssaProgram)
: Sorting(sorting)
+ , Program(ssaProgram)
{
-
}
virtual ~TReadMetadataBase() = default;
std::shared_ptr<NOlap::TPredicate> LessPredicate;
std::shared_ptr<NOlap::TPredicate> GreaterPredicate;
- std::shared_ptr<NSsa::TProgram> Program;
std::shared_ptr<const THashSet<TUnifiedBlobId>> ExternBlobs;
ui64 Limit{0}; // TODO
@@ -122,6 +122,10 @@ public:
meta.Dump(out);
return out;
}
+
+ const TProgramContainer& GetProgram() const {
+ return Program;
+ }
};
// Holds all metadata that is needed to perform read/scan
@@ -147,8 +151,8 @@ public:
std::shared_ptr<NIndexedReader::IOrderPolicy> BuildSortingPolicy() const;
- TReadMetadata(const TVersionedIndex& info, const TSnapshot& snapshot, const ESorting sorting)
- : TBase(sorting)
+ TReadMetadata(const TVersionedIndex& info, const TSnapshot& snapshot, const ESorting sorting, const TProgramContainer& ssaProgram)
+ : TBase(sorting, ssaProgram)
, IndexVersions(info)
, Snapshot(snapshot)
, ResultIndexSchema(info.GetSchema(Snapshot))
@@ -251,7 +255,7 @@ public:
<< " index records: " << NumIndexedRecords()
<< " index blobs: " << NumIndexedBlobs()
<< " committed blobs: " << CommittedBlobs.size()
- << " with program steps: " << (Program ? Program->Steps.size() : 0)
+ // << " with program steps: " << (Program ? Program->Steps.size() : 0)
<< " at snapshot: " << Snapshot.GetPlanStep() << ":" << Snapshot.GetTxId();
TBase::Dump(out);
if (SelectInfo) {
@@ -276,8 +280,8 @@ public:
std::vector<ui32> ResultColumnIds;
THashMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats;
- explicit TReadStatsMetadata(ui64 tabletId, const ESorting sorting)
- : TBase(sorting)
+ explicit TReadStatsMetadata(ui64 tabletId, const ESorting sorting, const TProgramContainer& ssaProgram)
+ : TBase(sorting, ssaProgram)
, TabletId(tabletId)
{}