diff options
author | chertus <[email protected]> | 2022-08-30 13:31:50 +0300 |
---|---|---|
committer | chertus <[email protected]> | 2022-08-30 13:31:50 +0300 |
commit | 76dc9dab1ef4e4b4fcae17e5ae188e1c3c9774ce (patch) | |
tree | 6682510118ec90e7783ba567cdc639c40e79373f | |
parent | e7d0ad21f2f82162d09849c0c4c02522ebc6adf2 (diff) |
use ch aggregate functions
27 files changed, 690 insertions, 276 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index cc237e6dce8..9652a821b21 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -565,6 +565,10 @@ add_subdirectory(ydb/library/yql/providers/common/config) add_subdirectory(ydb/library/yql/providers/common/gateway) add_subdirectory(ydb/library/yql/providers/result/provider) add_subdirectory(ydb/core/formats) +add_subdirectory(ydb/library/arrow_clickhouse) +add_subdirectory(ydb/library/arrow_clickhouse/Common) +add_subdirectory(ydb/library/arrow_clickhouse/Columns) +add_subdirectory(ydb/library/arrow_clickhouse/DataStreams) add_subdirectory(ydb/core/keyvalue/protos) add_subdirectory(ydb/core/tx) add_subdirectory(ydb/core/persqueue/config) @@ -1044,10 +1048,6 @@ add_subdirectory(ydb/core/filestore) add_subdirectory(ydb/core/grpc_caching) add_subdirectory(ydb/core/pgproxy) add_subdirectory(ydb/core/yql_testlib) -add_subdirectory(ydb/library/arrow_clickhouse) -add_subdirectory(ydb/library/arrow_clickhouse/Common) -add_subdirectory(ydb/library/arrow_clickhouse/Columns) -add_subdirectory(ydb/library/arrow_clickhouse/DataStreams) add_subdirectory(ydb/core/actorlib_impl/ut) add_subdirectory(library/cpp/testing/unittest_main) add_subdirectory(library/cpp/terminate_handler) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 961c6d74083..48a37d39e3c 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -569,6 +569,10 @@ add_subdirectory(ydb/library/yql/providers/common/config) add_subdirectory(ydb/library/yql/providers/common/gateway) add_subdirectory(ydb/library/yql/providers/result/provider) add_subdirectory(ydb/core/formats) +add_subdirectory(ydb/library/arrow_clickhouse) +add_subdirectory(ydb/library/arrow_clickhouse/Common) +add_subdirectory(ydb/library/arrow_clickhouse/Columns) +add_subdirectory(ydb/library/arrow_clickhouse/DataStreams) add_subdirectory(ydb/core/keyvalue/protos) add_subdirectory(ydb/core/tx) add_subdirectory(ydb/core/persqueue/config) @@ -1048,10 +1052,6 @@ add_subdirectory(ydb/core/filestore) add_subdirectory(ydb/core/grpc_caching) add_subdirectory(ydb/core/pgproxy) add_subdirectory(ydb/core/yql_testlib) -add_subdirectory(ydb/library/arrow_clickhouse) -add_subdirectory(ydb/library/arrow_clickhouse/Common) -add_subdirectory(ydb/library/arrow_clickhouse/Columns) -add_subdirectory(ydb/library/arrow_clickhouse/DataStreams) add_subdirectory(ydb/core/actorlib_impl/ut) add_subdirectory(library/cpp/testing/unittest_main) add_subdirectory(library/cpp/terminate_handler) diff --git a/ydb/core/formats/CMakeLists.txt b/ydb/core/formats/CMakeLists.txt index 72e015389af..35be1df939b 100644 --- a/ydb/core/formats/CMakeLists.txt +++ b/ydb/core/formats/CMakeLists.txt @@ -11,6 +11,9 @@ add_library(ydb-core-formats) target_compile_options(ydb-core-formats PRIVATE -DUSE_CURRENT_UDF_ABI_VERSION ) +target_include_directories(ydb-core-formats PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/arrow_clickhouse +) target_link_libraries(ydb-core-formats PUBLIC contrib-libs-cxxsupp yutil @@ -18,6 +21,7 @@ target_link_libraries(ydb-core-formats PUBLIC ydb-core-scheme ydb-library-binary_json ydb-library-dynumber + ydb-library-arrow_clickhouse ) target_sources(ydb-core-formats PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow_batch_builder.cpp diff --git a/ydb/core/formats/custom_registry.cpp b/ydb/core/formats/custom_registry.cpp index 347e833f9f6..ef1fb1a1934 100644 --- a/ydb/core/formats/custom_registry.cpp +++ b/ydb/core/formats/custom_registry.cpp @@ -2,10 +2,19 @@ #include "functions.h" #include "func_common.h" +#include "program.h" + #include <util/system/yassert.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/registry_internal.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h> +#ifndef WIN32 +#include <AggregateFunctions/AggregateFunctionCount.h> +#include <AggregateFunctions/AggregateFunctionMinMaxAny.h> +#include <AggregateFunctions/AggregateFunctionSum.h> +#include <AggregateFunctions/AggregateFunctionAvg.h> +#endif + namespace cp = ::arrow::compute; namespace NKikimr::NArrow { @@ -50,6 +59,23 @@ static void RegisterYdbCast(cp::FunctionRegistry* registry) { Y_VERIFY(registry->AddFunction(std::make_shared<YdbCastMetaFunction>()).ok()); } +static void RegisterHouseAggregates(cp::FunctionRegistry* registry) { +#ifndef WIN32 + try { + Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAny>(GetHouseFunctionName(EAggregate::Any))).ok()); + Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedCount>(GetHouseFunctionName(EAggregate::Count))).ok()); + Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedMin>(GetHouseFunctionName(EAggregate::Min))).ok()); + Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedMax>(GetHouseFunctionName(EAggregate::Max))).ok()); + Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedSum>(GetHouseFunctionName(EAggregate::Sum))).ok()); + Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAvg>(GetHouseFunctionName(EAggregate::Avg))).ok()); + } catch (const std::exception& /*ex*/) { + Y_VERIFY(false); + } +#else + Y_UNUSED(registry); +#endif +} + static std::unique_ptr<cp::FunctionRegistry> CreateCustomRegistry() { auto registry = cp::FunctionRegistry::Make(); @@ -57,6 +83,7 @@ static std::unique_ptr<cp::FunctionRegistry> CreateCustomRegistry() { RegisterRound(registry.get()); RegisterArithmetic(registry.get()); RegisterYdbCast(registry.get()); + RegisterHouseAggregates(registry.get()); return registry; } diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp index d42ed0cd5f0..48a615c41ce 100644 --- a/ydb/core/formats/program.cpp +++ b/ydb/core/formats/program.cpp @@ -257,24 +257,57 @@ const char * GetFunctionName(EAggregate op) { return ""; } +const char * GetHouseFunctionName(EAggregate op) { + switch (op) { + case EAggregate::Any: + return "ch.any"; + case EAggregate::Count: + return "ch.count"; + case EAggregate::Min: + return "ch.min"; + case EAggregate::Max: + return "ch.max"; + case EAggregate::Sum: + return "ch.sum"; + case EAggregate::Avg: + return "ch.avg"; -void AddColumn(std::shared_ptr<TProgramStep::TDatumBatch>& batch, std::string field_name, const arrow::Datum& column) { - auto field = ::arrow::field(std::move(field_name), column.type()); - Y_VERIFY(field != nullptr); - Y_VERIFY(field->type()->Equals(column.type())); - Y_VERIFY(column.is_scalar() || column.length() == batch->rows); - auto new_schema = *batch->fields->AddField(batch->fields->num_fields(), field); - batch->datums.push_back(column); - batch->fields = new_schema; + default: + break; + } + return ""; } -arrow::Result<arrow::Datum> GetColumnByName(const std::shared_ptr<TProgramStep::TDatumBatch>& batch, const std::string& name) { - int i = batch->fields->GetFieldIndex(name); - if (i == -1) { - return arrow::Status::Invalid("Not found or duplicate"); +namespace { + +arrow::Status AddColumn( + TProgramStep::TDatumBatch& batch, + const std::string& name, + arrow::Datum&& column) +{ + if (batch.fields->GetFieldIndex(name) != -1) { + return arrow::Status::Invalid("Trying to add duplicate column '" + name + "'"); + } + + auto field = arrow::field(name, column.type()); + if (!field || !field->type()->Equals(column.type())) { + return arrow::Status::Invalid("Cannot create field."); } - else { - return batch->datums[i]; + if (!column.is_scalar() && column.length() != batch.rows) { + return arrow::Status::Invalid("Wrong column length."); + } + + batch.fields = *batch.fields->AddField(batch.fields->num_fields(), field); + batch.datums.emplace_back(column); + return arrow::Status::OK(); +} + +arrow::Result<arrow::Datum> GetColumnByName(const TProgramStep::TDatumBatch& batch, const std::string& name) { + int i = batch.fields->GetFieldIndex(name); + if (i == -1) { + return arrow::Status::Invalid("Not found column '" + name + "' or duplicate"); + } else { + return batch.datums[i]; } } @@ -287,134 +320,189 @@ std::shared_ptr<TProgramStep::TDatumBatch> ToTDatumBatch(std::shared_ptr<arrow:: return std::make_shared<TProgramStep::TDatumBatch>(TProgramStep::TDatumBatch{std::make_shared<arrow::Schema>(*batch->schema()), batch->num_rows(), std::move(datums)}); } -std::shared_ptr<arrow::RecordBatch> ToRecordBatch(std::shared_ptr<TProgramStep::TDatumBatch>& batch) { +std::shared_ptr<arrow::RecordBatch> ToRecordBatch(TProgramStep::TDatumBatch& batch) { std::vector<std::shared_ptr<arrow::Array>> columns; - columns.reserve(batch->datums.size()); - for (auto col : batch->datums) { + columns.reserve(batch.datums.size()); + for (auto col : batch.datums) { if (col.is_scalar()) { - columns.push_back(*arrow::MakeArrayFromScalar(*col.scalar(), batch->rows)); + columns.push_back(*arrow::MakeArrayFromScalar(*col.scalar(), batch.rows)); } else if (col.is_array()){ - Y_VERIFY(col.length() != -1); + if (col.length() == -1) { + return {}; + } columns.push_back(col.make_array()); } } - return arrow::RecordBatch::Make(batch->fields, batch->rows, columns); -} - - -std::shared_ptr<arrow::Array> MakeConstantColumn(const arrow::Scalar& value, int64_t size) { - auto res = arrow::MakeArrayFromScalar(value, size); - Y_VERIFY(res.ok()); - return *res; + return arrow::RecordBatch::Make(batch.fields, batch.rows, columns); } -template <typename TOpId, typename TOptions> -arrow::Datum CallFunctionById(TOpId funcId, const std::vector<std::string>& args, - const TOptions* funcOpts, - std::shared_ptr<TProgramStep::TDatumBatch> batch, - arrow::compute::ExecContext* ctx) +template <bool houseFunction, typename TOpId, typename TOptions> +arrow::Result<arrow::Datum> CallFunctionById( + TOpId funcId, const std::vector<std::string>& args, + const TOptions* funcOpts, + const TProgramStep::TDatumBatch& batch, + arrow::compute::ExecContext* ctx) { std::vector<arrow::Datum> arguments; arguments.reserve(args.size()); for (auto& colName : args) { auto column = GetColumnByName(batch, colName); - Y_VERIFY(column.ok()); + if (!column.ok()) { + return column.status(); + } arguments.push_back(*column); } - std::string funcName = GetFunctionName(funcId); - - arrow::Result<arrow::Datum> result; - if (ctx != nullptr && ctx->func_registry()->GetFunction(funcName).ok()) { - result = arrow::compute::CallFunction(GetFunctionName(funcId), arguments, funcOpts, ctx); + std::string funcName; + if constexpr (houseFunction) { + funcName = GetHouseFunctionName(funcId); } else { - result = arrow::compute::CallFunction(GetFunctionName(funcId), arguments, funcOpts); + funcName = GetFunctionName(funcId); + } + + if (ctx && ctx->func_registry()->GetFunction(funcName).ok()) { + return arrow::compute::CallFunction(funcName, arguments, funcOpts, ctx); } - Y_VERIFY_S(result.ok(), result.status().message()); - return result.ValueOrDie(); + return arrow::compute::CallFunction(funcName, arguments, funcOpts); +} + +arrow::Result<arrow::Datum> CallFunctionByAssign( + const TAssign& assign, + const TProgramStep::TDatumBatch& batch, + arrow::compute::ExecContext* ctx) +{ + return CallFunctionById<false>(assign.GetOperation(), assign.GetArguments(), assign.GetFunctionOptions(), + batch, ctx); } -arrow::Datum CallFunctionByAssign(const TAssign& assign, - std::shared_ptr<TProgramStep::TDatumBatch> batch, - arrow::compute::ExecContext* ctx) +arrow::Result<arrow::Datum> CallFunctionByAssign( + const TAggregateAssign& assign, + const TProgramStep::TDatumBatch& batch, + arrow::compute::ExecContext* ctx) { - return CallFunctionById(assign.GetOperation(), assign.GetArguments(), assign.GetFunctionOptions(), batch, ctx); + return CallFunctionById<false>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(), + batch, ctx); } -arrow::Datum CallFunctionByAssign(const TAggregateAssign& assign, - std::shared_ptr<TProgramStep::TDatumBatch> batch, - arrow::compute::ExecContext* ctx) +arrow::Result<arrow::Datum> CallHouseFunctionByAssign( + const TAggregateAssign& assign, + TProgramStep::TDatumBatch& batch, + arrow::compute::ExecContext* ctx) { - return CallFunctionById(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(), batch, ctx); + try { + return CallFunctionById<true>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(), + batch, ctx); + } catch (const std::exception& ex) { + Y_VERIFY_S(false, ex.what()); + } +} + } -void TProgramStep::ApplyAssignes(std::shared_ptr<TProgramStep::TDatumBatch>& batch, - arrow::compute::ExecContext* ctx) const + +arrow::Status TProgramStep::ApplyAssignes( + TProgramStep::TDatumBatch& batch, + arrow::compute::ExecContext* ctx) const { if (Assignes.empty()) { - return; + return arrow::Status::OK(); } - batch->datums.reserve(batch->datums.size() + Assignes.size()); + batch.datums.reserve(batch.datums.size() + Assignes.size()); for (auto& assign : Assignes) { - Y_VERIFY(!GetColumnByName(batch, assign.GetName()).ok()); + if (GetColumnByName(batch, assign.GetName()).ok()) { + return arrow::Status::Invalid("Assign to existing column '" + assign.GetName() + "'."); + } arrow::Datum column; if (assign.IsConstant()) { column = assign.GetConstant(); } else { - column = CallFunctionByAssign(assign, batch, ctx); + auto funcResult = CallFunctionByAssign(assign, batch, ctx); + if (!funcResult.ok()) { + return funcResult.status(); + } + column = *funcResult; + } + auto status = AddColumn(batch, assign.GetName(), std::move(column)); + if (!status.ok()) { + return status; } - AddColumn(batch, assign.GetName(), column); } - //Y_VERIFY(batch->Validate().ok()); + //return batch->Validate(); + return arrow::Status::OK(); } -void TProgramStep::ApplyAggregates(std::shared_ptr<TDatumBatch>& batch, arrow::compute::ExecContext* ctx) const { +arrow::Status TProgramStep::ApplyAggregates( + TDatumBatch& batch, + arrow::compute::ExecContext* ctx) const +{ if (GroupBy.empty()) { - return; + return arrow::Status::OK(); } - auto res = std::make_shared<TDatumBatch>(); - res->rows = 1; // TODO - res->datums.reserve(GroupBy.size()); + TDatumBatch res; + res.rows = 1; // TODO + res.datums.reserve(GroupBy.size()); arrow::FieldVector fields; fields.reserve(GroupBy.size()); for (auto& assign : GroupBy) { - res->datums.push_back(CallFunctionByAssign(assign, batch, ctx)); - auto& column = res->datums.back(); - Y_VERIFY_S(column.is_scalar(), TStringBuilder() << "Aggregate result is not a scalar."); - - auto op = assign.GetOperation(); - if (op == EAggregate::Min) { - const auto& minMax = column.scalar_as<arrow::StructScalar>(); - column = minMax.value[0]; - } else if (op == EAggregate::Max) { - const auto& minMax = column.scalar_as<arrow::StructScalar>(); - column = minMax.value[1]; + auto funcResult = CallFunctionByAssign(assign, batch, ctx); + if (!funcResult.ok()) { + auto houseResult = CallHouseFunctionByAssign(assign, batch, ctx); + if (!houseResult.ok()) { + return funcResult.status(); + } + funcResult = houseResult; } - Y_VERIFY_S(column.type(), TStringBuilder() << "Aggregate result has no type."); + res.datums.push_back(*funcResult); + auto& column = res.datums.back(); + if (!column.is_scalar()) { + return arrow::Status::Invalid("Aggregate result is not a scalar."); + } + + if (column.scalar()->type->id() == arrow::Type::STRUCT) { + auto op = assign.GetOperation(); + if (op == EAggregate::Min) { + const auto& minMax = column.scalar_as<arrow::StructScalar>(); + column = minMax.value[0]; + } else if (op == EAggregate::Max) { + const auto& minMax = column.scalar_as<arrow::StructScalar>(); + column = minMax.value[1]; + } else { + return arrow::Status::Invalid("Unexpected struct result for aggregate function"); + } + } + + if (!column.type()) { + return arrow::Status::Invalid("Aggregate result has no type."); + } fields.emplace_back(std::make_shared<arrow::Field>(assign.GetName(), column.type())); } - res->fields = std::make_shared<arrow::Schema>(fields); + res.fields = std::make_shared<arrow::Schema>(fields); batch = res; + return arrow::Status::OK(); } -void TProgramStep::ApplyFilters(std::shared_ptr<TDatumBatch>& batch) const { +arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const { if (Filters.empty()) { - return; + return arrow::Status::OK(); } std::vector<std::vector<bool>> filters; filters.reserve(Filters.size()); for (auto& colName : Filters) { auto column = GetColumnByName(batch, colName); - Y_VERIFY_S(column.ok(), TStringBuilder() << "Column " << colName << " is not ok."); - Y_VERIFY_S(column->is_array(), TStringBuilder() << "Column " << colName << " is not an array."); - Y_VERIFY_S(column->type() == arrow::boolean(), TStringBuilder() << "Column " << colName << " type is not bool."); + if (!column.ok()) { + return column.status(); + } + if (!column->is_array() || column->type() != arrow::boolean()) { + return arrow::Status::Invalid("Column '" + colName + "' is not a boolean array."); + } + auto boolColumn = std::static_pointer_cast<arrow::BooleanArray>(column->make_array()); filters.push_back(std::vector<bool>(boolColumn->length())); auto& bits = filters.back(); @@ -445,13 +533,18 @@ void TProgramStep::ApplyFilters(std::shared_ptr<TDatumBatch>& batch) const { } } - for (int64_t i = 0; i < batch->fields->num_fields(); ++i) { - bool needed = (allColumns || neededColumns.contains(batch->fields->field(i)->name())); - if (batch->datums[i].is_array() && needed) { - auto res = arrow::compute::Filter(batch->datums[i].make_array(), filter); - Y_VERIFY_S(res.ok(), res.status().message()); - Y_VERIFY((*res).kind() == batch->datums[i].kind()); - batch->datums[i] = *res; + for (int64_t i = 0; i < batch.fields->num_fields(); ++i) { + bool needed = (allColumns || neededColumns.contains(batch.fields->field(i)->name())); + if (batch.datums[i].is_array() && needed) { + auto res = arrow::compute::Filter(batch.datums[i].make_array(), filter); + if (!res.ok()) { + return res.status(); + } + if ((*res).kind() != batch.datums[i].kind()) { + return arrow::Status::Invalid("Unexpected filter result."); + } + + batch.datums[i] = *res; } } @@ -459,13 +552,14 @@ void TProgramStep::ApplyFilters(std::shared_ptr<TDatumBatch>& batch) const { for (int64_t i = 0; i < filter->length(); ++i) { newRows += filter->Value(i); } - batch->rows = newRows; + batch.rows = newRows; } + return arrow::Status::OK(); } -void TProgramStep::ApplyProjection(std::shared_ptr<TDatumBatch>& batch) const { +arrow::Status TProgramStep::ApplyProjection(TDatumBatch& batch) const { if (Projection.empty()) { - return; + return arrow::Status::OK(); } std::unordered_set<std::string_view> projSet; for (auto& str: Projection) { @@ -473,38 +567,69 @@ void TProgramStep::ApplyProjection(std::shared_ptr<TDatumBatch>& batch) const { } std::vector<std::shared_ptr<arrow::Field>> newFields; std::vector<arrow::Datum> newDatums; - for (int64_t i = 0; i < batch->fields->num_fields(); ++i) { - auto& cur_field_name = batch->fields->field(i)->name(); + for (int64_t i = 0; i < batch.fields->num_fields(); ++i) { + auto& cur_field_name = batch.fields->field(i)->name(); if (projSet.contains(cur_field_name)) { - newFields.push_back(batch->fields->field(i)); - Y_VERIFY(newFields.back()); - newDatums.push_back(batch->datums[i]); + newFields.push_back(batch.fields->field(i)); + if (!newFields.back()) { + return arrow::Status::Invalid("Wrong projection."); + } + newDatums.push_back(batch.datums[i]); } } - batch->fields = std::make_shared<arrow::Schema>(newFields); - batch->datums = std::move(newDatums); + batch.fields = std::make_shared<arrow::Schema>(newFields); + batch.datums = std::move(newDatums); + return arrow::Status::OK(); } -void TProgramStep::ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const { +arrow::Status TProgramStep::ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const { if (Projection.empty()) { - return; + return arrow::Status::OK(); } std::vector<std::shared_ptr<arrow::Field>> fields; for (auto& column : Projection) { fields.push_back(batch->schema()->GetFieldByName(column)); - Y_VERIFY(fields.back()); + if (!fields.back()) { + return arrow::Status::Invalid("Wrong projection column '" + column + "'."); + } } batch = NArrow::ExtractColumns(batch, std::make_shared<arrow::Schema>(fields)); + return arrow::Status::OK(); } -void TProgramStep::Apply(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const { +arrow::Status TProgramStep::Apply(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const { auto rb = ToTDatumBatch(batch); - ApplyAssignes(rb, ctx); - ApplyFilters(rb); - ApplyAggregates(rb, ctx); - ApplyProjection(rb); - batch = ToRecordBatch(rb); + + auto status = ApplyAssignes(*rb, ctx); + //Y_VERIFY_S(status.ok(), status.message()); + if (!status.ok()) { + return status; + } + + status = ApplyFilters(*rb); + //Y_VERIFY_S(status.ok(), status.message()); + if (!status.ok()) { + return status; + } + + status = ApplyAggregates(*rb, ctx); + //Y_VERIFY_S(status.ok(), status.message()); + if (!status.ok()) { + return status; + } + + status = ApplyProjection(*rb); + //Y_VERIFY_S(status.ok(), status.message()); + if (!status.ok()) { + return status; + } + + batch = ToRecordBatch(*rb); + if (!batch) { + return arrow::Status::Invalid("Failed to create program result."); + } + return arrow::Status::OK(); } } diff --git a/ydb/core/formats/program.h b/ydb/core/formats/program.h index 800ec9bb424..a314272003b 100644 --- a/ydb/core/formats/program.h +++ b/ydb/core/formats/program.h @@ -98,6 +98,7 @@ enum class EAggregate { const char * GetFunctionName(EOperation op); const char * GetFunctionName(EAggregate op); +const char * GetHouseFunctionName(EAggregate op); EOperation ValidateOperation(EOperation op, ui32 argsSize); class TAssign { @@ -246,21 +247,27 @@ struct TProgramStep { return Assignes.empty() && Filters.empty() && Projection.empty(); } - void Apply(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const; + arrow::Status Apply(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const; - void ApplyAssignes(std::shared_ptr<TDatumBatch>& batch, arrow::compute::ExecContext* ctx) const; - void ApplyAggregates(std::shared_ptr<TDatumBatch>& batch, arrow::compute::ExecContext* ctx) const; - void ApplyFilters(std::shared_ptr<TDatumBatch>& batch) const; - void ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const; - void ApplyProjection(std::shared_ptr<TDatumBatch>& batch) const; + arrow::Status ApplyAssignes(TDatumBatch& batch, arrow::compute::ExecContext* ctx) const; + arrow::Status ApplyAggregates(TDatumBatch& batch, arrow::compute::ExecContext* ctx) const; + arrow::Status ApplyFilters(TDatumBatch& batch) const; + arrow::Status ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const; + arrow::Status ApplyProjection(TDatumBatch& batch) const; }; -inline void ApplyProgram(std::shared_ptr<arrow::RecordBatch>& batch, - const std::vector<std::shared_ptr<TProgramStep>>& program, - arrow::compute::ExecContext* ctx = nullptr) { +inline arrow::Status ApplyProgram( + std::shared_ptr<arrow::RecordBatch>& batch, + const std::vector<std::shared_ptr<TProgramStep>>& program, + arrow::compute::ExecContext* ctx = nullptr) +{ for (auto& step : program) { - step->Apply(batch, ctx); + auto status = step->Apply(batch, ctx); + if (!status.ok()) { + return status; + } } + return arrow::Status::OK(); } struct TSsaProgramSteps { diff --git a/ydb/core/formats/ut/CMakeLists.darwin.txt b/ydb/core/formats/ut/CMakeLists.darwin.txt index 2f19d1653bc..d5029e328fe 100644 --- a/ydb/core/formats/ut/CMakeLists.darwin.txt +++ b/ydb/core/formats/ut/CMakeLists.darwin.txt @@ -14,6 +14,7 @@ target_compile_options(ydb-core-formats-ut PRIVATE ) target_include_directories(ydb-core-formats-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats + ${CMAKE_SOURCE_DIR}/ydb/library/arrow_clickhouse ) target_link_libraries(ydb-core-formats-ut PUBLIC contrib-libs-cxxsupp diff --git a/ydb/core/formats/ut/CMakeLists.linux.txt b/ydb/core/formats/ut/CMakeLists.linux.txt index 28f7e2e587f..41b2721bf22 100644 --- a/ydb/core/formats/ut/CMakeLists.linux.txt +++ b/ydb/core/formats/ut/CMakeLists.linux.txt @@ -14,6 +14,7 @@ target_compile_options(ydb-core-formats-ut PRIVATE ) target_include_directories(ydb-core-formats-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats + ${CMAKE_SOURCE_DIR}/ydb/library/arrow_clickhouse ) target_link_libraries(ydb-core-formats-ut PUBLIC contrib-libs-cxxsupp diff --git a/ydb/core/formats/ut_program_step.cpp b/ydb/core/formats/ut_program_step.cpp index fa368af5626..a3a62ab9578 100644 --- a/ydb/core/formats/ut_program_step.cpp +++ b/ydb/core/formats/ut_program_step.cpp @@ -23,7 +23,7 @@ size_t FilterTest(std::vector<std::shared_ptr<arrow::Array>> args, EOperation fr ps->Assignes = {TAssign("res1", frst, {"x", "y"}), TAssign("res2", scnd, {"res1", "z"})}; ps->Filters = {"res2"}; ps->Projection = {"res1", "res2"}; - ApplyProgram(rBatch, {ps}, GetCustomExecContext()); + UNIT_ASSERT(ApplyProgram(rBatch, {ps}, GetCustomExecContext()).ok()); UNIT_ASSERT(rBatch->ValidateFull().ok()); UNIT_ASSERT(rBatch->num_columns() == 2); return rBatch->num_rows(); @@ -39,7 +39,7 @@ size_t FilterTestUnary(std::vector<std::shared_ptr<arrow::Array>> args, EOperati ps->Assignes = {TAssign("res1", frst, {"x"}), TAssign("res2", scnd, {"res1", "z"})}; ps->Filters = {"res2"}; ps->Projection = {"res1", "res2"}; - ApplyProgram(rBatch, {ps}, GetCustomExecContext()); + UNIT_ASSERT(ApplyProgram(rBatch, {ps}, GetCustomExecContext()).ok()); UNIT_ASSERT(rBatch->ValidateFull().ok()); UNIT_ASSERT(rBatch->num_columns() == 2); return rBatch->num_rows(); @@ -173,7 +173,7 @@ Y_UNIT_TEST_SUITE(ProgramStepTest) { ps->Assignes = {TAssign("y", 56), TAssign("res", EOperation::Add, {"x", "y"})}; ps->Filters = {"filter"}; ps->Projection = {"res", "filter"}; - ApplyProgram(rBatch, {ps}, GetCustomExecContext()); + UNIT_ASSERT(ApplyProgram(rBatch, {ps}, GetCustomExecContext()).ok()); UNIT_ASSERT(rBatch->ValidateFull().ok()); UNIT_ASSERT(rBatch->num_columns() == 2); UNIT_ASSERT(rBatch->num_rows() == 2); @@ -189,7 +189,7 @@ Y_UNIT_TEST_SUITE(ProgramStepTest) { ps->Assignes = {TAssign("y", 56), TAssign("res", EOperation::Add, {"x", "y"})}; ps->Filters = {}; ps->Projection = {"res", "filter"}; - ApplyProgram(rBatch, {ps}, GetCustomExecContext()); + UNIT_ASSERT(ApplyProgram(rBatch, {ps}, GetCustomExecContext()).ok()); UNIT_ASSERT(rBatch->ValidateFull().ok()); UNIT_ASSERT(rBatch->num_columns() == 2); UNIT_ASSERT(rBatch->num_rows() == 4); diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h index 28605b4a98c..cc7fba4b373 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.h +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h @@ -54,7 +54,8 @@ public: ApplyRangePredicates(batch); if (!ReadMetadata->Program.empty()) { - ApplyProgram(batch, ReadMetadata->Program, NArrow::GetCustomExecContext()); + auto status = ApplyProgram(batch, ReadMetadata->Program, NArrow::GetCustomExecContext()); + Y_VERIFY_S(status.ok(), status.message()); } // Leave only requested columns diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 0b38308548a..ef849697feb 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -160,7 +160,6 @@ TVector<TCell> MakeTestCells(const TVector<TTypeId>& types, ui32 value, TVector< cells.reserve(types.size()); for (auto& type : types) { - // test only: 64-bit integer or string if (type == NTypeIds::Utf8 || type == NTypeIds::String || type == NTypeIds::String4k || @@ -176,12 +175,19 @@ TVector<TCell> MakeTestCells(const TVector<TTypeId>& types, ui32 value, TVector< mem.push_back("{ \"a\" = [ { \"b\" = 1; } ]; }"); const TString& str = mem.back(); cells.push_back(TCell(str.data(), str.size())); - } else if (type == NTypeIds::Timestamp || - type == NTypeIds::Uint64 || - type == NTypeIds::Int64) { + } else if (type == NTypeIds::Timestamp || type == NTypeIds::Uint64 || type == NTypeIds::Int64) { cells.push_back(TCell::Make<ui64>(value)); - } else if (type == NTypeIds::Int32) { - cells.push_back(TCell::Make<i32>(value)); + } else if (type == NTypeIds::Uint32 || type == NTypeIds::Int32 || type == NTypeIds::Datetime) { + cells.push_back(TCell::Make<ui32>(value)); + } else if (type == NTypeIds::Uint16 || type == NTypeIds::Int16 || type == NTypeIds::Date) { + cells.push_back(TCell::Make<ui16>(value)); + } else if (type == NTypeIds::Uint8 || type == NTypeIds::Int8 || type == NTypeIds::Byte || + type == NTypeIds::Bool) { + cells.push_back(TCell::Make<ui8>(value)); + } else if (type == NTypeIds::Float) { + cells.push_back(TCell::Make<float>(value)); + } else if (type == NTypeIds::Double) { + cells.push_back(TCell::Make<double>(value)); } else { UNIT_ASSERT(false); } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 45fd99abfc1..2fdb5c12a99 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -132,6 +132,39 @@ struct TTestSchema { return schema; } + static auto YdbAllTypesSchema() { + TVector<std::pair<TString, TTypeId>> schema = { + { "ts", NTypeIds::Timestamp }, + + { "i8", NTypeIds::Int8 }, + { "i16", NTypeIds::Int16 }, + { "i32", NTypeIds::Int32 }, + { "i64", NTypeIds::Int64 }, + { "u8", NTypeIds::Uint8 }, + { "u16", NTypeIds::Uint16 }, + { "u32", NTypeIds::Uint32 }, + { "u64", NTypeIds::Uint64 }, + { "float", NTypeIds::Float }, + { "double", NTypeIds::Double }, + + { "byte", NTypeIds::Byte }, + //{ "bool", NTypeIds::Bool }, + //{ "decimal", NTypeIds::Decimal }, + //{ "dynum", NTypeIds::DyNumber }, + + { "date", NTypeIds::Date }, + { "datetime", NTypeIds::Datetime }, + //{ "interval", NTypeIds::Interval }, + + {"text", NTypeIds::Text }, + {"bytes", NTypeIds::Bytes }, + {"yson", NTypeIds::Yson }, + {"json", NTypeIds::Json }, + {"jsondoc", NTypeIds::JsonDocument } + }; + return schema; + }; + static NKikimrSchemeOp::TOlapColumnDescription CreateColumn(ui32 id, const TString& name, TTypeId type) { NKikimrSchemeOp::TOlapColumnDescription col; col.SetId(id); @@ -160,7 +193,10 @@ struct TTestSchema { *schema->MutableColumns()->Add() = CreateColumn(i + 1, columns[i].first, columns[i].second); } - for (auto& column : ExtractNames(YdbPkSchema())) { + auto pk = columns; + Y_VERIFY(pk.size() >= 4); + pk.resize(4); + for (auto& column : ExtractNames(pk)) { schema->AddKeyColumnNames(column); } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 23b41f81c48..d2693d0e7c8 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -515,7 +515,8 @@ TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBa if (ReadMetadata->HasProgram()) { for (auto& batch : out) { - ApplyProgram(batch.ResultBatch, ReadMetadata->Program, NArrow::GetCustomExecContext()); + auto status = ApplyProgram(batch.ResultBatch, ReadMetadata->Program, NArrow::GetCustomExecContext()); + Y_VERIFY_S(status.ok(), status.message()); } } return out; diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp index ad895944e2a..8daf6084479 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp @@ -84,22 +84,48 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p } template <typename TArrowType> -bool CheckIntValue(const TVector<TString>& blobs, const TString& srcSchema, const TString& colName, - const TVector<int>& expectedVals) { - auto schema = NArrow::DeserializeSchema(srcSchema); - for (auto& blob : blobs) { - auto batch = NArrow::DeserializeBatch(blob, schema); - UNIT_ASSERT(batch); +bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<int64_t>& expected) { + UNIT_ASSERT(array); + UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size()); - std::shared_ptr<arrow::Array> array = batch->GetColumnByName(colName); - UNIT_ASSERT(array); - auto& val = dynamic_cast<const arrow::NumericArray<TArrowType>&>(*array); + auto& column = dynamic_cast<const arrow::NumericArray<TArrowType>&>(*array); - UNIT_ASSERT_VALUES_EQUAL(val.length(), (int)expectedVals.size()); - for (int i = 0; i < val.length(); ++i) { - int value = val.Value(i); - UNIT_ASSERT_VALUES_EQUAL(value, expectedVals[i]); - } + for (int i = 0; i < column.length(); ++i) { + auto value = column.Value(i); + UNIT_ASSERT_VALUES_EQUAL(value, expected[i]); + } + return true; +} + +bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<int64_t>& expected) { + UNIT_ASSERT(array); + + switch (array->type()->id()) { + case arrow::Type::UINT8: + return CheckTypedIntValues<arrow::UInt8Type>(array, expected); + case arrow::Type::UINT16: + return CheckTypedIntValues<arrow::UInt16Type>(array, expected); + case arrow::Type::UINT32: + return CheckTypedIntValues<arrow::UInt32Type>(array, expected); + case arrow::Type::UINT64: + return CheckTypedIntValues<arrow::UInt64Type>(array, expected); + case arrow::Type::INT8: + return CheckTypedIntValues<arrow::Int8Type>(array, expected); + case arrow::Type::INT16: + return CheckTypedIntValues<arrow::Int16Type>(array, expected); + case arrow::Type::INT32: + return CheckTypedIntValues<arrow::Int32Type>(array, expected); + case arrow::Type::INT64: + return CheckTypedIntValues<arrow::Int64Type>(array, expected); + + case arrow::Type::TIMESTAMP: + return CheckTypedIntValues<arrow::TimestampType>(array, expected); + case arrow::Type::DURATION: + return CheckTypedIntValues<arrow::DurationType>(array, expected); + + default: + UNIT_ASSERT(false); + //return false; } return true; } @@ -903,100 +929,101 @@ static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssig return ssa; } -// SELECT some(timestamp), some(saved_at) FROM t -// -// FIXME: -// NotImplemented: Function any has no kernel matching input types (array[timestamp[us]]) -// NotImplemented: Function any has no kernel matching input types (array[string]) -// NotImplemented: Function any has no kernel matching input types (array[int32]) -// NotImplemented: Function min_max has no kernel matching input types (array[timestamp[us]]) -// NotImplemented: Function min_max has no kernel matching input types (array[string]) -// -NKikimrSSA::TProgram MakeSelectAggregates(TAggAssignment::EAggregateFunction aggId = TAggAssignment::AGG_ANY, - std::vector<ui32> columnIds = {1, 9}) +// SELECT min(x), max(x), some(x), count(x) FROM t +NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId) { NKikimrSSA::TProgram ssa; - - ui32 tmpColumnId = 100; - auto* line1 = ssa.AddCommand(); auto* groupBy = line1->MutableGroupBy(); // auto* l1_agg1 = groupBy->AddAggregates(); - l1_agg1->MutableColumn()->SetId(tmpColumnId); + l1_agg1->MutableColumn()->SetId(100); auto* l1_agg1_f = l1_agg1->MutableFunction(); - l1_agg1_f->SetId(aggId); - l1_agg1_f->AddArguments()->SetId(columnIds[0]); + l1_agg1_f->SetId(TAggAssignment::AGG_MIN); + l1_agg1_f->AddArguments()->SetId(columnId); // auto* l1_agg2 = groupBy->AddAggregates(); - l1_agg2->MutableColumn()->SetId(tmpColumnId + 1); + l1_agg2->MutableColumn()->SetId(101); auto* l1_agg2_f = l1_agg2->MutableFunction(); - l1_agg2_f->SetId(aggId); - l1_agg2_f->AddArguments()->SetId(columnIds[1]); + l1_agg2_f->SetId(TAggAssignment::AGG_MAX); + l1_agg2_f->AddArguments()->SetId(columnId); + // + auto* l1_agg3 = groupBy->AddAggregates(); + l1_agg3->MutableColumn()->SetId(102); + auto* l1_agg3_f = l1_agg3->MutableFunction(); + l1_agg3_f->SetId(TAggAssignment::AGG_ANY); + l1_agg3_f->AddArguments()->SetId(columnId); + // + auto* l1_agg4 = groupBy->AddAggregates(); + l1_agg4->MutableColumn()->SetId(103); + auto* l1_agg4_f = l1_agg4->MutableFunction(); + l1_agg4_f->SetId(TAggAssignment::AGG_COUNT); + l1_agg4_f->AddArguments()->SetId(columnId); auto* line2 = ssa.AddCommand(); auto* proj = line2->MutableProjection(); - proj->AddColumns()->SetId(tmpColumnId); - proj->AddColumns()->SetId(tmpColumnId + 1); + proj->AddColumns()->SetId(100); + proj->AddColumns()->SetId(101); + proj->AddColumns()->SetId(102); + proj->AddColumns()->SetId(103); return ssa; } -// SELECT min(level), max(level), some(level), count(timestamp) FROM t WHERE level = 1 -NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(std::vector<ui32> columnIds = {1, 5}) +// SELECT min(x), max(x), some(x), count(x) FROM t WHERE y = 1 +NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterColumnId) { NKikimrSSA::TProgram ssa; - ui32 tmpColumnId = 100; auto* line1 = ssa.AddCommand(); auto* l1_assign = line1->MutableAssign(); - l1_assign->MutableColumn()->SetId(tmpColumnId); + l1_assign->MutableColumn()->SetId(50); l1_assign->MutableConstant()->SetInt32(1); auto* line2 = ssa.AddCommand(); auto* l2_assign = line2->MutableAssign(); - l2_assign->MutableColumn()->SetId(tmpColumnId + 1); + l2_assign->MutableColumn()->SetId(51); auto* l2_func = l2_assign->MutableFunction(); l2_func->SetId(TAssignment::FUNC_CMP_EQUAL); - l2_func->AddArguments()->SetId(columnIds[1]); - l2_func->AddArguments()->SetId(tmpColumnId); + l2_func->AddArguments()->SetId(filterColumnId); + l2_func->AddArguments()->SetId(50); auto* line3 = ssa.AddCommand(); - line3->MutableFilter()->MutablePredicate()->SetId(tmpColumnId + 1); + line3->MutableFilter()->MutablePredicate()->SetId(51); auto* line4 = ssa.AddCommand(); auto* groupBy = line4->MutableGroupBy(); // auto* l4_agg1 = groupBy->AddAggregates(); - l4_agg1->MutableColumn()->SetId(tmpColumnId + 2); + l4_agg1->MutableColumn()->SetId(100); auto* l4_agg1_f = l4_agg1->MutableFunction(); l4_agg1_f->SetId(TAggAssignment::AGG_MIN); - l4_agg1_f->AddArguments()->SetId(columnIds[1]); + l4_agg1_f->AddArguments()->SetId(columnId); // auto* l4_agg2 = groupBy->AddAggregates(); - l4_agg2->MutableColumn()->SetId(tmpColumnId + 3); + l4_agg2->MutableColumn()->SetId(101); auto* l4_agg2_f = l4_agg2->MutableFunction(); l4_agg2_f->SetId(TAggAssignment::AGG_MAX); - l4_agg2_f->AddArguments()->SetId(columnIds[1]); -#if 0 + l4_agg2_f->AddArguments()->SetId(columnId); + // auto* l4_agg3 = groupBy->AddAggregates(); - l4_agg3->MutableColumn()->SetId(tmpColumnId + 4); + l4_agg3->MutableColumn()->SetId(102); auto* l4_agg3_f = l4_agg3->MutableFunction(); l4_agg3_f->SetId(TAggAssignment::AGG_ANY); - l4_agg3_f->AddArguments()->SetId(columnIds[1]); -#endif + l4_agg3_f->AddArguments()->SetId(columnId); + // auto* l4_agg4 = groupBy->AddAggregates(); - l4_agg4->MutableColumn()->SetId(tmpColumnId + 5); + l4_agg4->MutableColumn()->SetId(103); auto* l4_agg4_f = l4_agg4->MutableFunction(); l4_agg4_f->SetId(TAggAssignment::AGG_COUNT); - l4_agg4_f->AddArguments()->SetId(columnIds[0]); + l4_agg4_f->AddArguments()->SetId(columnId); auto* line5 = ssa.AddCommand(); auto* proj = line5->MutableProjection(); - proj->AddColumns()->SetId(tmpColumnId + 2); - proj->AddColumns()->SetId(tmpColumnId + 3); - //proj->AddColumns()->SetId(tmpColumnId + 4); - proj->AddColumns()->SetId(tmpColumnId + 5); + proj->AddColumns()->SetId(100); + proj->AddColumns()->SetId(101); + proj->AddColumns()->SetId(102); + proj->AddColumns()->SetId(103); return ssa; } @@ -1127,7 +1154,7 @@ void TestReadWithProgram(const TVector<std::pair<TString, TTypeId>>& ydbSchema = } } -void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema()) { +void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbAllTypesSchema()) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -1144,7 +1171,7 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T ui64 planStep = 100; ui64 txId = 100; - SetupSchema(runtime, sender, tableId); + SetupSchema(runtime, sender, tableId, ydbSchema); { // write some data bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema)); @@ -1157,20 +1184,21 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T // TODO: write some into index std::vector<TString> programs; + THashSet<ui32> intResult; + THashSet<ui32> isFiltered; + THashSet<NScheme::TTypeId> intTypes = { + NTypeIds::Int8, NTypeIds::Int16, NTypeIds::Int32, NTypeIds::Int64, + NTypeIds::Uint8, NTypeIds::Uint16, NTypeIds::Uint32, NTypeIds::Uint64, + NTypeIds::Timestamp + }; - { - NKikimrSSA::TProgram ssa = MakeSelectAggregates(TAggAssignment::AGG_COUNT); - TString serialized; - UNIT_ASSERT(ssa.SerializeToString(&serialized)); - NKikimrSSA::TOlapProgram program; - program.SetProgram(serialized); - - programs.push_back(""); - UNIT_ASSERT(program.SerializeToString(&programs.back())); - } + ui32 prog = 0; + for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) { + if (intTypes.count(ydbSchema[i].second)) { + intResult.insert(prog); + } - { - NKikimrSSA::TProgram ssa = MakeSelectAggregates(TAggAssignment::AGG_MIN, {5, 5}); + NKikimrSSA::TProgram ssa = MakeSelectAggregates(i + 1); TString serialized; UNIT_ASSERT(ssa.SerializeToString(&serialized)); NKikimrSSA::TOlapProgram program; @@ -1180,8 +1208,13 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T UNIT_ASSERT(program.SerializeToString(&programs.back())); } - { - NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(); + for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) { + isFiltered.insert(prog); + if (intTypes.count(ydbSchema[i].second)) { + intResult.insert(prog); + } + + NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(i + 1, 4); TString serialized; UNIT_ASSERT(ssa.SerializeToString(&serialized)); NKikimrSSA::TOlapProgram program; @@ -1191,7 +1224,7 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T UNIT_ASSERT(program.SerializeToString(&programs.back())); } - ui32 i = 0; + prog = 0; for (auto& programText : programs) { auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId); auto& readProto = Proto(readEvent); @@ -1220,22 +1253,30 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T TVector<TString> readData; readData.push_back(resRead.GetData()); + auto& data = readData[0]; - switch (i) { - case 2: - UNIT_ASSERT(CheckColumns(readData[0], meta, {"102", "103", /*"104",*/ "105"}, 1)); - UNIT_ASSERT(CheckIntValue<arrow::Int32Type>(readData, schema, "102", {1})); - UNIT_ASSERT(CheckIntValue<arrow::Int32Type>(readData, schema, "103", {1})); - //UNIT_ASSERT(CheckIntValue<arrow::Int32Type>(readData, schema, "104", {1})); - UNIT_ASSERT(CheckIntValue<arrow::Int64Type>(readData, schema, "105", {1})); - break; - default: - UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1)); - break; + auto batch = NArrow::DeserializeBatch(data, NArrow::DeserializeSchema(schema)); + UNIT_ASSERT(batch); + + UNIT_ASSERT(CheckColumns(data, meta, {"100", "101", "102", "103"}, 1)); + + // min, max, any, count + if (intResult.count(prog)) { + if (isFiltered.count(prog)) { + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), {1})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), {1})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {1})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {1})); + } else { + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), {0})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), {99})); + //UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {0})); + UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {100})); + } } } - ++i; + ++prog; } } diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index 5757f7909b1..55429f6cb39 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -9,7 +9,6 @@ using NWrappers::NTestHelpers::TS3Mock; namespace { static const TVector<std::pair<TString, TTypeId>> testYdbSchema = TTestSchema::YdbSchema(); -//static const TVector<std::pair<TString, TTypeId>> testYdbPkSchema = TTestSchema::YdbPkSchema(); std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBatch> batch, TString columnName, i64 seconds) { std::string name(columnName.c_str(), columnName.size()); diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionAvg.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionAvg.h index 3e8bce5fdbb..598afb54a3f 100644 --- a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionAvg.h +++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionAvg.h @@ -8,6 +8,7 @@ #include <type_traits> #include <AggregateFunctions/IAggregateFunction.h> +#include <AggregateFunctions/AggregateFunctionWrapper.h> #include <AggregateFunctions/AggregateFunctionSum.h> namespace CH @@ -151,13 +152,13 @@ public: : ArrowAggregateFunctionWrapper(std::move(name)) {} - AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) override + AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const override { return createWithSameType<AggregateFunctionAvg>(argument_types); } template <template <typename> typename AggFunc> - std::shared_ptr<IAggregateFunction> createWithSameType(const DataTypes & argument_types) + std::shared_ptr<IAggregateFunction> createWithSameType(const DataTypes & argument_types) const { if (argument_types.size() != 1) return {}; diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionCount.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionCount.h index 6044df828e6..0994cadc1fd 100644 --- a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionCount.h +++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionCount.h @@ -6,6 +6,7 @@ #include "arrow_clickhouse_types.h" #include <AggregateFunctions/IAggregateFunction.h> +#include <AggregateFunctions/AggregateFunctionWrapper.h> #include <Columns/ColumnsCommon.h> #include <array> @@ -84,7 +85,7 @@ public: : ArrowAggregateFunctionWrapper(std::move(name)) {} - AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) override + AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const override { return std::make_shared<AggregateFunctionCount>(argument_types); } diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionMinMaxAny.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionMinMaxAny.h index 5fbfb53170d..c8ce2884c70 100644 --- a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionMinMaxAny.h +++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionMinMaxAny.h @@ -5,7 +5,9 @@ #pragma once #include "arrow_clickhouse_types.h" +#include <ydb/library/yql/udfs/common/clickhouse/client/src/Common/BitHelpers.h> #include <AggregateFunctions/IAggregateFunction.h> +#include <AggregateFunctions/AggregateFunctionWrapper.h> namespace CH @@ -669,7 +671,7 @@ public: : ArrowAggregateFunctionWrapper(std::move(name)) {} - AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) override + AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const override { return createAggregateFunctionSingleValue<AggFunc, AggData>(argument_types); } diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionSum.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionSum.h index 52844b95da2..9819eac3df6 100644 --- a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionSum.h +++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionSum.h @@ -9,6 +9,7 @@ #include <type_traits> #include <AggregateFunctions/IAggregateFunction.h> +#include <AggregateFunctions/AggregateFunctionWrapper.h> namespace CH { @@ -255,13 +256,13 @@ public: : ArrowAggregateFunctionWrapper(std::move(name)) {} - AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) override + AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const override { return createWithSameType<AggregateFunctionSumWithOverflow>(argument_types); } template <template <typename> typename AggFunc> - std::shared_ptr<IAggregateFunction> createWithSameType(const DataTypes & argument_types) + std::shared_ptr<IAggregateFunction> createWithSameType(const DataTypes & argument_types) const { if (argument_types.size() != 1) return {}; diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionWrapper.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionWrapper.h new file mode 100644 index 00000000000..732b07f8e3e --- /dev/null +++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionWrapper.h @@ -0,0 +1,89 @@ +#pragma once +#include "arrow_clickhouse_types.h" +#include "Aggregator.h" +#include <AggregateFunctions/IAggregateFunction.h> +#include <DataStreams/OneBlockInputStream.h> +#include <DataStreams/AggregatingBlockInputStream.h> + +namespace CH +{ + +class ArrowAggregateFunctionWrapper : public arrow::compute::ScalarAggregateFunction +{ +public: + ArrowAggregateFunctionWrapper(std::string name) + : arrow::compute::ScalarAggregateFunction(std::move(name), arrow::compute::Arity::Unary(), nullptr) + {} + + virtual AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const = 0; + + arrow::Result<arrow::Datum> Execute( + const std::vector<arrow::Datum>& args, + const arrow::compute::FunctionOptions* /*options*/, + arrow::compute::ExecContext* /*ctx*/) const override + { + static const std::string result_name = "res"; + static const std::vector<std::string> arg_names = {"0", "1", "2", "3", "4", "5"}; + if (args.size() > arg_names.size()) + return arrow::Status::Invalid("unexpected arguments count"); + + std::vector<uint32_t> arg_positions; + arg_positions.reserve(args.size()); + + DataTypes types; + arrow::FieldVector fields; + std::vector<std::shared_ptr<arrow::Array>> columns; + + types.reserve(args.size()); + fields.reserve(args.size()); + columns.reserve(args.size()); + + int num_rows = 0; + uint32_t arg_num = 0; + for (auto& arg : args) + { + if (!arg.is_array()) + return arrow::Status::Invalid("argument is not an array"); + + columns.push_back(arg.make_array()); + types.push_back(columns.back()->type()); + fields.push_back(std::make_shared<arrow::Field>(arg_names[arg_num], types.back())); + + if (!arg_num) + num_rows = columns.back()->length(); + else if (num_rows != columns.back()->length()) + return arrow::Status::Invalid("different argiments length"); + + arg_positions.push_back(arg_num); + ++arg_num; + } + + auto batch = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), num_rows, columns); + + AggregateDescription description { + .function = getHouseFunction(types), + .arguments = arg_positions, + .column_name = result_name + }; + + auto input_stream = std::make_shared<OneBlockInputStream>(batch); + + // no agg keys, final aggregate states + Aggregator::Params agg_params(false, input_stream->getHeader(), {}, {description}, false); + AggregatingBlockInputStream agg_stream(input_stream, agg_params, true); + + auto result_batch = agg_stream.read(); + if (!result_batch || result_batch->num_rows() != 1) + return arrow::Status::Invalid("unexpected arrgerate result"); + if (agg_stream.read()) + return arrow::Status::Invalid("unexpected second batch in aggregate result"); + + auto res_column = result_batch->GetColumnByName(result_name); + if (!res_column || res_column->length() != 1) + return arrow::Status::Invalid("no result value"); + + return res_column->GetScalar(0); + } +}; + +} diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/IAggregateFunction.h b/ydb/library/arrow_clickhouse/AggregateFunctions/IAggregateFunction.h index 81c425d2494..dc23c3a2a8f 100644 --- a/ydb/library/arrow_clickhouse/AggregateFunctions/IAggregateFunction.h +++ b/ydb/library/arrow_clickhouse/AggregateFunctions/IAggregateFunction.h @@ -481,15 +481,4 @@ struct AggregateFunctionProperties bool is_order_dependent = false; }; - -class ArrowAggregateFunctionWrapper : public arrow::compute::ScalarAggregateFunction -{ -public: - ArrowAggregateFunctionWrapper(std::string name) - : arrow::compute::ScalarAggregateFunction(std::move(name), arrow::compute::Arity::Unary(), nullptr) - {} - - virtual AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) = 0; -}; - } diff --git a/ydb/library/arrow_clickhouse/Aggregator.h b/ydb/library/arrow_clickhouse/Aggregator.h index a83baf531f1..af08ba1ef76 100644 --- a/ydb/library/arrow_clickhouse/Aggregator.h +++ b/ydb/library/arrow_clickhouse/Aggregator.h @@ -20,17 +20,6 @@ namespace CH { -struct AggregateDescription -{ - AggregateFunctionPtr function; - Array parameters; /// Parameters of the (parametric) aggregate function. - ColumnNumbers arguments; - Names argument_names; /// used if no `arguments` are specified. - String column_name; /// What name to use for a column with aggregate function values -}; - -using AggregateDescriptions = std::vector<AggregateDescription>; - /** Different data structures that can be used for aggregation * For efficiency, the aggregation data itself is put into the pool. * Data and pool ownership (states of aggregate functions) diff --git a/ydb/library/arrow_clickhouse/Columns/ColumnAggregateFunction.h b/ydb/library/arrow_clickhouse/Columns/ColumnAggregateFunction.h index cf126113230..27cbb8ecd58 100644 --- a/ydb/library/arrow_clickhouse/Columns/ColumnAggregateFunction.h +++ b/ydb/library/arrow_clickhouse/Columns/ColumnAggregateFunction.h @@ -144,7 +144,4 @@ private: std::shared_ptr<arrow::UInt64Builder> builder; }; -using AggregateColumnsData = std::vector<arrow::UInt64Builder *>; -using AggregateColumnsConstData = std::vector<const arrow::UInt64Array *>; - } diff --git a/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.cpp b/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.cpp index 3dbc50b22ff..c44841f07e3 100644 --- a/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.cpp +++ b/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.cpp @@ -288,6 +288,13 @@ bool insertData(MutableColumn & column, const StringRef & value) case arrow::Type::BINARY: return insertString(column, value); + case arrow::Type::TIMESTAMP: + return insertNumber(column, *reinterpret_cast<const Int64 *>(value.data)); + case arrow::Type::DURATION: + return insertNumber(column, *reinterpret_cast<const Int64 *>(value.data)); + case arrow::Type::DECIMAL: + return insertDecimal(column, value); + case arrow::Type::EXTENSION: // AggregateColumn break; // TODO @@ -328,13 +335,23 @@ StringRef serializeValueIntoArena(const IColumn& column, size_t row, Arena & poo case arrow::Type::FIXED_SIZE_BINARY: { auto str = assert_cast<const ColumnFixedString &>(column).GetView(row); - return serializeStringIntoArena<true>(StringRef(str.data(), str.size()), pool, begin); + return serializeFixedStringIntoArena(StringRef(str.data(), str.size()), pool, begin); } case arrow::Type::STRING: case arrow::Type::BINARY: { auto str = assert_cast<const ColumnBinary &>(column).GetView(row); - return serializeStringIntoArena<false>(StringRef(str.data(), str.size()), pool, begin); + return serializeStringIntoArena(StringRef(str.data(), str.size()), pool, begin); + } + + case arrow::Type::TIMESTAMP: + return serializeNumberIntoArena(assert_cast<const ColumnTimestamp &>(column).Value(row), pool, begin); + case arrow::Type::DURATION: + return serializeNumberIntoArena(assert_cast<const ColumnDuration &>(column).Value(row), pool, begin); + case arrow::Type::DECIMAL: + { + auto str = assert_cast<const ColumnDecimal &>(column).GetView(row); + return serializeDecimalIntoArena(StringRef(str.data(), str.size()), pool, begin); } case arrow::Type::EXTENSION: // AggregateColumn @@ -381,6 +398,13 @@ const char * deserializeAndInsertFromArena(MutableColumn& column, const char * p case arrow::Type::BINARY: return deserializeStringFromArena(assert_cast<MutableColumnBinary &>(column), pos); + case arrow::Type::TIMESTAMP: + return deserializeNumberFromArena(assert_cast<MutableColumnTimestamp &>(column), pos); + case arrow::Type::DURATION: + return deserializeNumberFromArena(assert_cast<MutableColumnDuration &>(column), pos); + case arrow::Type::DECIMAL: + return deserializeDecimalFromArena(assert_cast<MutableColumnDecimal &>(column), pos); + case arrow::Type::EXTENSION: // AggregateColumn break; // TODO @@ -430,6 +454,13 @@ void updateHashWithValue(const IColumn& column, size_t row, SipHash & hash) return hash.update(str.data(), str.size()); } + case arrow::Type::TIMESTAMP: + return hash.update(assert_cast<const ColumnTimestamp &>(column).Value(row)); + case arrow::Type::DURATION: + return hash.update(assert_cast<const ColumnDuration &>(column).Value(row)); + case arrow::Type::DECIMAL: + return hash.update(assert_cast<const ColumnDecimal &>(column).Value(row)); + case arrow::Type::EXTENSION: // AggregateColumn break; // TODO @@ -475,6 +506,13 @@ MutableColumnPtr createMutableColumn(const DataTypePtr & type) case arrow::Type::STRING: return std::make_shared<MutableColumnString>(); + case arrow::Type::TIMESTAMP: + return std::make_shared<MutableColumnTimestamp>(type, arrow::default_memory_pool()); + case arrow::Type::DURATION: + return std::make_shared<MutableColumnDuration>(type, arrow::default_memory_pool()); + case arrow::Type::DECIMAL: + return std::make_shared<MutableColumnDecimal>(type, arrow::default_memory_pool()); + case arrow::Type::EXTENSION: // AggregateColumn break; // TODO: do we really need it here? @@ -517,6 +555,13 @@ uint32_t fixedContiguousSize(const DataTypePtr & type) case arrow::Type::BINARY: break; + case arrow::Type::TIMESTAMP: + return 8; + case arrow::Type::DURATION: + return 8; + case arrow::Type::DECIMAL: + return std::static_pointer_cast<DataTypeDecimal>(type)->byte_width(); + case arrow::Type::EXTENSION: // AggregateColumn break; diff --git a/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.h b/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.h index 5eb633d15a0..e427867e494 100644 --- a/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.h +++ b/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.h @@ -27,25 +27,29 @@ inline StringRef serializeNumberIntoArena(T value, Arena & arena, char const *& return StringRef(pos, sizeof(T)); } -template <bool fixed> inline StringRef serializeStringIntoArena(const StringRef & str, Arena & arena, char const *& begin) { - if constexpr (fixed) - { - auto * pos = arena.allocContinue(str.size, begin); - memcpy(pos, str.data, str.size); - return StringRef(pos, str.size); - } - else - { - StringRef res; - res.size = sizeof(str.size) + str.size; - char * pos = arena.allocContinue(res.size, begin); - memcpy(pos, &str.size, sizeof(str.size)); - memcpy(pos + sizeof(str.size), str.data, str.size); - res.data = pos; - return res; - } + StringRef res; + res.size = sizeof(str.size) + str.size; + char * pos = arena.allocContinue(res.size, begin); + memcpy(pos, &str.size, sizeof(str.size)); + memcpy(pos + sizeof(str.size), str.data, str.size); + res.data = pos; + return res; +} + +inline StringRef serializeFixedStringIntoArena(const StringRef & str, Arena & arena, char const *& begin) +{ + auto * pos = arena.allocContinue(str.size, begin); + memcpy(pos, str.data, str.size); + return StringRef(pos, str.size); +} + +inline StringRef serializeDecimalIntoArena(const StringRef & str, Arena & arena, char const *& begin) +{ + auto * pos = arena.allocContinue(str.size, begin); + memcpy(pos, str.data, str.size); + return StringRef(pos, str.size); } template <typename T> @@ -85,6 +89,11 @@ inline bool insertFixedString(MutableColumn & column, const StringRef & value) return assert_cast<MutableColumnFixedString &>(column).Append(arrow::util::string_view{value.data, value.size}).ok(); } +inline bool insertDecimal(MutableColumn & column, const StringRef & value) +{ + return assert_cast<MutableColumnDecimal &>(column).Append(arrow::util::string_view{value.data, value.size}).ok(); +} + template <typename DataType> inline const char * deserializeNumberFromArena(arrow::NumericBuilder<DataType> & column, const char * pos) { @@ -110,6 +119,12 @@ inline const char * deserializeStringFromArena(MutableColumnFixedString & column return pos + column.byte_width(); } +inline const char * deserializeDecimalFromArena(MutableColumnDecimal & column, const char * pos) +{ + column.Append(pos).ok(); + return pos + column.byte_width(); +} + bool insertData(MutableColumn & column, const StringRef & value); StringRef serializeValueIntoArena(const IColumn& column, size_t row, Arena & pool, char const *& begin); const char * deserializeAndInsertFromArena(MutableColumn& column, const char * pos); diff --git a/ydb/library/arrow_clickhouse/Common/Allocator.h b/ydb/library/arrow_clickhouse/Common/Allocator.h index b9c1aa1247c..46fcc75719a 100644 --- a/ydb/library/arrow_clickhouse/Common/Allocator.h +++ b/ydb/library/arrow_clickhouse/Common/Allocator.h @@ -15,10 +15,12 @@ #if !defined(__APPLE__) && !defined(__FreeBSD__) #include <malloc.h> #endif +#if !defined(WIN32) +#include <sys/mman.h> +#endif #include <cstdlib> #include <algorithm> -#include <sys/mman.h> #include <common/mremap.h> diff --git a/ydb/library/arrow_clickhouse/arrow_clickhouse_types.h b/ydb/library/arrow_clickhouse/arrow_clickhouse_types.h index 698503007eb..57dee49a1fe 100644 --- a/ydb/library/arrow_clickhouse/arrow_clickhouse_types.h +++ b/ydb/library/arrow_clickhouse/arrow_clickhouse_types.h @@ -6,6 +6,8 @@ #include <map> #include <stdexcept> +#include <util/system/types.h> + #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h> @@ -88,6 +90,10 @@ using ColumnBinary = arrow::BinaryArray; using ColumnString = arrow::StringArray; using ColumnFixedString = arrow::FixedSizeBinaryArray; +using ColumnTimestamp = arrow::TimestampArray; +using ColumnDuration = arrow::DurationArray; +using ColumnDecimal = arrow::DecimalArray; + using MutableColumnInt8 = arrow::Int8Builder; using MutableColumnInt16 = arrow::Int16Builder; using MutableColumnInt32 = arrow::Int32Builder; @@ -105,6 +111,10 @@ using MutableColumnBinary = arrow::BinaryBuilder; using MutableColumnString = arrow::StringBuilder; using MutableColumnFixedString = arrow::FixedSizeBinaryBuilder; +using MutableColumnTimestamp = arrow::TimestampBuilder; +using MutableColumnDuration = arrow::DurationBuilder; +using MutableColumnDecimal = arrow::DecimalBuilder; + using IDataType = arrow::DataType; using DataTypePtr = std::shared_ptr<IDataType>; using DataTypes = arrow::DataTypeVector; @@ -119,8 +129,32 @@ using DataTypeUInt16 = arrow::UInt16Type; using DataTypeUInt32 = arrow::UInt32Type; using DataTypeUInt64 = arrow::UInt64Type; +using DataTypeBinary = arrow::BinaryType; +using DataTypeString = arrow::StringType; using DataTypeFixedString = arrow::FixedSizeBinaryType; +using DataTypeTimestamp = arrow::TimestampType; +using DataTypeDuration = arrow::DurationType; +using DataTypeDecimal = arrow::DecimalType; + +class IAggregateFunction; +using AggregateFunctionPtr = std::shared_ptr<const IAggregateFunction>; + +struct AggregateDescription +{ + AggregateFunctionPtr function; + Array parameters; /// Parameters of the (parametric) aggregate function. + ColumnNumbers arguments; + Names argument_names; /// used if no `arguments` are specified. + String column_name; /// What name to use for a column with aggregate function values +}; + +using AggregateDescriptions = std::vector<AggregateDescription>; + +using AggregateColumnsData = std::vector<arrow::UInt64Builder *>; +using AggregateColumnsConstData = std::vector<const arrow::UInt64Array *>; + + inline Columns columnsFromHeader(const Header& schema, size_t num_rows = 0) { std::vector<std::shared_ptr<arrow::Array>> columns; |