diff options
author | nsofya <nsofya@yandex-team.com> | 2023-05-26 19:12:41 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-05-26 19:12:41 +0300 |
commit | e0c56556489d46590343c4276a5444e7ca85d7c7 (patch) | |
tree | 55e7a613ebdf053ad6e5361aa6691c646f68cc9d | |
parent | e54492a5b591e75a7c69443cce17cbc3881a9e9e (diff) | |
download | ydb-e0c56556489d46590343c4276a5444e7ca85d7c7.tar.gz |
Group all program related stuff
Подвигала код, работаюший с программой, объединила в один класс общий, чтобы потом парсинг ядер в него же прятать
18 files changed, 632 insertions, 555 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index cff05627660..117aefeba55 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -77,12 +77,12 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { } Y_VERIFY(read.PKRangesFilter.Add(fromPredicate, toPredicate, &indexInfo)); - bool parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, + bool parseResult = ParseProgram(record.GetOlapProgramType(), record.GetOlapProgram(), read, TIndexColumnResolver(indexInfo)); std::shared_ptr<NOlap::TReadMetadata> metadata; if (parseResult) { - metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache, + metadata = PrepareReadMetadata(read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache, ErrorDescription, false); } diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp index 3a69c0d33f3..6ebc47ddc79 100644 --- a/ydb/core/tx/columnshard/columnshard__read_base.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp @@ -1,19 +1,16 @@ #include <ydb/core/tx/columnshard/columnshard_impl.h> #include <ydb/core/tx/columnshard/columnshard__read_base.h> #include <ydb/core/tx/columnshard/columnshard__index_scan.h> -#include <ydb/core/formats/arrow/ssa_program_optimizer.h> namespace NKikimr::NColumnShard { std::shared_ptr<NOlap::TReadMetadata> -TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDescription& read, +TTxReadBase::PrepareReadMetadata(const NOlap::TReadDescription& read, const std::unique_ptr<NOlap::TInsertTable>& insertTable, const std::unique_ptr<NOlap::IColumnEngine>& index, const TBatchCache& batchCache, TString& error, const bool isReverse) const { - Y_UNUSED(ctx); - if (!insertTable || !index) { return nullptr; } @@ -24,7 +21,8 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDes } NOlap::TDataStorageAccessor dataAccessor(insertTable, index, batchCache); - auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), read.GetSnapshot(), isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC); + auto readMetadata = std::make_shared<NOlap::TReadMetadata>(index->GetVersionedIndex(), read.GetSnapshot(), + isReverse ? NOlap::TReadMetadata::ESorting::DESC : NOlap::TReadMetadata::ESorting::ASC, read.GetProgram()); if (!readMetadata->Init(read, dataAccessor, error)) { return nullptr; @@ -32,59 +30,18 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const NOlap::TReadDes return readMetadata; } -bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapProgramType programType, - TString serializedProgram, NOlap::TReadDescription& read, const NOlap::IColumnResolver& columnResolver) -{ +bool TTxReadBase::ParseProgram(NKikimrSchemeOp::EOlapProgramType programType, + TString serializedProgram, NOlap::TReadDescription& read, const NOlap::IColumnResolver& columnResolver) { if (serializedProgram.empty()) { return true; } - - NKikimrSSA::TProgram program; - NKikimrSSA::TOlapProgram olapProgram; - - switch (programType) { - case NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS: - if (!olapProgram.ParseFromString(serializedProgram)) { - ErrorDescription = TStringBuilder() << "Can't parse TOlapProgram at " << Self->TabletID(); - return false; - } - - if (!program.ParseFromString(olapProgram.GetProgram())) { - ErrorDescription = TStringBuilder() << "Can't parse TProgram at " << Self->TabletID(); - return false; - } - - break; - default: - ErrorDescription = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType; - return false; - } - - if (ctx.LoggerSettings() && - ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD)) - { - TString out; - ::google::protobuf::TextFormat::PrintToString(program, &out); - LOG_S_DEBUG("Process program: " << Endl << out); - } - - if (olapProgram.HasParameters()) { - Y_VERIFY(olapProgram.HasParametersSchema(), "Parameters are present, but there is no schema."); - - auto schema = NArrow::DeserializeSchema(olapProgram.GetParametersSchema()); - read.ProgramParameters = NArrow::DeserializeBatch(olapProgram.GetParameters(), schema); - } - - auto ssaProgram = read.AddProgram(columnResolver, program); - if (!ssaProgram) { - ErrorDescription = TStringBuilder() << "Wrong olap program"; + NOlap::TProgramContainer ssaProgram; + TString error; + if (!ssaProgram.Init(columnResolver, programType, serializedProgram, error)) { + ErrorDescription = TStringBuilder() << "Can't parse SsaProgram at " << Self->TabletID() << " / " << error; return false; } - if (!ssaProgram->Steps.empty() && Self->TablesManager.HasPrimaryIndex()) { - NSsa::OptimizeProgram(*ssaProgram); - } - - read.Program = ssaProgram; + read.SetProgram(std::move(ssaProgram)); return true; } diff --git a/ydb/core/tx/columnshard/columnshard__read_base.h b/ydb/core/tx/columnshard/columnshard__read_base.h index fe62c418452..148971bb22e 100644 --- a/ydb/core/tx/columnshard/columnshard__read_base.h +++ b/ydb/core/tx/columnshard/columnshard__read_base.h @@ -12,7 +12,6 @@ protected: {} std::shared_ptr<NOlap::TReadMetadata> PrepareReadMetadata( - const TActorContext& ctx, const NOlap::TReadDescription& readDescription, const std::unique_ptr<NOlap::TInsertTable>& insertTable, const std::unique_ptr<NOlap::IColumnEngine>& index, @@ -21,7 +20,6 @@ protected: protected: bool ParseProgram( - const TActorContext& ctx, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, NOlap::TReadDescription& read, diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 9de12ac7228..8d1f53df638 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -36,7 +36,7 @@ public: TTxType GetTxType() const override { return TXTYPE_START_SCAN; } private: - std::shared_ptr<NOlap::TReadMetadataBase> CreateReadMetadata(const TActorContext& ctx, NOlap::TReadDescription& read, + std::shared_ptr<NOlap::TReadMetadataBase> CreateReadMetadata(NOlap::TReadDescription& read, bool isIndexStats, bool isReverse, ui64 limit); private: @@ -761,10 +761,8 @@ static bool FillPredicatesFromRange(NOlap::TReadDescription& read, const ::NKiki std::shared_ptr<NOlap::TReadStatsMetadata> PrepareStatsReadMetadata(ui64 tabletId, const NOlap::TReadDescription& read, const std::unique_ptr<NOlap::IColumnEngine>& index, TString& error, const bool isReverse) { THashSet<ui32> readColumnIds(read.ColumnIds.begin(), read.ColumnIds.end()); - if (read.Program) { - for (auto& [id, name] : read.Program->SourceColumns) { - readColumnIds.insert(id); - } + for (auto& [id, name] : read.GetProgram().GetSourceColumns()) { + readColumnIds.insert(id); } for (ui32 colId : readColumnIds) { @@ -774,12 +772,13 @@ PrepareStatsReadMetadata(ui64 tabletId, const NOlap::TReadDescription& read, con } } - auto out = std::make_shared<NOlap::TReadStatsMetadata>(tabletId, isReverse ? NOlap::TReadStatsMetadata::ESorting::DESC : NOlap::TReadStatsMetadata::ESorting::ASC); + auto out = std::make_shared<NOlap::TReadStatsMetadata>(tabletId, + isReverse ? NOlap::TReadStatsMetadata::ESorting::DESC : NOlap::TReadStatsMetadata::ESorting::ASC, + read.GetProgram()); out->SetPKRangesFilter(read.PKRangesFilter); out->ReadColumnIds.assign(readColumnIds.begin(), readColumnIds.end()); out->ResultColumnIds = read.ColumnIds; - out->Program = read.Program; if (!index) { return out; @@ -805,14 +804,14 @@ PrepareStatsReadMetadata(ui64 tabletId, const NOlap::TReadDescription& read, con return out; } -std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TActorContext& ctx, NOlap::TReadDescription& read, +std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(NOlap::TReadDescription& read, bool indexStats, bool isReverse, ui64 itemsLimit) { std::shared_ptr<NOlap::TReadMetadataBase> metadata; if (indexStats) { metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->TablesManager.GetPrimaryIndex(), ErrorDescription, isReverse); } else { - metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache, + metadata = PrepareReadMetadata(read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), Self->BatchCache, ErrorDescription, isReverse); } @@ -868,10 +867,10 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { if (!isIndexStats) { TIndexColumnResolver columnResolver(*indexInfo); - parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver); + parseResult = ParseProgram(record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver); } else { TStatsColumnResolver columnResolver; - parseResult = ParseProgram(ctx, record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver); + parseResult = ParseProgram(record.GetOlapProgramType(), record.GetOlapProgram(), read, columnResolver); } if (!parseResult) { @@ -879,7 +878,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { } if (!record.RangesSize()) { - auto range = CreateReadMetadata(ctx, read, isIndexStats, record.GetReverse(), itemsLimit); + auto range = CreateReadMetadata(read, isIndexStats, record.GetReverse(), itemsLimit); if (range) { if (!isIndexStats) { Self->MapExternBlobs(ctx, static_cast<NOlap::TReadMetadata&>(*range)); @@ -902,7 +901,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& ctx) { } } { - auto newRange = CreateReadMetadata(ctx, read, isIndexStats, record.GetReverse(), itemsLimit); + auto newRange = CreateReadMetadata(read, isIndexStats, record.GetReverse(), itemsLimit); if (!newRange) { ReadMetadataRanges.clear(); return true; diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp index ddd5f2871b9..ffe33890fc0 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp @@ -20,11 +20,9 @@ NKikimr::NOlap::TPartialReadResult TStatsIterator::GetBatch() { .LastReadKey = std::move(lastKey) }; - if (ReadMetadata->Program) { - auto status = ApplyProgram(out.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext()); - if (!status.ok()) { - out.ErrorString = status.message(); - } + auto status = ReadMetadata->GetProgram().ApplyProgram(out.ResultBatch); + if (!status.ok()) { + out.ErrorString = status.message(); } return out; } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index cb95e0b13d4..223a581a97f 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -433,12 +433,11 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco }); } } - - if (ReadMetadata->Program) { + + if (ReadMetadata->GetProgram().HasProgram()) { MergeTooSmallBatches(out); - for (auto& result : out) { - auto status = ApplyProgram(result.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext()); + auto status = ReadMetadata->GetProgram().ApplyProgram(result.ResultBatch); if (!status.ok()) { result.ErrorString = status.message(); } diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index a47869de0cd..099c560c72e 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -40,6 +40,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index 4e5ec184914..4c3c3a73226 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -41,6 +41,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index 4e5ec184914..4c3c3a73226 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -41,6 +41,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index a47869de0cd..099c560c72e 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -40,6 +40,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/program.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/description.cpp b/ydb/core/tx/columnshard/engines/reader/description.cpp index 761304aad10..e11e6c49da1 100644 --- a/ydb/core/tx/columnshard/engines/reader/description.cpp +++ b/ydb/core/tx/columnshard/engines/reader/description.cpp @@ -1,432 +1,4 @@ #include "description.h" namespace NKikimr::NOlap { - -namespace { - -using EOperation = NArrow::EOperation; -using EAggregate = NArrow::EAggregate; -using TAssign = NSsa::TAssign; -using TAggregateAssign = NSsa::TAggregateAssign; - -struct TContext { - const IColumnResolver& ColumnResolver; - mutable THashMap<ui32, TString> Sources; - mutable THashMap<TString, std::shared_ptr<arrow::Scalar>> Constants; - - explicit TContext(const IColumnResolver& columnResolver) - : ColumnResolver(columnResolver) - {} - - std::string GetName(const NKikimrSSA::TProgram::TColumn& column) const { - ui32 columnId = column.GetId(); - TString name = ColumnResolver.GetColumnName(columnId, false); - if (name.Empty()) { - return GenerateName(column); - } else { - Sources[columnId] = name; - } - return std::string(name.data(), name.size()); - } - - std::string GenerateName(const NKikimrSSA::TProgram::TColumn& column) const { - TString name; - if (column.HasName()) { - name = column.GetName(); - } else { - name = ToString(column.GetId()); - } - return std::string(name.data(), name.size()); - } -}; - -TAssign MakeFunction(const TContext& info, const std::string& name, - const NKikimrSSA::TProgram::TAssignment::TFunction& func) { - using TId = NKikimrSSA::TProgram::TAssignment; - - std::vector<std::string> arguments; - for (auto& col : func.GetArguments()) { - arguments.push_back(info.GetName(col)); - } - - auto mkCastOptions = [](std::shared_ptr<arrow::DataType> dataType) { - // TODO: support CAST with OrDefault/OrNull logic (second argument is default value) - auto castOpts = std::make_shared<arrow::compute::CastOptions>(false); - castOpts->to_type = dataType; - return castOpts; - }; - - auto mkLikeOptions = [&](bool ignoreCase) { - if (arguments.size() != 2 || !info.Constants.contains(arguments[1])) { - return std::shared_ptr<arrow::compute::MatchSubstringOptions>(); - } - auto patternScalar = info.Constants[arguments[1]]; - if (!arrow::is_base_binary_like(patternScalar->type->id())) { - return std::shared_ptr<arrow::compute::MatchSubstringOptions>(); - } - arguments.resize(1); - auto& pattern = static_cast<arrow::BaseBinaryScalar&>(*patternScalar).value; - return std::make_shared<arrow::compute::MatchSubstringOptions>(pattern->ToString(), ignoreCase); - }; - - switch (func.GetId()) { - case TId::FUNC_CMP_EQUAL: - return TAssign(name, EOperation::Equal, std::move(arguments)); - case TId::FUNC_CMP_NOT_EQUAL: - return TAssign(name, EOperation::NotEqual, std::move(arguments)); - case TId::FUNC_CMP_LESS: - return TAssign(name, EOperation::Less, std::move(arguments)); - case TId::FUNC_CMP_LESS_EQUAL: - return TAssign(name, EOperation::LessEqual, std::move(arguments)); - case TId::FUNC_CMP_GREATER: - return TAssign(name, EOperation::Greater, std::move(arguments)); - case TId::FUNC_CMP_GREATER_EQUAL: - return TAssign(name, EOperation::GreaterEqual, std::move(arguments)); - case TId::FUNC_IS_NULL: - return TAssign(name, EOperation::IsNull, std::move(arguments)); - case TId::FUNC_STR_LENGTH: - return TAssign(name, EOperation::BinaryLength, std::move(arguments)); - case TId::FUNC_STR_MATCH: { - if (auto opts = mkLikeOptions(false)) { - return TAssign(name, EOperation::MatchSubstring, std::move(arguments), opts); - } - break; - } - case TId::FUNC_STR_MATCH_LIKE: { - if (auto opts = mkLikeOptions(false)) { - return TAssign(name, EOperation::MatchLike, std::move(arguments), opts); - } - break; - } - case TId::FUNC_STR_STARTS_WITH: { - if (auto opts = mkLikeOptions(false)) { - return TAssign(name, EOperation::StartsWith, std::move(arguments), opts); - } - break; - } - case TId::FUNC_STR_ENDS_WITH: { - if (auto opts = mkLikeOptions(false)) { - return TAssign(name, EOperation::EndsWith, std::move(arguments), opts); - } - break; - } - case TId::FUNC_STR_MATCH_IGNORE_CASE: { - if (auto opts = mkLikeOptions(true)) { - return TAssign(name, EOperation::MatchSubstring, std::move(arguments), opts); - } - break; - } - case TId::FUNC_STR_STARTS_WITH_IGNORE_CASE: { - if (auto opts = mkLikeOptions(true)) { - return TAssign(name, EOperation::StartsWith, std::move(arguments), opts); - } - break; - } - case TId::FUNC_STR_ENDS_WITH_IGNORE_CASE: { - if (auto opts = mkLikeOptions(true)) { - return TAssign(name, EOperation::EndsWith, std::move(arguments), opts); - } - break; - } - case TId::FUNC_BINARY_NOT: - return TAssign(name, EOperation::Invert, std::move(arguments)); - case TId::FUNC_BINARY_AND: - return TAssign(name, EOperation::And, std::move(arguments)); - case TId::FUNC_BINARY_OR: - return TAssign(name, EOperation::Or, std::move(arguments)); - case TId::FUNC_BINARY_XOR: - return TAssign(name, EOperation::Xor, std::move(arguments)); - case TId::FUNC_MATH_ADD: - return TAssign(name, EOperation::Add, std::move(arguments)); - case TId::FUNC_MATH_SUBTRACT: - return TAssign(name, EOperation::Subtract, std::move(arguments)); - case TId::FUNC_MATH_MULTIPLY: - return TAssign(name, EOperation::Multiply, std::move(arguments)); - case TId::FUNC_MATH_DIVIDE: - return TAssign(name, EOperation::Divide, std::move(arguments)); - case TId::FUNC_CAST_TO_INT8: - return TAssign(name, EOperation::CastInt8, std::move(arguments), - mkCastOptions(std::make_shared<arrow::Int8Type>())); - case TId::FUNC_CAST_TO_INT16: - return TAssign(name, EOperation::CastInt16, std::move(arguments), - mkCastOptions(std::make_shared<arrow::Int16Type>())); - case TId::FUNC_CAST_TO_INT32: - return TAssign(name, EOperation::CastInt32, std::move(arguments), - mkCastOptions(std::make_shared<arrow::Int32Type>())); - case TId::FUNC_CAST_TO_INT64: - return TAssign(name, EOperation::CastInt64, std::move(arguments), - mkCastOptions(std::make_shared<arrow::Int64Type>())); - case TId::FUNC_CAST_TO_UINT8: - return TAssign(name, EOperation::CastUInt8, std::move(arguments), - mkCastOptions(std::make_shared<arrow::UInt8Type>())); - case TId::FUNC_CAST_TO_UINT16: - return TAssign(name, EOperation::CastUInt16, std::move(arguments), - mkCastOptions(std::make_shared<arrow::UInt16Type>())); - case TId::FUNC_CAST_TO_UINT32: - return TAssign(name, EOperation::CastUInt32, std::move(arguments), - mkCastOptions(std::make_shared<arrow::UInt32Type>())); - case TId::FUNC_CAST_TO_UINT64: - return TAssign(name, EOperation::CastUInt64, std::move(arguments), - mkCastOptions(std::make_shared<arrow::UInt64Type>())); - case TId::FUNC_CAST_TO_FLOAT: - return TAssign(name, EOperation::CastFloat, std::move(arguments), - mkCastOptions(std::make_shared<arrow::FloatType>())); - case TId::FUNC_CAST_TO_DOUBLE: - return TAssign(name, EOperation::CastDouble, std::move(arguments), - mkCastOptions(std::make_shared<arrow::DoubleType>())); - case TId::FUNC_CAST_TO_TIMESTAMP: - return TAssign(name, EOperation::CastTimestamp, std::move(arguments), - mkCastOptions(std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO))); - case TId::FUNC_CAST_TO_BINARY: - case TId::FUNC_CAST_TO_FIXED_SIZE_BINARY: - case TId::FUNC_UNSPECIFIED: - break; - } - return TAssign(name, EOperation::Unspecified, std::move(arguments)); -} - -NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant) { - using TId = NKikimrSSA::TProgram::TConstant; - - switch (constant.GetValueCase()) { - case TId::kBool: - return TAssign(name, constant.GetBool()); - case TId::kInt32: - return TAssign(name, constant.GetInt32()); - case TId::kUint32: - return TAssign(name, constant.GetUint32()); - case TId::kInt64: - return TAssign(name, constant.GetInt64()); - case TId::kUint64: - return TAssign(name, constant.GetUint64()); - case TId::kFloat: - return TAssign(name, constant.GetFloat()); - case TId::kDouble: - return TAssign(name, constant.GetDouble()); - case TId::kBytes: - { - TString str = constant.GetBytes(); - return TAssign(name, std::string(str.data(), str.size())); - } - case TId::kText: - { - TString str = constant.GetText(); - return TAssign(name, std::string(str.data(), str.size())); - } - case TId::VALUE_NOT_SET: - break; - } - return TAssign(name, EOperation::Unspecified, {}); -} - -NSsa::TAggregateAssign MakeAggregate(const TContext& info, const std::string& name, - const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func) -{ - using TId = NKikimrSSA::TProgram::TAggregateAssignment; - - if (func.ArgumentsSize() == 1) { - std::string argument = info.GetName(func.GetArguments()[0]); - - switch (func.GetId()) { - case TId::AGG_SOME: - return TAggregateAssign(name, EAggregate::Some, std::move(argument)); - case TId::AGG_COUNT: - return TAggregateAssign(name, EAggregate::Count, std::move(argument)); - case TId::AGG_MIN: - return TAggregateAssign(name, EAggregate::Min, std::move(argument)); - case TId::AGG_MAX: - return TAggregateAssign(name, EAggregate::Max, std::move(argument)); - case TId::AGG_SUM: - return TAggregateAssign(name, EAggregate::Sum, std::move(argument)); -#if 0 // TODO - case TId::AGG_AVG: - return TAggregateAssign(name, EAggregate::Avg, std::move(argument)); -#endif - case TId::AGG_UNSPECIFIED: - break; - } - } else if (func.ArgumentsSize() == 0 && func.GetId() == TId::AGG_COUNT) { - // COUNT(*) case - return TAggregateAssign(name, EAggregate::Count); - } - return TAggregateAssign(name); // !ok() -} - -NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter, - const std::shared_ptr<arrow::RecordBatch>& parameterValues) -{ - auto parameterName = parameter.GetName(); - auto column = parameterValues->GetColumnByName(parameterName); -#if 0 - Y_VERIFY( - column, - "No parameter %s in serialized parameters.", parameterName.c_str() - ); - Y_VERIFY( - column->length() == 1, - "Incorrect values count in parameter array" - ); -#else - if (!column || column->length() != 1) { - return TAssign(name, NArrow::EOperation::Unspecified, {}); - } -#endif - return TAssign(name, *column->GetScalar(0)); -} - -bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign, - const std::shared_ptr<arrow::RecordBatch>& parameterValues) -{ - using TId = NKikimrSSA::TProgram::TAssignment; - - std::string columnName = info.GetName(assign.GetColumn()); - - switch (assign.GetExpressionCase()) { - case TId::kFunction: - { - auto func = MakeFunction(info, columnName, assign.GetFunction()); - if (!func.IsOk()) { - return false; - } - step.Assignes.emplace_back(std::move(func)); - break; - } - case TId::kConstant: - { - auto cnst = MakeConstant(columnName, assign.GetConstant()); - if (!cnst.IsConstant()) { - return false; - } - info.Constants[columnName] = cnst.GetConstant(); - step.Assignes.emplace_back(std::move(cnst)); - break; - } - case TId::kParameter: - { - auto param = MaterializeParameter(columnName, assign.GetParameter(), parameterValues); - if (!param.IsConstant()) { - return false; - } - step.Assignes.emplace_back(std::move(param)); - break; - } - case TId::kExternalFunction: - case TId::kNull: - case TId::EXPRESSION_NOT_SET: - return false; - } - return true; -} - -bool ExtractFilter(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) { - auto& column = filter.GetPredicate(); - if (!column.HasId() && !column.HasName()) { - return false; - } - // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name. - step.Filters.push_back(info.GetName(column)); - return true; -} - -bool ExtractProjection(const TContext& info, NSsa::TProgramStep& step, - const NKikimrSSA::TProgram::TProjection& projection) { - step.Projection.reserve(projection.ColumnsSize()); - for (auto& col : projection.GetColumns()) { - // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name. - step.Projection.push_back(info.GetName(col)); - } - return true; -} - -bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy) { - if (!groupBy.AggregatesSize()) { - return false; - } - - step.GroupBy.reserve(groupBy.AggregatesSize()); - step.GroupByKeys.reserve(groupBy.KeyColumnsSize()); - for (auto& agg : groupBy.GetAggregates()) { - auto& resColumn = agg.GetColumn(); - TString columnName = info.GenerateName(resColumn); - - auto func = MakeAggregate(info, columnName, agg.GetFunction()); - if (!func.IsOk()) { - return false; - } - step.GroupBy.push_back(std::move(func)); - } - for (auto& key : groupBy.GetKeyColumns()) { - step.GroupByKeys.push_back(info.GetName(key)); - } - - return true; -} - -} - -using EOperation = NArrow::EOperation; -using TPredicate = NOlap::TPredicate; - -std::shared_ptr<NSsa::TProgram> TReadDescription::AddProgram(const IColumnResolver& columnResolver, - const NKikimrSSA::TProgram& program) -{ - using TId = NKikimrSSA::TProgram::TCommand; - - auto ssaProgram = std::make_shared<NSsa::TProgram>(); - TContext info(columnResolver); - auto step = std::make_shared<NSsa::TProgramStep>(); - for (auto& cmd : program.GetCommand()) { - switch (cmd.GetLineCase()) { - case TId::kAssign: - if (!ExtractAssign(info, *step, cmd.GetAssign(), ProgramParameters)) { - return nullptr; - } - break; - case TId::kFilter: - if (!ExtractFilter(info, *step, cmd.GetFilter())) { - return nullptr; - } - break; - case TId::kProjection: - if (!ExtractProjection(info, *step, cmd.GetProjection())) { - return nullptr; - } - ssaProgram->Steps.push_back(step); - step = std::make_shared<NSsa::TProgramStep>(); - break; - case TId::kGroupBy: - if (!ExtractGroupBy(info, *step, cmd.GetGroupBy())) { - return nullptr; - } - ssaProgram->Steps.push_back(step); - step = std::make_shared<NSsa::TProgramStep>(); - break; - case TId::LINE_NOT_SET: - return nullptr; - } - } - - // final step without final projection - if (!step->Empty()) { - ssaProgram->Steps.push_back(step); - } - - ssaProgram->SourceColumns = std::move(info.Sources); - - // Query 'SELECT count(*) FROM table' needs a column - if (ssaProgram->SourceColumns.empty()) { - auto& ydbSchema = columnResolver.GetSchema(); - - Y_VERIFY(!ydbSchema.KeyColumns.empty()); - ui32 key = ydbSchema.KeyColumns[0]; - - auto it = ydbSchema.Columns.find(key); - Y_VERIFY(it != ydbSchema.Columns.end()); - - ssaProgram->SourceColumns[key] = it->second.Name; - } - - return ssaProgram; -} - } diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h index b2928673e2a..b581055388f 100644 --- a/ydb/core/tx/columnshard/engines/reader/description.h +++ b/ydb/core/tx/columnshard/engines/reader/description.h @@ -1,22 +1,14 @@ #pragma once +#include "program.h" #include <ydb/core/tx/columnshard/engines/predicate/filter.h> -#include <ydb/core/tablet_flat/flat_dbase_scheme.h> -#include <ydb/core/formats/arrow/program.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> namespace NKikimr::NOlap { -class IColumnResolver { -public: - virtual ~IColumnResolver() = default; - virtual TString GetColumnName(ui32 id, bool required = true) const = 0; - virtual const NTable::TScheme::TTableSchema& GetSchema() const = 0; -}; - // Describes read/scan request struct TReadDescription { private: TSnapshot Snapshot; + TProgramContainer Program; public: // Table ui64 PathId = 0; @@ -27,25 +19,26 @@ public: // operations with potentially different columns. We have to remove columns to support -Inf (Null) and +Inf. NOlap::TPKRangesFilter PKRangesFilter; - // SSA Program - std::shared_ptr<NSsa::TProgram> Program; - std::shared_ptr<arrow::RecordBatch> ProgramParameters; // TODO - // List of columns std::vector<ui32> ColumnIds; std::vector<TString> ColumnNames; - - std::shared_ptr<NSsa::TProgram> AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program); TReadDescription(const TSnapshot& snapshot, const bool isReverse) : Snapshot(snapshot) , PKRangesFilter(isReverse) { + } + void SetProgram(TProgramContainer&& value) { + Program = std::move(value); } const TSnapshot& GetSnapshot() const { return Snapshot; } + + const TProgramContainer& GetProgram() const { + return Program; + } }; } diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index 24a29917d86..7389d88286c 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -20,9 +20,9 @@ bool TAssembleFilter::DoExecuteImpl() { FilteredBatch = nullptr; return true; } - if (ReadMetadata->Program) { + auto earlyFilter = ReadMetadata->GetProgram().BuildEarlyFilter(batch); + if (earlyFilter) { if (AllowEarlyFilter) { - auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program)); Filter = std::make_shared<NArrow::TColumnFilter>(Filter->CombineSequentialAnd(*earlyFilter)); if (!earlyFilter->Apply(batch)) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount); @@ -30,7 +30,7 @@ bool TAssembleFilter::DoExecuteImpl() { return true; } } else if (BatchesOrderPolicy->NeedNotAppliedEarlyFilter()) { - EarlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program)); + EarlyFilter = earlyFilter; } } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index ff24281f35c..0afae0b72b9 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -111,9 +111,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { if (NotIndexedBatch) { RecordBatches.emplace_back(NotIndexedBatch); } - if (Owner->GetReadMetadata()->Program) { - NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program)); - } + NotIndexedBatchFutureFilter = Owner->GetReadMetadata()->GetProgram().BuildEarlyFilter(batch); DuplicationsAvailableFlag = true; } CheckReady(); diff --git a/ydb/core/tx/columnshard/engines/reader/program.cpp b/ydb/core/tx/columnshard/engines/reader/program.cpp new file mode 100644 index 00000000000..1d693164cf0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/program.cpp @@ -0,0 +1,519 @@ +#include "program.h" + +#include <ydb/core/formats/arrow/ssa_program_optimizer.h> +#include <ydb/core/tx/columnshard/engines/filter.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/cast.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_scalar.h> +#include <google/protobuf/text_format.h> + +namespace NKikimr::NOlap { + +namespace { + +using EOperation = NArrow::EOperation; +using EAggregate = NArrow::EAggregate; +using TAssign = NSsa::TAssign; +using TAggregateAssign = NSsa::TAggregateAssign; + +struct TContext { + const IColumnResolver& ColumnResolver; + mutable THashMap<ui32, TString> Sources; + mutable THashMap<TString, std::shared_ptr<arrow::Scalar>> Constants; + + explicit TContext(const IColumnResolver& columnResolver) + : ColumnResolver(columnResolver) + {} + + std::string GetName(const NKikimrSSA::TProgram::TColumn& column) const { + ui32 columnId = column.GetId(); + TString name = ColumnResolver.GetColumnName(columnId, false); + if (name.Empty()) { + return GenerateName(column); + } else { + Sources[columnId] = name; + } + return std::string(name.data(), name.size()); + } + + std::string GenerateName(const NKikimrSSA::TProgram::TColumn& column) const { + TString name; + if (column.HasName()) { + name = column.GetName(); + } else { + name = ToString(column.GetId()); + } + return std::string(name.data(), name.size()); + } +}; + +TAssign MakeFunction(const TContext& info, const std::string& name, + const NKikimrSSA::TProgram::TAssignment::TFunction& func) { + using TId = NKikimrSSA::TProgram::TAssignment; + + std::vector<std::string> arguments; + for (auto& col : func.GetArguments()) { + arguments.push_back(info.GetName(col)); + } + + auto mkCastOptions = [](std::shared_ptr<arrow::DataType> dataType) { + // TODO: support CAST with OrDefault/OrNull logic (second argument is default value) + auto castOpts = std::make_shared<arrow::compute::CastOptions>(false); + castOpts->to_type = dataType; + return castOpts; + }; + + auto mkLikeOptions = [&](bool ignoreCase) { + if (arguments.size() != 2 || !info.Constants.contains(arguments[1])) { + return std::shared_ptr<arrow::compute::MatchSubstringOptions>(); + } + auto patternScalar = info.Constants[arguments[1]]; + if (!arrow::is_base_binary_like(patternScalar->type->id())) { + return std::shared_ptr<arrow::compute::MatchSubstringOptions>(); + } + arguments.resize(1); + auto& pattern = static_cast<arrow::BaseBinaryScalar&>(*patternScalar).value; + return std::make_shared<arrow::compute::MatchSubstringOptions>(pattern->ToString(), ignoreCase); + }; + + switch (func.GetId()) { + case TId::FUNC_CMP_EQUAL: + return TAssign(name, EOperation::Equal, std::move(arguments)); + case TId::FUNC_CMP_NOT_EQUAL: + return TAssign(name, EOperation::NotEqual, std::move(arguments)); + case TId::FUNC_CMP_LESS: + return TAssign(name, EOperation::Less, std::move(arguments)); + case TId::FUNC_CMP_LESS_EQUAL: + return TAssign(name, EOperation::LessEqual, std::move(arguments)); + case TId::FUNC_CMP_GREATER: + return TAssign(name, EOperation::Greater, std::move(arguments)); + case TId::FUNC_CMP_GREATER_EQUAL: + return TAssign(name, EOperation::GreaterEqual, std::move(arguments)); + case TId::FUNC_IS_NULL: + return TAssign(name, EOperation::IsNull, std::move(arguments)); + case TId::FUNC_STR_LENGTH: + return TAssign(name, EOperation::BinaryLength, std::move(arguments)); + case TId::FUNC_STR_MATCH: { + if (auto opts = mkLikeOptions(false)) { + return TAssign(name, EOperation::MatchSubstring, std::move(arguments), opts); + } + break; + } + case TId::FUNC_STR_MATCH_LIKE: { + if (auto opts = mkLikeOptions(false)) { + return TAssign(name, EOperation::MatchLike, std::move(arguments), opts); + } + break; + } + case TId::FUNC_STR_STARTS_WITH: { + if (auto opts = mkLikeOptions(false)) { + return TAssign(name, EOperation::StartsWith, std::move(arguments), opts); + } + break; + } + case TId::FUNC_STR_ENDS_WITH: { + if (auto opts = mkLikeOptions(false)) { + return TAssign(name, EOperation::EndsWith, std::move(arguments), opts); + } + break; + } + case TId::FUNC_STR_MATCH_IGNORE_CASE: { + if (auto opts = mkLikeOptions(true)) { + return TAssign(name, EOperation::MatchSubstring, std::move(arguments), opts); + } + break; + } + case TId::FUNC_STR_STARTS_WITH_IGNORE_CASE: { + if (auto opts = mkLikeOptions(true)) { + return TAssign(name, EOperation::StartsWith, std::move(arguments), opts); + } + break; + } + case TId::FUNC_STR_ENDS_WITH_IGNORE_CASE: { + if (auto opts = mkLikeOptions(true)) { + return TAssign(name, EOperation::EndsWith, std::move(arguments), opts); + } + break; + } + case TId::FUNC_BINARY_NOT: + return TAssign(name, EOperation::Invert, std::move(arguments)); + case TId::FUNC_BINARY_AND: + return TAssign(name, EOperation::And, std::move(arguments)); + case TId::FUNC_BINARY_OR: + return TAssign(name, EOperation::Or, std::move(arguments)); + case TId::FUNC_BINARY_XOR: + return TAssign(name, EOperation::Xor, std::move(arguments)); + case TId::FUNC_MATH_ADD: + return TAssign(name, EOperation::Add, std::move(arguments)); + case TId::FUNC_MATH_SUBTRACT: + return TAssign(name, EOperation::Subtract, std::move(arguments)); + case TId::FUNC_MATH_MULTIPLY: + return TAssign(name, EOperation::Multiply, std::move(arguments)); + case TId::FUNC_MATH_DIVIDE: + return TAssign(name, EOperation::Divide, std::move(arguments)); + case TId::FUNC_CAST_TO_INT8: + return TAssign(name, EOperation::CastInt8, std::move(arguments), + mkCastOptions(std::make_shared<arrow::Int8Type>())); + case TId::FUNC_CAST_TO_INT16: + return TAssign(name, EOperation::CastInt16, std::move(arguments), + mkCastOptions(std::make_shared<arrow::Int16Type>())); + case TId::FUNC_CAST_TO_INT32: + return TAssign(name, EOperation::CastInt32, std::move(arguments), + mkCastOptions(std::make_shared<arrow::Int32Type>())); + case TId::FUNC_CAST_TO_INT64: + return TAssign(name, EOperation::CastInt64, std::move(arguments), + mkCastOptions(std::make_shared<arrow::Int64Type>())); + case TId::FUNC_CAST_TO_UINT8: + return TAssign(name, EOperation::CastUInt8, std::move(arguments), + mkCastOptions(std::make_shared<arrow::UInt8Type>())); + case TId::FUNC_CAST_TO_UINT16: + return TAssign(name, EOperation::CastUInt16, std::move(arguments), + mkCastOptions(std::make_shared<arrow::UInt16Type>())); + case TId::FUNC_CAST_TO_UINT32: + return TAssign(name, EOperation::CastUInt32, std::move(arguments), + mkCastOptions(std::make_shared<arrow::UInt32Type>())); + case TId::FUNC_CAST_TO_UINT64: + return TAssign(name, EOperation::CastUInt64, std::move(arguments), + mkCastOptions(std::make_shared<arrow::UInt64Type>())); + case TId::FUNC_CAST_TO_FLOAT: + return TAssign(name, EOperation::CastFloat, std::move(arguments), + mkCastOptions(std::make_shared<arrow::FloatType>())); + case TId::FUNC_CAST_TO_DOUBLE: + return TAssign(name, EOperation::CastDouble, std::move(arguments), + mkCastOptions(std::make_shared<arrow::DoubleType>())); + case TId::FUNC_CAST_TO_TIMESTAMP: + return TAssign(name, EOperation::CastTimestamp, std::move(arguments), + mkCastOptions(std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO))); + case TId::FUNC_CAST_TO_BINARY: + case TId::FUNC_CAST_TO_FIXED_SIZE_BINARY: + case TId::FUNC_UNSPECIFIED: + break; + } + return TAssign(name, EOperation::Unspecified, std::move(arguments)); +} + +NSsa::TAssign MakeConstant(const std::string& name, const NKikimrSSA::TProgram::TConstant& constant) { + using TId = NKikimrSSA::TProgram::TConstant; + + switch (constant.GetValueCase()) { + case TId::kBool: + return TAssign(name, constant.GetBool()); + case TId::kInt32: + return TAssign(name, constant.GetInt32()); + case TId::kUint32: + return TAssign(name, constant.GetUint32()); + case TId::kInt64: + return TAssign(name, constant.GetInt64()); + case TId::kUint64: + return TAssign(name, constant.GetUint64()); + case TId::kFloat: + return TAssign(name, constant.GetFloat()); + case TId::kDouble: + return TAssign(name, constant.GetDouble()); + case TId::kBytes: + { + TString str = constant.GetBytes(); + return TAssign(name, std::string(str.data(), str.size())); + } + case TId::kText: + { + TString str = constant.GetText(); + return TAssign(name, std::string(str.data(), str.size())); + } + case TId::VALUE_NOT_SET: + break; + } + return TAssign(name, EOperation::Unspecified, {}); +} + +NSsa::TAggregateAssign MakeAggregate(const TContext& info, const std::string& name, + const NKikimrSSA::TProgram::TAggregateAssignment::TAggregateFunction& func) +{ + using TId = NKikimrSSA::TProgram::TAggregateAssignment; + + if (func.ArgumentsSize() == 1) { + std::string argument = info.GetName(func.GetArguments()[0]); + + switch (func.GetId()) { + case TId::AGG_SOME: + return TAggregateAssign(name, EAggregate::Some, std::move(argument)); + case TId::AGG_COUNT: + return TAggregateAssign(name, EAggregate::Count, std::move(argument)); + case TId::AGG_MIN: + return TAggregateAssign(name, EAggregate::Min, std::move(argument)); + case TId::AGG_MAX: + return TAggregateAssign(name, EAggregate::Max, std::move(argument)); + case TId::AGG_SUM: + return TAggregateAssign(name, EAggregate::Sum, std::move(argument)); +#if 0 // TODO + case TId::AGG_AVG: + return TAggregateAssign(name, EAggregate::Avg, std::move(argument)); +#endif + case TId::AGG_UNSPECIFIED: + break; + } + } else if (func.ArgumentsSize() == 0 && func.GetId() == TId::AGG_COUNT) { + // COUNT(*) case + return TAggregateAssign(name, EAggregate::Count); + } + return TAggregateAssign(name); // !ok() +} + +NSsa::TAssign MaterializeParameter(const std::string& name, const NKikimrSSA::TProgram::TParameter& parameter, + const std::shared_ptr<arrow::RecordBatch>& parameterValues) +{ + auto parameterName = parameter.GetName(); + auto column = parameterValues->GetColumnByName(parameterName); +#if 0 + Y_VERIFY( + column, + "No parameter %s in serialized parameters.", parameterName.c_str() + ); + Y_VERIFY( + column->length() == 1, + "Incorrect values count in parameter array" + ); +#else + if (!column || column->length() != 1) { + return TAssign(name, NArrow::EOperation::Unspecified, {}); + } +#endif + return TAssign(name, *column->GetScalar(0)); +} + +bool ExtractAssign(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TAssignment& assign, + const std::shared_ptr<arrow::RecordBatch>& parameterValues) +{ + using TId = NKikimrSSA::TProgram::TAssignment; + + std::string columnName = info.GetName(assign.GetColumn()); + + switch (assign.GetExpressionCase()) { + case TId::kFunction: + { + auto func = MakeFunction(info, columnName, assign.GetFunction()); + if (!func.IsOk()) { + return false; + } + step.Assignes.emplace_back(std::move(func)); + break; + } + case TId::kConstant: + { + auto cnst = MakeConstant(columnName, assign.GetConstant()); + if (!cnst.IsConstant()) { + return false; + } + info.Constants[columnName] = cnst.GetConstant(); + step.Assignes.emplace_back(std::move(cnst)); + break; + } + case TId::kParameter: + { + auto param = MaterializeParameter(columnName, assign.GetParameter(), parameterValues); + if (!param.IsConstant()) { + return false; + } + step.Assignes.emplace_back(std::move(param)); + break; + } + case TId::kExternalFunction: + case TId::kNull: + case TId::EXPRESSION_NOT_SET: + return false; + } + return true; +} + +bool ExtractFilter(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TFilter& filter) { + auto& column = filter.GetPredicate(); + if (!column.HasId() && !column.HasName()) { + return false; + } + // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name. + step.Filters.push_back(info.GetName(column)); + return true; +} + +bool ExtractProjection(const TContext& info, NSsa::TProgramStep& step, + const NKikimrSSA::TProgram::TProjection& projection) { + step.Projection.reserve(projection.ColumnsSize()); + for (auto& col : projection.GetColumns()) { + // NOTE: Name maskes Id for column. If column assigned with name it's accessible only by name. + step.Projection.push_back(info.GetName(col)); + } + return true; +} + +bool ExtractGroupBy(const TContext& info, NSsa::TProgramStep& step, const NKikimrSSA::TProgram::TGroupBy& groupBy) { + if (!groupBy.AggregatesSize()) { + return false; + } + + step.GroupBy.reserve(groupBy.AggregatesSize()); + step.GroupByKeys.reserve(groupBy.KeyColumnsSize()); + for (auto& agg : groupBy.GetAggregates()) { + auto& resColumn = agg.GetColumn(); + TString columnName = info.GenerateName(resColumn); + + auto func = MakeAggregate(info, columnName, agg.GetFunction()); + if (!func.IsOk()) { + return false; + } + step.GroupBy.push_back(std::move(func)); + } + for (auto& key : groupBy.GetKeyColumns()) { + step.GroupByKeys.push_back(info.GetName(key)); + } + + return true; +} + +} + +const THashMap<ui32, TString>& TProgramContainer::GetSourceColumns() const { + if (!Program) { + return Default<THashMap<ui32, TString>>(); + } + return Program->SourceColumns; +} + +bool TProgramContainer::HasProgram() const { + return !!Program; +} + +std::shared_ptr<NArrow::TColumnFilter> TProgramContainer::BuildEarlyFilter(std::shared_ptr<arrow::RecordBatch> batch) const { + if (Program) { + return std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Program)); + } + return nullptr; +} + +std::set<std::string> TProgramContainer::GetEarlyFilterColumns() const { + if (Program) { + return Program->GetEarlyFilterColumns(); + } + return Default<std::set<std::string>>(); +} + +bool TProgramContainer::HasEarlyFilterOnly() const { + if (!Program) { + return true; + } + for (ui32 i = 1; i < Program->Steps.size(); ++i) { + if (Program->Steps[i]->Filters.size()) { + return false; + } + } + return true; +} + +bool TProgramContainer::Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error) { + Y_VERIFY(serializedProgram); + + NKikimrSSA::TProgram programProto; + NKikimrSSA::TOlapProgram olapProgramProto; + + switch (programType) { + case NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS: + if (!olapProgramProto.ParseFromString(serializedProgram)) { + error = TStringBuilder() << "Can't parse TOlapProgram"; + return false; + } + + if (!programProto.ParseFromString(olapProgramProto.GetProgram())) { + error = TStringBuilder() << "Can't parse TProgram"; + return false; + } + + break; + default: + error = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType; + return false; + } + + if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { + TString out; + ::google::protobuf::TextFormat::PrintToString(programProto, &out); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", out); + } + + if (olapProgramProto.HasParameters()) { + Y_VERIFY(olapProgramProto.HasParametersSchema(), "Parameters are present, but there is no schema."); + + auto schema = NArrow::DeserializeSchema(olapProgramProto.GetParametersSchema()); + ProgramParameters = NArrow::DeserializeBatch(olapProgramProto.GetParameters(), schema); + } + + NOlap::TProgramContainer ssaProgram; + if (!ParseProgram(columnResolver, programProto)) { + error = TStringBuilder() << "Wrong olap program"; + return false; + } + return true; +} + +bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program) { + using TId = NKikimrSSA::TProgram::TCommand; + + auto ssaProgram = std::make_shared<NSsa::TProgram>(); + TContext info(columnResolver); + auto step = std::make_shared<NSsa::TProgramStep>(); + for (auto& cmd : program.GetCommand()) { + switch (cmd.GetLineCase()) { + case TId::kAssign: + if (!ExtractAssign(info, *step, cmd.GetAssign(), ProgramParameters)) { + return false; + } + break; + case TId::kFilter: + if (!ExtractFilter(info, *step, cmd.GetFilter())) { + return false; + } + break; + case TId::kProjection: + if (!ExtractProjection(info, *step, cmd.GetProjection())) { + return false; + } + ssaProgram->Steps.push_back(step); + step = std::make_shared<NSsa::TProgramStep>(); + break; + case TId::kGroupBy: + if (!ExtractGroupBy(info, *step, cmd.GetGroupBy())) { + return false; + } + ssaProgram->Steps.push_back(step); + step = std::make_shared<NSsa::TProgramStep>(); + break; + case TId::LINE_NOT_SET: + return false; + } + } + + // final step without final projection + if (!step->Empty()) { + ssaProgram->Steps.push_back(step); + } + + ssaProgram->SourceColumns = std::move(info.Sources); + + // Query 'SELECT count(*) FROM table' needs a column + if (ssaProgram->SourceColumns.empty()) { + auto& ydbSchema = columnResolver.GetSchema(); + + Y_VERIFY(!ydbSchema.KeyColumns.empty()); + ui32 key = ydbSchema.KeyColumns[0]; + + auto it = ydbSchema.Columns.find(key); + Y_VERIFY(it != ydbSchema.Columns.end()); + + ssaProgram->SourceColumns[key] = it->second.Name; + } + + if (!ssaProgram->Steps.empty()) { + NSsa::OptimizeProgram(*ssaProgram); + } + Program = ssaProgram; + return true; + } + +} diff --git a/ydb/core/tx/columnshard/engines/reader/program.h b/ydb/core/tx/columnshard/engines/reader/program.h new file mode 100644 index 00000000000..494018def88 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/program.h @@ -0,0 +1,44 @@ +#pragma once + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/protos/ssa.pb.h> +#include <ydb/core/formats/arrow/program.h> +#include <ydb/core/formats/arrow/custom_registry.h> +#include <ydb/core/tablet_flat/flat_dbase_scheme.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + + +namespace NKikimr::NOlap { +class IColumnResolver { +public: + virtual ~IColumnResolver() = default; + virtual TString GetColumnName(ui32 id, bool required = true) const = 0; + virtual const NTable::TScheme::TTableSchema& GetSchema() const = 0; +}; + +class TProgramContainer { +private: + std::shared_ptr<NSsa::TProgram> Program; + std::shared_ptr<arrow::RecordBatch> ProgramParameters; // TODO +public: + bool Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error); + + inline arrow::Status ApplyProgram(std::shared_ptr<arrow::RecordBatch>& batch) const { + if (Program) { + return Program->ApplyTo(batch, NArrow::GetCustomExecContext()); + } + return arrow::Status::OK(); + } + + const THashMap<ui32, TString>& GetSourceColumns() const; + bool HasProgram() const; + + std::shared_ptr<NArrow::TColumnFilter> BuildEarlyFilter(std::shared_ptr<arrow::RecordBatch> batch) const; + std::set<std::string> GetEarlyFilterColumns() const; + + bool HasEarlyFilterOnly() const; +private: + bool ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program); +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 626eb64a46d..8ecf5fb4907 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -100,11 +100,8 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto columnIds.insert(columnId); } - Program = readDescription.Program; - if (Program) { - for (auto& [id, name] : Program->SourceColumns) { - columnIds.insert(id); - } + for (auto& [id, name] : GetProgram().GetSourceColumns()) { + columnIds.insert(id); } SelectInfo = dataAccessor.Select(readDescription, columnIds); @@ -126,13 +123,11 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); } } - if (Program) { - for (auto&& i : Program->GetEarlyFilterColumns()) { - auto id = indexInfo.GetColumnIdOptional(i); - if (id) { - result.emplace(*id); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); - } + for (auto&& i : GetProgram().GetEarlyFilterColumns()) { + auto id = indexInfo.GetColumnIdOptional(i); + if (id) { + result.emplace(*id); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); } } if (Snapshot.GetPlanStep()) { @@ -217,12 +212,9 @@ NIndexedReader::IOrderPolicy::TPtr TReadMetadata::BuildSortingPolicy() const { } ++idx; } - if (Program) { - for (ui32 i = 1; i < Program->Steps.size(); ++i) { - if (Program->Steps[i]->Filters.size()) { - return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this()); - } - } + + if (!GetProgram().HasEarlyFilterOnly()) { + return std::make_shared<NIndexedReader::TAnySorting>(this->shared_from_this()); } return std::make_shared<NIndexedReader::TPKSortingWithLimit>(this->shared_from_this()); diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 52dac3522d9..77a31505227 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -77,6 +77,7 @@ public: private: const ESorting Sorting = ESorting::ASC; // Sorting inside returned batches std::optional<TPKRangesFilter> PKRangesFilter; + TProgramContainer Program; public: using TConstPtr = std::shared_ptr<const TReadMetadataBase>; @@ -91,16 +92,15 @@ public: return *PKRangesFilter; } - TReadMetadataBase(const ESorting sorting) + TReadMetadataBase(const ESorting sorting, const TProgramContainer& ssaProgram) : Sorting(sorting) + , Program(ssaProgram) { - } virtual ~TReadMetadataBase() = default; std::shared_ptr<NOlap::TPredicate> LessPredicate; std::shared_ptr<NOlap::TPredicate> GreaterPredicate; - std::shared_ptr<NSsa::TProgram> Program; std::shared_ptr<const THashSet<TUnifiedBlobId>> ExternBlobs; ui64 Limit{0}; // TODO @@ -122,6 +122,10 @@ public: meta.Dump(out); return out; } + + const TProgramContainer& GetProgram() const { + return Program; + } }; // Holds all metadata that is needed to perform read/scan @@ -147,8 +151,8 @@ public: std::shared_ptr<NIndexedReader::IOrderPolicy> BuildSortingPolicy() const; - TReadMetadata(const TVersionedIndex& info, const TSnapshot& snapshot, const ESorting sorting) - : TBase(sorting) + TReadMetadata(const TVersionedIndex& info, const TSnapshot& snapshot, const ESorting sorting, const TProgramContainer& ssaProgram) + : TBase(sorting, ssaProgram) , IndexVersions(info) , Snapshot(snapshot) , ResultIndexSchema(info.GetSchema(Snapshot)) @@ -251,7 +255,7 @@ public: << " index records: " << NumIndexedRecords() << " index blobs: " << NumIndexedBlobs() << " committed blobs: " << CommittedBlobs.size() - << " with program steps: " << (Program ? Program->Steps.size() : 0) + // << " with program steps: " << (Program ? Program->Steps.size() : 0) << " at snapshot: " << Snapshot.GetPlanStep() << ":" << Snapshot.GetTxId(); TBase::Dump(out); if (SelectInfo) { @@ -276,8 +280,8 @@ public: std::vector<ui32> ResultColumnIds; THashMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats; - explicit TReadStatsMetadata(ui64 tabletId, const ESorting sorting) - : TBase(sorting) + explicit TReadStatsMetadata(ui64 tabletId, const ESorting sorting, const TProgramContainer& ssaProgram) + : TBase(sorting, ssaProgram) , TabletId(tabletId) {} |