summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <[email protected]>2022-08-30 13:31:50 +0300
committerchertus <[email protected]>2022-08-30 13:31:50 +0300
commit76dc9dab1ef4e4b4fcae17e5ae188e1c3c9774ce (patch)
tree6682510118ec90e7783ba567cdc639c40e79373f
parente7d0ad21f2f82162d09849c0c4c02522ebc6adf2 (diff)
use ch aggregate functions
-rw-r--r--CMakeLists.darwin.txt8
-rw-r--r--CMakeLists.linux.txt8
-rw-r--r--ydb/core/formats/CMakeLists.txt4
-rw-r--r--ydb/core/formats/custom_registry.cpp27
-rw-r--r--ydb/core/formats/program.cpp337
-rw-r--r--ydb/core/formats/program.h27
-rw-r--r--ydb/core/formats/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/formats/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/formats/ut_program_step.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard__stats_scan.h3
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp18
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h38
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp3
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_read_write.cpp215
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp1
-rw-r--r--ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionAvg.h5
-rw-r--r--ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionCount.h3
-rw-r--r--ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionMinMaxAny.h4
-rw-r--r--ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionSum.h5
-rw-r--r--ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionWrapper.h89
-rw-r--r--ydb/library/arrow_clickhouse/AggregateFunctions/IAggregateFunction.h11
-rw-r--r--ydb/library/arrow_clickhouse/Aggregator.h11
-rw-r--r--ydb/library/arrow_clickhouse/Columns/ColumnAggregateFunction.h3
-rw-r--r--ydb/library/arrow_clickhouse/Columns/ColumnsCommon.cpp49
-rw-r--r--ydb/library/arrow_clickhouse/Columns/ColumnsCommon.h49
-rw-r--r--ydb/library/arrow_clickhouse/Common/Allocator.h4
-rw-r--r--ydb/library/arrow_clickhouse/arrow_clickhouse_types.h34
27 files changed, 690 insertions, 276 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index cc237e6dce8..9652a821b21 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -565,6 +565,10 @@ add_subdirectory(ydb/library/yql/providers/common/config)
add_subdirectory(ydb/library/yql/providers/common/gateway)
add_subdirectory(ydb/library/yql/providers/result/provider)
add_subdirectory(ydb/core/formats)
+add_subdirectory(ydb/library/arrow_clickhouse)
+add_subdirectory(ydb/library/arrow_clickhouse/Common)
+add_subdirectory(ydb/library/arrow_clickhouse/Columns)
+add_subdirectory(ydb/library/arrow_clickhouse/DataStreams)
add_subdirectory(ydb/core/keyvalue/protos)
add_subdirectory(ydb/core/tx)
add_subdirectory(ydb/core/persqueue/config)
@@ -1044,10 +1048,6 @@ add_subdirectory(ydb/core/filestore)
add_subdirectory(ydb/core/grpc_caching)
add_subdirectory(ydb/core/pgproxy)
add_subdirectory(ydb/core/yql_testlib)
-add_subdirectory(ydb/library/arrow_clickhouse)
-add_subdirectory(ydb/library/arrow_clickhouse/Common)
-add_subdirectory(ydb/library/arrow_clickhouse/Columns)
-add_subdirectory(ydb/library/arrow_clickhouse/DataStreams)
add_subdirectory(ydb/core/actorlib_impl/ut)
add_subdirectory(library/cpp/testing/unittest_main)
add_subdirectory(library/cpp/terminate_handler)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 961c6d74083..48a37d39e3c 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -569,6 +569,10 @@ add_subdirectory(ydb/library/yql/providers/common/config)
add_subdirectory(ydb/library/yql/providers/common/gateway)
add_subdirectory(ydb/library/yql/providers/result/provider)
add_subdirectory(ydb/core/formats)
+add_subdirectory(ydb/library/arrow_clickhouse)
+add_subdirectory(ydb/library/arrow_clickhouse/Common)
+add_subdirectory(ydb/library/arrow_clickhouse/Columns)
+add_subdirectory(ydb/library/arrow_clickhouse/DataStreams)
add_subdirectory(ydb/core/keyvalue/protos)
add_subdirectory(ydb/core/tx)
add_subdirectory(ydb/core/persqueue/config)
@@ -1048,10 +1052,6 @@ add_subdirectory(ydb/core/filestore)
add_subdirectory(ydb/core/grpc_caching)
add_subdirectory(ydb/core/pgproxy)
add_subdirectory(ydb/core/yql_testlib)
-add_subdirectory(ydb/library/arrow_clickhouse)
-add_subdirectory(ydb/library/arrow_clickhouse/Common)
-add_subdirectory(ydb/library/arrow_clickhouse/Columns)
-add_subdirectory(ydb/library/arrow_clickhouse/DataStreams)
add_subdirectory(ydb/core/actorlib_impl/ut)
add_subdirectory(library/cpp/testing/unittest_main)
add_subdirectory(library/cpp/terminate_handler)
diff --git a/ydb/core/formats/CMakeLists.txt b/ydb/core/formats/CMakeLists.txt
index 72e015389af..35be1df939b 100644
--- a/ydb/core/formats/CMakeLists.txt
+++ b/ydb/core/formats/CMakeLists.txt
@@ -11,6 +11,9 @@ add_library(ydb-core-formats)
target_compile_options(ydb-core-formats PRIVATE
-DUSE_CURRENT_UDF_ABI_VERSION
)
+target_include_directories(ydb-core-formats PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/arrow_clickhouse
+)
target_link_libraries(ydb-core-formats PUBLIC
contrib-libs-cxxsupp
yutil
@@ -18,6 +21,7 @@ target_link_libraries(ydb-core-formats PUBLIC
ydb-core-scheme
ydb-library-binary_json
ydb-library-dynumber
+ ydb-library-arrow_clickhouse
)
target_sources(ydb-core-formats PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow_batch_builder.cpp
diff --git a/ydb/core/formats/custom_registry.cpp b/ydb/core/formats/custom_registry.cpp
index 347e833f9f6..ef1fb1a1934 100644
--- a/ydb/core/formats/custom_registry.cpp
+++ b/ydb/core/formats/custom_registry.cpp
@@ -2,10 +2,19 @@
#include "functions.h"
#include "func_common.h"
+#include "program.h"
+
#include <util/system/yassert.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/registry_internal.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
+#ifndef WIN32
+#include <AggregateFunctions/AggregateFunctionCount.h>
+#include <AggregateFunctions/AggregateFunctionMinMaxAny.h>
+#include <AggregateFunctions/AggregateFunctionSum.h>
+#include <AggregateFunctions/AggregateFunctionAvg.h>
+#endif
+
namespace cp = ::arrow::compute;
namespace NKikimr::NArrow {
@@ -50,6 +59,23 @@ static void RegisterYdbCast(cp::FunctionRegistry* registry) {
Y_VERIFY(registry->AddFunction(std::make_shared<YdbCastMetaFunction>()).ok());
}
+static void RegisterHouseAggregates(cp::FunctionRegistry* registry) {
+#ifndef WIN32
+ try {
+ Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAny>(GetHouseFunctionName(EAggregate::Any))).ok());
+ Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedCount>(GetHouseFunctionName(EAggregate::Count))).ok());
+ Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedMin>(GetHouseFunctionName(EAggregate::Min))).ok());
+ Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedMax>(GetHouseFunctionName(EAggregate::Max))).ok());
+ Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedSum>(GetHouseFunctionName(EAggregate::Sum))).ok());
+ Y_VERIFY(registry->AddFunction(std::make_shared<CH::WrappedAvg>(GetHouseFunctionName(EAggregate::Avg))).ok());
+ } catch (const std::exception& /*ex*/) {
+ Y_VERIFY(false);
+ }
+#else
+ Y_UNUSED(registry);
+#endif
+}
+
static std::unique_ptr<cp::FunctionRegistry> CreateCustomRegistry() {
auto registry = cp::FunctionRegistry::Make();
@@ -57,6 +83,7 @@ static std::unique_ptr<cp::FunctionRegistry> CreateCustomRegistry() {
RegisterRound(registry.get());
RegisterArithmetic(registry.get());
RegisterYdbCast(registry.get());
+ RegisterHouseAggregates(registry.get());
return registry;
}
diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp
index d42ed0cd5f0..48a615c41ce 100644
--- a/ydb/core/formats/program.cpp
+++ b/ydb/core/formats/program.cpp
@@ -257,24 +257,57 @@ const char * GetFunctionName(EAggregate op) {
return "";
}
+const char * GetHouseFunctionName(EAggregate op) {
+ switch (op) {
+ case EAggregate::Any:
+ return "ch.any";
+ case EAggregate::Count:
+ return "ch.count";
+ case EAggregate::Min:
+ return "ch.min";
+ case EAggregate::Max:
+ return "ch.max";
+ case EAggregate::Sum:
+ return "ch.sum";
+ case EAggregate::Avg:
+ return "ch.avg";
-void AddColumn(std::shared_ptr<TProgramStep::TDatumBatch>& batch, std::string field_name, const arrow::Datum& column) {
- auto field = ::arrow::field(std::move(field_name), column.type());
- Y_VERIFY(field != nullptr);
- Y_VERIFY(field->type()->Equals(column.type()));
- Y_VERIFY(column.is_scalar() || column.length() == batch->rows);
- auto new_schema = *batch->fields->AddField(batch->fields->num_fields(), field);
- batch->datums.push_back(column);
- batch->fields = new_schema;
+ default:
+ break;
+ }
+ return "";
}
-arrow::Result<arrow::Datum> GetColumnByName(const std::shared_ptr<TProgramStep::TDatumBatch>& batch, const std::string& name) {
- int i = batch->fields->GetFieldIndex(name);
- if (i == -1) {
- return arrow::Status::Invalid("Not found or duplicate");
+namespace {
+
+arrow::Status AddColumn(
+ TProgramStep::TDatumBatch& batch,
+ const std::string& name,
+ arrow::Datum&& column)
+{
+ if (batch.fields->GetFieldIndex(name) != -1) {
+ return arrow::Status::Invalid("Trying to add duplicate column '" + name + "'");
+ }
+
+ auto field = arrow::field(name, column.type());
+ if (!field || !field->type()->Equals(column.type())) {
+ return arrow::Status::Invalid("Cannot create field.");
}
- else {
- return batch->datums[i];
+ if (!column.is_scalar() && column.length() != batch.rows) {
+ return arrow::Status::Invalid("Wrong column length.");
+ }
+
+ batch.fields = *batch.fields->AddField(batch.fields->num_fields(), field);
+ batch.datums.emplace_back(column);
+ return arrow::Status::OK();
+}
+
+arrow::Result<arrow::Datum> GetColumnByName(const TProgramStep::TDatumBatch& batch, const std::string& name) {
+ int i = batch.fields->GetFieldIndex(name);
+ if (i == -1) {
+ return arrow::Status::Invalid("Not found column '" + name + "' or duplicate");
+ } else {
+ return batch.datums[i];
}
}
@@ -287,134 +320,189 @@ std::shared_ptr<TProgramStep::TDatumBatch> ToTDatumBatch(std::shared_ptr<arrow::
return std::make_shared<TProgramStep::TDatumBatch>(TProgramStep::TDatumBatch{std::make_shared<arrow::Schema>(*batch->schema()), batch->num_rows(), std::move(datums)});
}
-std::shared_ptr<arrow::RecordBatch> ToRecordBatch(std::shared_ptr<TProgramStep::TDatumBatch>& batch) {
+std::shared_ptr<arrow::RecordBatch> ToRecordBatch(TProgramStep::TDatumBatch& batch) {
std::vector<std::shared_ptr<arrow::Array>> columns;
- columns.reserve(batch->datums.size());
- for (auto col : batch->datums) {
+ columns.reserve(batch.datums.size());
+ for (auto col : batch.datums) {
if (col.is_scalar()) {
- columns.push_back(*arrow::MakeArrayFromScalar(*col.scalar(), batch->rows));
+ columns.push_back(*arrow::MakeArrayFromScalar(*col.scalar(), batch.rows));
}
else if (col.is_array()){
- Y_VERIFY(col.length() != -1);
+ if (col.length() == -1) {
+ return {};
+ }
columns.push_back(col.make_array());
}
}
- return arrow::RecordBatch::Make(batch->fields, batch->rows, columns);
-}
-
-
-std::shared_ptr<arrow::Array> MakeConstantColumn(const arrow::Scalar& value, int64_t size) {
- auto res = arrow::MakeArrayFromScalar(value, size);
- Y_VERIFY(res.ok());
- return *res;
+ return arrow::RecordBatch::Make(batch.fields, batch.rows, columns);
}
-template <typename TOpId, typename TOptions>
-arrow::Datum CallFunctionById(TOpId funcId, const std::vector<std::string>& args,
- const TOptions* funcOpts,
- std::shared_ptr<TProgramStep::TDatumBatch> batch,
- arrow::compute::ExecContext* ctx)
+template <bool houseFunction, typename TOpId, typename TOptions>
+arrow::Result<arrow::Datum> CallFunctionById(
+ TOpId funcId, const std::vector<std::string>& args,
+ const TOptions* funcOpts,
+ const TProgramStep::TDatumBatch& batch,
+ arrow::compute::ExecContext* ctx)
{
std::vector<arrow::Datum> arguments;
arguments.reserve(args.size());
for (auto& colName : args) {
auto column = GetColumnByName(batch, colName);
- Y_VERIFY(column.ok());
+ if (!column.ok()) {
+ return column.status();
+ }
arguments.push_back(*column);
}
- std::string funcName = GetFunctionName(funcId);
-
- arrow::Result<arrow::Datum> result;
- if (ctx != nullptr && ctx->func_registry()->GetFunction(funcName).ok()) {
- result = arrow::compute::CallFunction(GetFunctionName(funcId), arguments, funcOpts, ctx);
+ std::string funcName;
+ if constexpr (houseFunction) {
+ funcName = GetHouseFunctionName(funcId);
} else {
- result = arrow::compute::CallFunction(GetFunctionName(funcId), arguments, funcOpts);
+ funcName = GetFunctionName(funcId);
+ }
+
+ if (ctx && ctx->func_registry()->GetFunction(funcName).ok()) {
+ return arrow::compute::CallFunction(funcName, arguments, funcOpts, ctx);
}
- Y_VERIFY_S(result.ok(), result.status().message());
- return result.ValueOrDie();
+ return arrow::compute::CallFunction(funcName, arguments, funcOpts);
+}
+
+arrow::Result<arrow::Datum> CallFunctionByAssign(
+ const TAssign& assign,
+ const TProgramStep::TDatumBatch& batch,
+ arrow::compute::ExecContext* ctx)
+{
+ return CallFunctionById<false>(assign.GetOperation(), assign.GetArguments(), assign.GetFunctionOptions(),
+ batch, ctx);
}
-arrow::Datum CallFunctionByAssign(const TAssign& assign,
- std::shared_ptr<TProgramStep::TDatumBatch> batch,
- arrow::compute::ExecContext* ctx)
+arrow::Result<arrow::Datum> CallFunctionByAssign(
+ const TAggregateAssign& assign,
+ const TProgramStep::TDatumBatch& batch,
+ arrow::compute::ExecContext* ctx)
{
- return CallFunctionById(assign.GetOperation(), assign.GetArguments(), assign.GetFunctionOptions(), batch, ctx);
+ return CallFunctionById<false>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(),
+ batch, ctx);
}
-arrow::Datum CallFunctionByAssign(const TAggregateAssign& assign,
- std::shared_ptr<TProgramStep::TDatumBatch> batch,
- arrow::compute::ExecContext* ctx)
+arrow::Result<arrow::Datum> CallHouseFunctionByAssign(
+ const TAggregateAssign& assign,
+ TProgramStep::TDatumBatch& batch,
+ arrow::compute::ExecContext* ctx)
{
- return CallFunctionById(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(), batch, ctx);
+ try {
+ return CallFunctionById<true>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(),
+ batch, ctx);
+ } catch (const std::exception& ex) {
+ Y_VERIFY_S(false, ex.what());
+ }
+}
+
}
-void TProgramStep::ApplyAssignes(std::shared_ptr<TProgramStep::TDatumBatch>& batch,
- arrow::compute::ExecContext* ctx) const
+
+arrow::Status TProgramStep::ApplyAssignes(
+ TProgramStep::TDatumBatch& batch,
+ arrow::compute::ExecContext* ctx) const
{
if (Assignes.empty()) {
- return;
+ return arrow::Status::OK();
}
- batch->datums.reserve(batch->datums.size() + Assignes.size());
+ batch.datums.reserve(batch.datums.size() + Assignes.size());
for (auto& assign : Assignes) {
- Y_VERIFY(!GetColumnByName(batch, assign.GetName()).ok());
+ if (GetColumnByName(batch, assign.GetName()).ok()) {
+ return arrow::Status::Invalid("Assign to existing column '" + assign.GetName() + "'.");
+ }
arrow::Datum column;
if (assign.IsConstant()) {
column = assign.GetConstant();
} else {
- column = CallFunctionByAssign(assign, batch, ctx);
+ auto funcResult = CallFunctionByAssign(assign, batch, ctx);
+ if (!funcResult.ok()) {
+ return funcResult.status();
+ }
+ column = *funcResult;
+ }
+ auto status = AddColumn(batch, assign.GetName(), std::move(column));
+ if (!status.ok()) {
+ return status;
}
- AddColumn(batch, assign.GetName(), column);
}
- //Y_VERIFY(batch->Validate().ok());
+ //return batch->Validate();
+ return arrow::Status::OK();
}
-void TProgramStep::ApplyAggregates(std::shared_ptr<TDatumBatch>& batch, arrow::compute::ExecContext* ctx) const {
+arrow::Status TProgramStep::ApplyAggregates(
+ TDatumBatch& batch,
+ arrow::compute::ExecContext* ctx) const
+{
if (GroupBy.empty()) {
- return;
+ return arrow::Status::OK();
}
- auto res = std::make_shared<TDatumBatch>();
- res->rows = 1; // TODO
- res->datums.reserve(GroupBy.size());
+ TDatumBatch res;
+ res.rows = 1; // TODO
+ res.datums.reserve(GroupBy.size());
arrow::FieldVector fields;
fields.reserve(GroupBy.size());
for (auto& assign : GroupBy) {
- res->datums.push_back(CallFunctionByAssign(assign, batch, ctx));
- auto& column = res->datums.back();
- Y_VERIFY_S(column.is_scalar(), TStringBuilder() << "Aggregate result is not a scalar.");
-
- auto op = assign.GetOperation();
- if (op == EAggregate::Min) {
- const auto& minMax = column.scalar_as<arrow::StructScalar>();
- column = minMax.value[0];
- } else if (op == EAggregate::Max) {
- const auto& minMax = column.scalar_as<arrow::StructScalar>();
- column = minMax.value[1];
+ auto funcResult = CallFunctionByAssign(assign, batch, ctx);
+ if (!funcResult.ok()) {
+ auto houseResult = CallHouseFunctionByAssign(assign, batch, ctx);
+ if (!houseResult.ok()) {
+ return funcResult.status();
+ }
+ funcResult = houseResult;
}
- Y_VERIFY_S(column.type(), TStringBuilder() << "Aggregate result has no type.");
+ res.datums.push_back(*funcResult);
+ auto& column = res.datums.back();
+ if (!column.is_scalar()) {
+ return arrow::Status::Invalid("Aggregate result is not a scalar.");
+ }
+
+ if (column.scalar()->type->id() == arrow::Type::STRUCT) {
+ auto op = assign.GetOperation();
+ if (op == EAggregate::Min) {
+ const auto& minMax = column.scalar_as<arrow::StructScalar>();
+ column = minMax.value[0];
+ } else if (op == EAggregate::Max) {
+ const auto& minMax = column.scalar_as<arrow::StructScalar>();
+ column = minMax.value[1];
+ } else {
+ return arrow::Status::Invalid("Unexpected struct result for aggregate function");
+ }
+ }
+
+ if (!column.type()) {
+ return arrow::Status::Invalid("Aggregate result has no type.");
+ }
fields.emplace_back(std::make_shared<arrow::Field>(assign.GetName(), column.type()));
}
- res->fields = std::make_shared<arrow::Schema>(fields);
+ res.fields = std::make_shared<arrow::Schema>(fields);
batch = res;
+ return arrow::Status::OK();
}
-void TProgramStep::ApplyFilters(std::shared_ptr<TDatumBatch>& batch) const {
+arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const {
if (Filters.empty()) {
- return;
+ return arrow::Status::OK();
}
std::vector<std::vector<bool>> filters;
filters.reserve(Filters.size());
for (auto& colName : Filters) {
auto column = GetColumnByName(batch, colName);
- Y_VERIFY_S(column.ok(), TStringBuilder() << "Column " << colName << " is not ok.");
- Y_VERIFY_S(column->is_array(), TStringBuilder() << "Column " << colName << " is not an array.");
- Y_VERIFY_S(column->type() == arrow::boolean(), TStringBuilder() << "Column " << colName << " type is not bool.");
+ if (!column.ok()) {
+ return column.status();
+ }
+ if (!column->is_array() || column->type() != arrow::boolean()) {
+ return arrow::Status::Invalid("Column '" + colName + "' is not a boolean array.");
+ }
+
auto boolColumn = std::static_pointer_cast<arrow::BooleanArray>(column->make_array());
filters.push_back(std::vector<bool>(boolColumn->length()));
auto& bits = filters.back();
@@ -445,13 +533,18 @@ void TProgramStep::ApplyFilters(std::shared_ptr<TDatumBatch>& batch) const {
}
}
- for (int64_t i = 0; i < batch->fields->num_fields(); ++i) {
- bool needed = (allColumns || neededColumns.contains(batch->fields->field(i)->name()));
- if (batch->datums[i].is_array() && needed) {
- auto res = arrow::compute::Filter(batch->datums[i].make_array(), filter);
- Y_VERIFY_S(res.ok(), res.status().message());
- Y_VERIFY((*res).kind() == batch->datums[i].kind());
- batch->datums[i] = *res;
+ for (int64_t i = 0; i < batch.fields->num_fields(); ++i) {
+ bool needed = (allColumns || neededColumns.contains(batch.fields->field(i)->name()));
+ if (batch.datums[i].is_array() && needed) {
+ auto res = arrow::compute::Filter(batch.datums[i].make_array(), filter);
+ if (!res.ok()) {
+ return res.status();
+ }
+ if ((*res).kind() != batch.datums[i].kind()) {
+ return arrow::Status::Invalid("Unexpected filter result.");
+ }
+
+ batch.datums[i] = *res;
}
}
@@ -459,13 +552,14 @@ void TProgramStep::ApplyFilters(std::shared_ptr<TDatumBatch>& batch) const {
for (int64_t i = 0; i < filter->length(); ++i) {
newRows += filter->Value(i);
}
- batch->rows = newRows;
+ batch.rows = newRows;
}
+ return arrow::Status::OK();
}
-void TProgramStep::ApplyProjection(std::shared_ptr<TDatumBatch>& batch) const {
+arrow::Status TProgramStep::ApplyProjection(TDatumBatch& batch) const {
if (Projection.empty()) {
- return;
+ return arrow::Status::OK();
}
std::unordered_set<std::string_view> projSet;
for (auto& str: Projection) {
@@ -473,38 +567,69 @@ void TProgramStep::ApplyProjection(std::shared_ptr<TDatumBatch>& batch) const {
}
std::vector<std::shared_ptr<arrow::Field>> newFields;
std::vector<arrow::Datum> newDatums;
- for (int64_t i = 0; i < batch->fields->num_fields(); ++i) {
- auto& cur_field_name = batch->fields->field(i)->name();
+ for (int64_t i = 0; i < batch.fields->num_fields(); ++i) {
+ auto& cur_field_name = batch.fields->field(i)->name();
if (projSet.contains(cur_field_name)) {
- newFields.push_back(batch->fields->field(i));
- Y_VERIFY(newFields.back());
- newDatums.push_back(batch->datums[i]);
+ newFields.push_back(batch.fields->field(i));
+ if (!newFields.back()) {
+ return arrow::Status::Invalid("Wrong projection.");
+ }
+ newDatums.push_back(batch.datums[i]);
}
}
- batch->fields = std::make_shared<arrow::Schema>(newFields);
- batch->datums = std::move(newDatums);
+ batch.fields = std::make_shared<arrow::Schema>(newFields);
+ batch.datums = std::move(newDatums);
+ return arrow::Status::OK();
}
-void TProgramStep::ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const {
+arrow::Status TProgramStep::ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const {
if (Projection.empty()) {
- return;
+ return arrow::Status::OK();
}
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto& column : Projection) {
fields.push_back(batch->schema()->GetFieldByName(column));
- Y_VERIFY(fields.back());
+ if (!fields.back()) {
+ return arrow::Status::Invalid("Wrong projection column '" + column + "'.");
+ }
}
batch = NArrow::ExtractColumns(batch, std::make_shared<arrow::Schema>(fields));
+ return arrow::Status::OK();
}
-void TProgramStep::Apply(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const {
+arrow::Status TProgramStep::Apply(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const {
auto rb = ToTDatumBatch(batch);
- ApplyAssignes(rb, ctx);
- ApplyFilters(rb);
- ApplyAggregates(rb, ctx);
- ApplyProjection(rb);
- batch = ToRecordBatch(rb);
+
+ auto status = ApplyAssignes(*rb, ctx);
+ //Y_VERIFY_S(status.ok(), status.message());
+ if (!status.ok()) {
+ return status;
+ }
+
+ status = ApplyFilters(*rb);
+ //Y_VERIFY_S(status.ok(), status.message());
+ if (!status.ok()) {
+ return status;
+ }
+
+ status = ApplyAggregates(*rb, ctx);
+ //Y_VERIFY_S(status.ok(), status.message());
+ if (!status.ok()) {
+ return status;
+ }
+
+ status = ApplyProjection(*rb);
+ //Y_VERIFY_S(status.ok(), status.message());
+ if (!status.ok()) {
+ return status;
+ }
+
+ batch = ToRecordBatch(*rb);
+ if (!batch) {
+ return arrow::Status::Invalid("Failed to create program result.");
+ }
+ return arrow::Status::OK();
}
}
diff --git a/ydb/core/formats/program.h b/ydb/core/formats/program.h
index 800ec9bb424..a314272003b 100644
--- a/ydb/core/formats/program.h
+++ b/ydb/core/formats/program.h
@@ -98,6 +98,7 @@ enum class EAggregate {
const char * GetFunctionName(EOperation op);
const char * GetFunctionName(EAggregate op);
+const char * GetHouseFunctionName(EAggregate op);
EOperation ValidateOperation(EOperation op, ui32 argsSize);
class TAssign {
@@ -246,21 +247,27 @@ struct TProgramStep {
return Assignes.empty() && Filters.empty() && Projection.empty();
}
- void Apply(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const;
+ arrow::Status Apply(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const;
- void ApplyAssignes(std::shared_ptr<TDatumBatch>& batch, arrow::compute::ExecContext* ctx) const;
- void ApplyAggregates(std::shared_ptr<TDatumBatch>& batch, arrow::compute::ExecContext* ctx) const;
- void ApplyFilters(std::shared_ptr<TDatumBatch>& batch) const;
- void ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const;
- void ApplyProjection(std::shared_ptr<TDatumBatch>& batch) const;
+ arrow::Status ApplyAssignes(TDatumBatch& batch, arrow::compute::ExecContext* ctx) const;
+ arrow::Status ApplyAggregates(TDatumBatch& batch, arrow::compute::ExecContext* ctx) const;
+ arrow::Status ApplyFilters(TDatumBatch& batch) const;
+ arrow::Status ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const;
+ arrow::Status ApplyProjection(TDatumBatch& batch) const;
};
-inline void ApplyProgram(std::shared_ptr<arrow::RecordBatch>& batch,
- const std::vector<std::shared_ptr<TProgramStep>>& program,
- arrow::compute::ExecContext* ctx = nullptr) {
+inline arrow::Status ApplyProgram(
+ std::shared_ptr<arrow::RecordBatch>& batch,
+ const std::vector<std::shared_ptr<TProgramStep>>& program,
+ arrow::compute::ExecContext* ctx = nullptr)
+{
for (auto& step : program) {
- step->Apply(batch, ctx);
+ auto status = step->Apply(batch, ctx);
+ if (!status.ok()) {
+ return status;
+ }
}
+ return arrow::Status::OK();
}
struct TSsaProgramSteps {
diff --git a/ydb/core/formats/ut/CMakeLists.darwin.txt b/ydb/core/formats/ut/CMakeLists.darwin.txt
index 2f19d1653bc..d5029e328fe 100644
--- a/ydb/core/formats/ut/CMakeLists.darwin.txt
+++ b/ydb/core/formats/ut/CMakeLists.darwin.txt
@@ -14,6 +14,7 @@ target_compile_options(ydb-core-formats-ut PRIVATE
)
target_include_directories(ydb-core-formats-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats
+ ${CMAKE_SOURCE_DIR}/ydb/library/arrow_clickhouse
)
target_link_libraries(ydb-core-formats-ut PUBLIC
contrib-libs-cxxsupp
diff --git a/ydb/core/formats/ut/CMakeLists.linux.txt b/ydb/core/formats/ut/CMakeLists.linux.txt
index 28f7e2e587f..41b2721bf22 100644
--- a/ydb/core/formats/ut/CMakeLists.linux.txt
+++ b/ydb/core/formats/ut/CMakeLists.linux.txt
@@ -14,6 +14,7 @@ target_compile_options(ydb-core-formats-ut PRIVATE
)
target_include_directories(ydb-core-formats-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats
+ ${CMAKE_SOURCE_DIR}/ydb/library/arrow_clickhouse
)
target_link_libraries(ydb-core-formats-ut PUBLIC
contrib-libs-cxxsupp
diff --git a/ydb/core/formats/ut_program_step.cpp b/ydb/core/formats/ut_program_step.cpp
index fa368af5626..a3a62ab9578 100644
--- a/ydb/core/formats/ut_program_step.cpp
+++ b/ydb/core/formats/ut_program_step.cpp
@@ -23,7 +23,7 @@ size_t FilterTest(std::vector<std::shared_ptr<arrow::Array>> args, EOperation fr
ps->Assignes = {TAssign("res1", frst, {"x", "y"}), TAssign("res2", scnd, {"res1", "z"})};
ps->Filters = {"res2"};
ps->Projection = {"res1", "res2"};
- ApplyProgram(rBatch, {ps}, GetCustomExecContext());
+ UNIT_ASSERT(ApplyProgram(rBatch, {ps}, GetCustomExecContext()).ok());
UNIT_ASSERT(rBatch->ValidateFull().ok());
UNIT_ASSERT(rBatch->num_columns() == 2);
return rBatch->num_rows();
@@ -39,7 +39,7 @@ size_t FilterTestUnary(std::vector<std::shared_ptr<arrow::Array>> args, EOperati
ps->Assignes = {TAssign("res1", frst, {"x"}), TAssign("res2", scnd, {"res1", "z"})};
ps->Filters = {"res2"};
ps->Projection = {"res1", "res2"};
- ApplyProgram(rBatch, {ps}, GetCustomExecContext());
+ UNIT_ASSERT(ApplyProgram(rBatch, {ps}, GetCustomExecContext()).ok());
UNIT_ASSERT(rBatch->ValidateFull().ok());
UNIT_ASSERT(rBatch->num_columns() == 2);
return rBatch->num_rows();
@@ -173,7 +173,7 @@ Y_UNIT_TEST_SUITE(ProgramStepTest) {
ps->Assignes = {TAssign("y", 56), TAssign("res", EOperation::Add, {"x", "y"})};
ps->Filters = {"filter"};
ps->Projection = {"res", "filter"};
- ApplyProgram(rBatch, {ps}, GetCustomExecContext());
+ UNIT_ASSERT(ApplyProgram(rBatch, {ps}, GetCustomExecContext()).ok());
UNIT_ASSERT(rBatch->ValidateFull().ok());
UNIT_ASSERT(rBatch->num_columns() == 2);
UNIT_ASSERT(rBatch->num_rows() == 2);
@@ -189,7 +189,7 @@ Y_UNIT_TEST_SUITE(ProgramStepTest) {
ps->Assignes = {TAssign("y", 56), TAssign("res", EOperation::Add, {"x", "y"})};
ps->Filters = {};
ps->Projection = {"res", "filter"};
- ApplyProgram(rBatch, {ps}, GetCustomExecContext());
+ UNIT_ASSERT(ApplyProgram(rBatch, {ps}, GetCustomExecContext()).ok());
UNIT_ASSERT(rBatch->ValidateFull().ok());
UNIT_ASSERT(rBatch->num_columns() == 2);
UNIT_ASSERT(rBatch->num_rows() == 4);
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index 28605b4a98c..cc7fba4b373 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -54,7 +54,8 @@ public:
ApplyRangePredicates(batch);
if (!ReadMetadata->Program.empty()) {
- ApplyProgram(batch, ReadMetadata->Program, NArrow::GetCustomExecContext());
+ auto status = ApplyProgram(batch, ReadMetadata->Program, NArrow::GetCustomExecContext());
+ Y_VERIFY_S(status.ok(), status.message());
}
// Leave only requested columns
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 0b38308548a..ef849697feb 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -160,7 +160,6 @@ TVector<TCell> MakeTestCells(const TVector<TTypeId>& types, ui32 value, TVector<
cells.reserve(types.size());
for (auto& type : types) {
- // test only: 64-bit integer or string
if (type == NTypeIds::Utf8 ||
type == NTypeIds::String ||
type == NTypeIds::String4k ||
@@ -176,12 +175,19 @@ TVector<TCell> MakeTestCells(const TVector<TTypeId>& types, ui32 value, TVector<
mem.push_back("{ \"a\" = [ { \"b\" = 1; } ]; }");
const TString& str = mem.back();
cells.push_back(TCell(str.data(), str.size()));
- } else if (type == NTypeIds::Timestamp ||
- type == NTypeIds::Uint64 ||
- type == NTypeIds::Int64) {
+ } else if (type == NTypeIds::Timestamp || type == NTypeIds::Uint64 || type == NTypeIds::Int64) {
cells.push_back(TCell::Make<ui64>(value));
- } else if (type == NTypeIds::Int32) {
- cells.push_back(TCell::Make<i32>(value));
+ } else if (type == NTypeIds::Uint32 || type == NTypeIds::Int32 || type == NTypeIds::Datetime) {
+ cells.push_back(TCell::Make<ui32>(value));
+ } else if (type == NTypeIds::Uint16 || type == NTypeIds::Int16 || type == NTypeIds::Date) {
+ cells.push_back(TCell::Make<ui16>(value));
+ } else if (type == NTypeIds::Uint8 || type == NTypeIds::Int8 || type == NTypeIds::Byte ||
+ type == NTypeIds::Bool) {
+ cells.push_back(TCell::Make<ui8>(value));
+ } else if (type == NTypeIds::Float) {
+ cells.push_back(TCell::Make<float>(value));
+ } else if (type == NTypeIds::Double) {
+ cells.push_back(TCell::Make<double>(value));
} else {
UNIT_ASSERT(false);
}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 45fd99abfc1..2fdb5c12a99 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -132,6 +132,39 @@ struct TTestSchema {
return schema;
}
+ static auto YdbAllTypesSchema() {
+ TVector<std::pair<TString, TTypeId>> schema = {
+ { "ts", NTypeIds::Timestamp },
+
+ { "i8", NTypeIds::Int8 },
+ { "i16", NTypeIds::Int16 },
+ { "i32", NTypeIds::Int32 },
+ { "i64", NTypeIds::Int64 },
+ { "u8", NTypeIds::Uint8 },
+ { "u16", NTypeIds::Uint16 },
+ { "u32", NTypeIds::Uint32 },
+ { "u64", NTypeIds::Uint64 },
+ { "float", NTypeIds::Float },
+ { "double", NTypeIds::Double },
+
+ { "byte", NTypeIds::Byte },
+ //{ "bool", NTypeIds::Bool },
+ //{ "decimal", NTypeIds::Decimal },
+ //{ "dynum", NTypeIds::DyNumber },
+
+ { "date", NTypeIds::Date },
+ { "datetime", NTypeIds::Datetime },
+ //{ "interval", NTypeIds::Interval },
+
+ {"text", NTypeIds::Text },
+ {"bytes", NTypeIds::Bytes },
+ {"yson", NTypeIds::Yson },
+ {"json", NTypeIds::Json },
+ {"jsondoc", NTypeIds::JsonDocument }
+ };
+ return schema;
+ };
+
static NKikimrSchemeOp::TOlapColumnDescription CreateColumn(ui32 id, const TString& name, TTypeId type) {
NKikimrSchemeOp::TOlapColumnDescription col;
col.SetId(id);
@@ -160,7 +193,10 @@ struct TTestSchema {
*schema->MutableColumns()->Add() = CreateColumn(i + 1, columns[i].first, columns[i].second);
}
- for (auto& column : ExtractNames(YdbPkSchema())) {
+ auto pk = columns;
+ Y_VERIFY(pk.size() >= 4);
+ pk.resize(4);
+ for (auto& column : ExtractNames(pk)) {
schema->AddKeyColumnNames(column);
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 23b41f81c48..d2693d0e7c8 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -515,7 +515,8 @@ TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBa
if (ReadMetadata->HasProgram()) {
for (auto& batch : out) {
- ApplyProgram(batch.ResultBatch, ReadMetadata->Program, NArrow::GetCustomExecContext());
+ auto status = ApplyProgram(batch.ResultBatch, ReadMetadata->Program, NArrow::GetCustomExecContext());
+ Y_VERIFY_S(status.ok(), status.message());
}
}
return out;
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index ad895944e2a..8daf6084479 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -84,22 +84,48 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p
}
template <typename TArrowType>
-bool CheckIntValue(const TVector<TString>& blobs, const TString& srcSchema, const TString& colName,
- const TVector<int>& expectedVals) {
- auto schema = NArrow::DeserializeSchema(srcSchema);
- for (auto& blob : blobs) {
- auto batch = NArrow::DeserializeBatch(blob, schema);
- UNIT_ASSERT(batch);
+bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<int64_t>& expected) {
+ UNIT_ASSERT(array);
+ UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size());
- std::shared_ptr<arrow::Array> array = batch->GetColumnByName(colName);
- UNIT_ASSERT(array);
- auto& val = dynamic_cast<const arrow::NumericArray<TArrowType>&>(*array);
+ auto& column = dynamic_cast<const arrow::NumericArray<TArrowType>&>(*array);
- UNIT_ASSERT_VALUES_EQUAL(val.length(), (int)expectedVals.size());
- for (int i = 0; i < val.length(); ++i) {
- int value = val.Value(i);
- UNIT_ASSERT_VALUES_EQUAL(value, expectedVals[i]);
- }
+ for (int i = 0; i < column.length(); ++i) {
+ auto value = column.Value(i);
+ UNIT_ASSERT_VALUES_EQUAL(value, expected[i]);
+ }
+ return true;
+}
+
+bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const TVector<int64_t>& expected) {
+ UNIT_ASSERT(array);
+
+ switch (array->type()->id()) {
+ case arrow::Type::UINT8:
+ return CheckTypedIntValues<arrow::UInt8Type>(array, expected);
+ case arrow::Type::UINT16:
+ return CheckTypedIntValues<arrow::UInt16Type>(array, expected);
+ case arrow::Type::UINT32:
+ return CheckTypedIntValues<arrow::UInt32Type>(array, expected);
+ case arrow::Type::UINT64:
+ return CheckTypedIntValues<arrow::UInt64Type>(array, expected);
+ case arrow::Type::INT8:
+ return CheckTypedIntValues<arrow::Int8Type>(array, expected);
+ case arrow::Type::INT16:
+ return CheckTypedIntValues<arrow::Int16Type>(array, expected);
+ case arrow::Type::INT32:
+ return CheckTypedIntValues<arrow::Int32Type>(array, expected);
+ case arrow::Type::INT64:
+ return CheckTypedIntValues<arrow::Int64Type>(array, expected);
+
+ case arrow::Type::TIMESTAMP:
+ return CheckTypedIntValues<arrow::TimestampType>(array, expected);
+ case arrow::Type::DURATION:
+ return CheckTypedIntValues<arrow::DurationType>(array, expected);
+
+ default:
+ UNIT_ASSERT(false);
+ //return false;
}
return true;
}
@@ -903,100 +929,101 @@ static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssig
return ssa;
}
-// SELECT some(timestamp), some(saved_at) FROM t
-//
-// FIXME:
-// NotImplemented: Function any has no kernel matching input types (array[timestamp[us]])
-// NotImplemented: Function any has no kernel matching input types (array[string])
-// NotImplemented: Function any has no kernel matching input types (array[int32])
-// NotImplemented: Function min_max has no kernel matching input types (array[timestamp[us]])
-// NotImplemented: Function min_max has no kernel matching input types (array[string])
-//
-NKikimrSSA::TProgram MakeSelectAggregates(TAggAssignment::EAggregateFunction aggId = TAggAssignment::AGG_ANY,
- std::vector<ui32> columnIds = {1, 9})
+// SELECT min(x), max(x), some(x), count(x) FROM t
+NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId)
{
NKikimrSSA::TProgram ssa;
-
- ui32 tmpColumnId = 100;
-
auto* line1 = ssa.AddCommand();
auto* groupBy = line1->MutableGroupBy();
//
auto* l1_agg1 = groupBy->AddAggregates();
- l1_agg1->MutableColumn()->SetId(tmpColumnId);
+ l1_agg1->MutableColumn()->SetId(100);
auto* l1_agg1_f = l1_agg1->MutableFunction();
- l1_agg1_f->SetId(aggId);
- l1_agg1_f->AddArguments()->SetId(columnIds[0]);
+ l1_agg1_f->SetId(TAggAssignment::AGG_MIN);
+ l1_agg1_f->AddArguments()->SetId(columnId);
//
auto* l1_agg2 = groupBy->AddAggregates();
- l1_agg2->MutableColumn()->SetId(tmpColumnId + 1);
+ l1_agg2->MutableColumn()->SetId(101);
auto* l1_agg2_f = l1_agg2->MutableFunction();
- l1_agg2_f->SetId(aggId);
- l1_agg2_f->AddArguments()->SetId(columnIds[1]);
+ l1_agg2_f->SetId(TAggAssignment::AGG_MAX);
+ l1_agg2_f->AddArguments()->SetId(columnId);
+ //
+ auto* l1_agg3 = groupBy->AddAggregates();
+ l1_agg3->MutableColumn()->SetId(102);
+ auto* l1_agg3_f = l1_agg3->MutableFunction();
+ l1_agg3_f->SetId(TAggAssignment::AGG_ANY);
+ l1_agg3_f->AddArguments()->SetId(columnId);
+ //
+ auto* l1_agg4 = groupBy->AddAggregates();
+ l1_agg4->MutableColumn()->SetId(103);
+ auto* l1_agg4_f = l1_agg4->MutableFunction();
+ l1_agg4_f->SetId(TAggAssignment::AGG_COUNT);
+ l1_agg4_f->AddArguments()->SetId(columnId);
auto* line2 = ssa.AddCommand();
auto* proj = line2->MutableProjection();
- proj->AddColumns()->SetId(tmpColumnId);
- proj->AddColumns()->SetId(tmpColumnId + 1);
+ proj->AddColumns()->SetId(100);
+ proj->AddColumns()->SetId(101);
+ proj->AddColumns()->SetId(102);
+ proj->AddColumns()->SetId(103);
return ssa;
}
-// SELECT min(level), max(level), some(level), count(timestamp) FROM t WHERE level = 1
-NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(std::vector<ui32> columnIds = {1, 5})
+// SELECT min(x), max(x), some(x), count(x) FROM t WHERE y = 1
+NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterColumnId)
{
NKikimrSSA::TProgram ssa;
- ui32 tmpColumnId = 100;
auto* line1 = ssa.AddCommand();
auto* l1_assign = line1->MutableAssign();
- l1_assign->MutableColumn()->SetId(tmpColumnId);
+ l1_assign->MutableColumn()->SetId(50);
l1_assign->MutableConstant()->SetInt32(1);
auto* line2 = ssa.AddCommand();
auto* l2_assign = line2->MutableAssign();
- l2_assign->MutableColumn()->SetId(tmpColumnId + 1);
+ l2_assign->MutableColumn()->SetId(51);
auto* l2_func = l2_assign->MutableFunction();
l2_func->SetId(TAssignment::FUNC_CMP_EQUAL);
- l2_func->AddArguments()->SetId(columnIds[1]);
- l2_func->AddArguments()->SetId(tmpColumnId);
+ l2_func->AddArguments()->SetId(filterColumnId);
+ l2_func->AddArguments()->SetId(50);
auto* line3 = ssa.AddCommand();
- line3->MutableFilter()->MutablePredicate()->SetId(tmpColumnId + 1);
+ line3->MutableFilter()->MutablePredicate()->SetId(51);
auto* line4 = ssa.AddCommand();
auto* groupBy = line4->MutableGroupBy();
//
auto* l4_agg1 = groupBy->AddAggregates();
- l4_agg1->MutableColumn()->SetId(tmpColumnId + 2);
+ l4_agg1->MutableColumn()->SetId(100);
auto* l4_agg1_f = l4_agg1->MutableFunction();
l4_agg1_f->SetId(TAggAssignment::AGG_MIN);
- l4_agg1_f->AddArguments()->SetId(columnIds[1]);
+ l4_agg1_f->AddArguments()->SetId(columnId);
//
auto* l4_agg2 = groupBy->AddAggregates();
- l4_agg2->MutableColumn()->SetId(tmpColumnId + 3);
+ l4_agg2->MutableColumn()->SetId(101);
auto* l4_agg2_f = l4_agg2->MutableFunction();
l4_agg2_f->SetId(TAggAssignment::AGG_MAX);
- l4_agg2_f->AddArguments()->SetId(columnIds[1]);
-#if 0
+ l4_agg2_f->AddArguments()->SetId(columnId);
+ //
auto* l4_agg3 = groupBy->AddAggregates();
- l4_agg3->MutableColumn()->SetId(tmpColumnId + 4);
+ l4_agg3->MutableColumn()->SetId(102);
auto* l4_agg3_f = l4_agg3->MutableFunction();
l4_agg3_f->SetId(TAggAssignment::AGG_ANY);
- l4_agg3_f->AddArguments()->SetId(columnIds[1]);
-#endif
+ l4_agg3_f->AddArguments()->SetId(columnId);
+ //
auto* l4_agg4 = groupBy->AddAggregates();
- l4_agg4->MutableColumn()->SetId(tmpColumnId + 5);
+ l4_agg4->MutableColumn()->SetId(103);
auto* l4_agg4_f = l4_agg4->MutableFunction();
l4_agg4_f->SetId(TAggAssignment::AGG_COUNT);
- l4_agg4_f->AddArguments()->SetId(columnIds[0]);
+ l4_agg4_f->AddArguments()->SetId(columnId);
auto* line5 = ssa.AddCommand();
auto* proj = line5->MutableProjection();
- proj->AddColumns()->SetId(tmpColumnId + 2);
- proj->AddColumns()->SetId(tmpColumnId + 3);
- //proj->AddColumns()->SetId(tmpColumnId + 4);
- proj->AddColumns()->SetId(tmpColumnId + 5);
+ proj->AddColumns()->SetId(100);
+ proj->AddColumns()->SetId(101);
+ proj->AddColumns()->SetId(102);
+ proj->AddColumns()->SetId(103);
return ssa;
}
@@ -1127,7 +1154,7 @@ void TestReadWithProgram(const TVector<std::pair<TString, TTypeId>>& ydbSchema =
}
}
-void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema()) {
+void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbAllTypesSchema()) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -1144,7 +1171,7 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
ui64 planStep = 100;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId);
+ SetupSchema(runtime, sender, tableId, ydbSchema);
{ // write some data
bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema));
@@ -1157,20 +1184,21 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
// TODO: write some into index
std::vector<TString> programs;
+ THashSet<ui32> intResult;
+ THashSet<ui32> isFiltered;
+ THashSet<NScheme::TTypeId> intTypes = {
+ NTypeIds::Int8, NTypeIds::Int16, NTypeIds::Int32, NTypeIds::Int64,
+ NTypeIds::Uint8, NTypeIds::Uint16, NTypeIds::Uint32, NTypeIds::Uint64,
+ NTypeIds::Timestamp
+ };
- {
- NKikimrSSA::TProgram ssa = MakeSelectAggregates(TAggAssignment::AGG_COUNT);
- TString serialized;
- UNIT_ASSERT(ssa.SerializeToString(&serialized));
- NKikimrSSA::TOlapProgram program;
- program.SetProgram(serialized);
-
- programs.push_back("");
- UNIT_ASSERT(program.SerializeToString(&programs.back()));
- }
+ ui32 prog = 0;
+ for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) {
+ if (intTypes.count(ydbSchema[i].second)) {
+ intResult.insert(prog);
+ }
- {
- NKikimrSSA::TProgram ssa = MakeSelectAggregates(TAggAssignment::AGG_MIN, {5, 5});
+ NKikimrSSA::TProgram ssa = MakeSelectAggregates(i + 1);
TString serialized;
UNIT_ASSERT(ssa.SerializeToString(&serialized));
NKikimrSSA::TOlapProgram program;
@@ -1180,8 +1208,13 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
UNIT_ASSERT(program.SerializeToString(&programs.back()));
}
- {
- NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter();
+ for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) {
+ isFiltered.insert(prog);
+ if (intTypes.count(ydbSchema[i].second)) {
+ intResult.insert(prog);
+ }
+
+ NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(i + 1, 4);
TString serialized;
UNIT_ASSERT(ssa.SerializeToString(&serialized));
NKikimrSSA::TOlapProgram program;
@@ -1191,7 +1224,7 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
UNIT_ASSERT(program.SerializeToString(&programs.back()));
}
- ui32 i = 0;
+ prog = 0;
for (auto& programText : programs) {
auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
auto& readProto = Proto(readEvent);
@@ -1220,22 +1253,30 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
TVector<TString> readData;
readData.push_back(resRead.GetData());
+ auto& data = readData[0];
- switch (i) {
- case 2:
- UNIT_ASSERT(CheckColumns(readData[0], meta, {"102", "103", /*"104",*/ "105"}, 1));
- UNIT_ASSERT(CheckIntValue<arrow::Int32Type>(readData, schema, "102", {1}));
- UNIT_ASSERT(CheckIntValue<arrow::Int32Type>(readData, schema, "103", {1}));
- //UNIT_ASSERT(CheckIntValue<arrow::Int32Type>(readData, schema, "104", {1}));
- UNIT_ASSERT(CheckIntValue<arrow::Int64Type>(readData, schema, "105", {1}));
- break;
- default:
- UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1));
- break;
+ auto batch = NArrow::DeserializeBatch(data, NArrow::DeserializeSchema(schema));
+ UNIT_ASSERT(batch);
+
+ UNIT_ASSERT(CheckColumns(data, meta, {"100", "101", "102", "103"}, 1));
+
+ // min, max, any, count
+ if (intResult.count(prog)) {
+ if (isFiltered.count(prog)) {
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), {1}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), {1}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {1}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {1}));
+ } else {
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), {0}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), {99}));
+ //UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), {0}));
+ UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("103"), {100}));
+ }
}
}
- ++i;
+ ++prog;
}
}
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index 5757f7909b1..55429f6cb39 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -9,7 +9,6 @@ using NWrappers::NTestHelpers::TS3Mock;
namespace {
static const TVector<std::pair<TString, TTypeId>> testYdbSchema = TTestSchema::YdbSchema();
-//static const TVector<std::pair<TString, TTypeId>> testYdbPkSchema = TTestSchema::YdbPkSchema();
std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBatch> batch, TString columnName, i64 seconds) {
std::string name(columnName.c_str(), columnName.size());
diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionAvg.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionAvg.h
index 3e8bce5fdbb..598afb54a3f 100644
--- a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionAvg.h
+++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionAvg.h
@@ -8,6 +8,7 @@
#include <type_traits>
#include <AggregateFunctions/IAggregateFunction.h>
+#include <AggregateFunctions/AggregateFunctionWrapper.h>
#include <AggregateFunctions/AggregateFunctionSum.h>
namespace CH
@@ -151,13 +152,13 @@ public:
: ArrowAggregateFunctionWrapper(std::move(name))
{}
- AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) override
+ AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const override
{
return createWithSameType<AggregateFunctionAvg>(argument_types);
}
template <template <typename> typename AggFunc>
- std::shared_ptr<IAggregateFunction> createWithSameType(const DataTypes & argument_types)
+ std::shared_ptr<IAggregateFunction> createWithSameType(const DataTypes & argument_types) const
{
if (argument_types.size() != 1)
return {};
diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionCount.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionCount.h
index 6044df828e6..0994cadc1fd 100644
--- a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionCount.h
+++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionCount.h
@@ -6,6 +6,7 @@
#include "arrow_clickhouse_types.h"
#include <AggregateFunctions/IAggregateFunction.h>
+#include <AggregateFunctions/AggregateFunctionWrapper.h>
#include <Columns/ColumnsCommon.h>
#include <array>
@@ -84,7 +85,7 @@ public:
: ArrowAggregateFunctionWrapper(std::move(name))
{}
- AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) override
+ AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const override
{
return std::make_shared<AggregateFunctionCount>(argument_types);
}
diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionMinMaxAny.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionMinMaxAny.h
index 5fbfb53170d..c8ce2884c70 100644
--- a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionMinMaxAny.h
+++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionMinMaxAny.h
@@ -5,7 +5,9 @@
#pragma once
#include "arrow_clickhouse_types.h"
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/Common/BitHelpers.h>
#include <AggregateFunctions/IAggregateFunction.h>
+#include <AggregateFunctions/AggregateFunctionWrapper.h>
namespace CH
@@ -669,7 +671,7 @@ public:
: ArrowAggregateFunctionWrapper(std::move(name))
{}
- AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) override
+ AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const override
{
return createAggregateFunctionSingleValue<AggFunc, AggData>(argument_types);
}
diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionSum.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionSum.h
index 52844b95da2..9819eac3df6 100644
--- a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionSum.h
+++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionSum.h
@@ -9,6 +9,7 @@
#include <type_traits>
#include <AggregateFunctions/IAggregateFunction.h>
+#include <AggregateFunctions/AggregateFunctionWrapper.h>
namespace CH
{
@@ -255,13 +256,13 @@ public:
: ArrowAggregateFunctionWrapper(std::move(name))
{}
- AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) override
+ AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const override
{
return createWithSameType<AggregateFunctionSumWithOverflow>(argument_types);
}
template <template <typename> typename AggFunc>
- std::shared_ptr<IAggregateFunction> createWithSameType(const DataTypes & argument_types)
+ std::shared_ptr<IAggregateFunction> createWithSameType(const DataTypes & argument_types) const
{
if (argument_types.size() != 1)
return {};
diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionWrapper.h b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionWrapper.h
new file mode 100644
index 00000000000..732b07f8e3e
--- /dev/null
+++ b/ydb/library/arrow_clickhouse/AggregateFunctions/AggregateFunctionWrapper.h
@@ -0,0 +1,89 @@
+#pragma once
+#include "arrow_clickhouse_types.h"
+#include "Aggregator.h"
+#include <AggregateFunctions/IAggregateFunction.h>
+#include <DataStreams/OneBlockInputStream.h>
+#include <DataStreams/AggregatingBlockInputStream.h>
+
+namespace CH
+{
+
+class ArrowAggregateFunctionWrapper : public arrow::compute::ScalarAggregateFunction
+{
+public:
+ ArrowAggregateFunctionWrapper(std::string name)
+ : arrow::compute::ScalarAggregateFunction(std::move(name), arrow::compute::Arity::Unary(), nullptr)
+ {}
+
+ virtual AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) const = 0;
+
+ arrow::Result<arrow::Datum> Execute(
+ const std::vector<arrow::Datum>& args,
+ const arrow::compute::FunctionOptions* /*options*/,
+ arrow::compute::ExecContext* /*ctx*/) const override
+ {
+ static const std::string result_name = "res";
+ static const std::vector<std::string> arg_names = {"0", "1", "2", "3", "4", "5"};
+ if (args.size() > arg_names.size())
+ return arrow::Status::Invalid("unexpected arguments count");
+
+ std::vector<uint32_t> arg_positions;
+ arg_positions.reserve(args.size());
+
+ DataTypes types;
+ arrow::FieldVector fields;
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+
+ types.reserve(args.size());
+ fields.reserve(args.size());
+ columns.reserve(args.size());
+
+ int num_rows = 0;
+ uint32_t arg_num = 0;
+ for (auto& arg : args)
+ {
+ if (!arg.is_array())
+ return arrow::Status::Invalid("argument is not an array");
+
+ columns.push_back(arg.make_array());
+ types.push_back(columns.back()->type());
+ fields.push_back(std::make_shared<arrow::Field>(arg_names[arg_num], types.back()));
+
+ if (!arg_num)
+ num_rows = columns.back()->length();
+ else if (num_rows != columns.back()->length())
+ return arrow::Status::Invalid("different argiments length");
+
+ arg_positions.push_back(arg_num);
+ ++arg_num;
+ }
+
+ auto batch = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), num_rows, columns);
+
+ AggregateDescription description {
+ .function = getHouseFunction(types),
+ .arguments = arg_positions,
+ .column_name = result_name
+ };
+
+ auto input_stream = std::make_shared<OneBlockInputStream>(batch);
+
+ // no agg keys, final aggregate states
+ Aggregator::Params agg_params(false, input_stream->getHeader(), {}, {description}, false);
+ AggregatingBlockInputStream agg_stream(input_stream, agg_params, true);
+
+ auto result_batch = agg_stream.read();
+ if (!result_batch || result_batch->num_rows() != 1)
+ return arrow::Status::Invalid("unexpected arrgerate result");
+ if (agg_stream.read())
+ return arrow::Status::Invalid("unexpected second batch in aggregate result");
+
+ auto res_column = result_batch->GetColumnByName(result_name);
+ if (!res_column || res_column->length() != 1)
+ return arrow::Status::Invalid("no result value");
+
+ return res_column->GetScalar(0);
+ }
+};
+
+}
diff --git a/ydb/library/arrow_clickhouse/AggregateFunctions/IAggregateFunction.h b/ydb/library/arrow_clickhouse/AggregateFunctions/IAggregateFunction.h
index 81c425d2494..dc23c3a2a8f 100644
--- a/ydb/library/arrow_clickhouse/AggregateFunctions/IAggregateFunction.h
+++ b/ydb/library/arrow_clickhouse/AggregateFunctions/IAggregateFunction.h
@@ -481,15 +481,4 @@ struct AggregateFunctionProperties
bool is_order_dependent = false;
};
-
-class ArrowAggregateFunctionWrapper : public arrow::compute::ScalarAggregateFunction
-{
-public:
- ArrowAggregateFunctionWrapper(std::string name)
- : arrow::compute::ScalarAggregateFunction(std::move(name), arrow::compute::Arity::Unary(), nullptr)
- {}
-
- virtual AggregateFunctionPtr getHouseFunction(const DataTypes & argument_types) = 0;
-};
-
}
diff --git a/ydb/library/arrow_clickhouse/Aggregator.h b/ydb/library/arrow_clickhouse/Aggregator.h
index a83baf531f1..af08ba1ef76 100644
--- a/ydb/library/arrow_clickhouse/Aggregator.h
+++ b/ydb/library/arrow_clickhouse/Aggregator.h
@@ -20,17 +20,6 @@
namespace CH
{
-struct AggregateDescription
-{
- AggregateFunctionPtr function;
- Array parameters; /// Parameters of the (parametric) aggregate function.
- ColumnNumbers arguments;
- Names argument_names; /// used if no `arguments` are specified.
- String column_name; /// What name to use for a column with aggregate function values
-};
-
-using AggregateDescriptions = std::vector<AggregateDescription>;
-
/** Different data structures that can be used for aggregation
* For efficiency, the aggregation data itself is put into the pool.
* Data and pool ownership (states of aggregate functions)
diff --git a/ydb/library/arrow_clickhouse/Columns/ColumnAggregateFunction.h b/ydb/library/arrow_clickhouse/Columns/ColumnAggregateFunction.h
index cf126113230..27cbb8ecd58 100644
--- a/ydb/library/arrow_clickhouse/Columns/ColumnAggregateFunction.h
+++ b/ydb/library/arrow_clickhouse/Columns/ColumnAggregateFunction.h
@@ -144,7 +144,4 @@ private:
std::shared_ptr<arrow::UInt64Builder> builder;
};
-using AggregateColumnsData = std::vector<arrow::UInt64Builder *>;
-using AggregateColumnsConstData = std::vector<const arrow::UInt64Array *>;
-
}
diff --git a/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.cpp b/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.cpp
index 3dbc50b22ff..c44841f07e3 100644
--- a/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.cpp
+++ b/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.cpp
@@ -288,6 +288,13 @@ bool insertData(MutableColumn & column, const StringRef & value)
case arrow::Type::BINARY:
return insertString(column, value);
+ case arrow::Type::TIMESTAMP:
+ return insertNumber(column, *reinterpret_cast<const Int64 *>(value.data));
+ case arrow::Type::DURATION:
+ return insertNumber(column, *reinterpret_cast<const Int64 *>(value.data));
+ case arrow::Type::DECIMAL:
+ return insertDecimal(column, value);
+
case arrow::Type::EXTENSION: // AggregateColumn
break; // TODO
@@ -328,13 +335,23 @@ StringRef serializeValueIntoArena(const IColumn& column, size_t row, Arena & poo
case arrow::Type::FIXED_SIZE_BINARY:
{
auto str = assert_cast<const ColumnFixedString &>(column).GetView(row);
- return serializeStringIntoArena<true>(StringRef(str.data(), str.size()), pool, begin);
+ return serializeFixedStringIntoArena(StringRef(str.data(), str.size()), pool, begin);
}
case arrow::Type::STRING:
case arrow::Type::BINARY:
{
auto str = assert_cast<const ColumnBinary &>(column).GetView(row);
- return serializeStringIntoArena<false>(StringRef(str.data(), str.size()), pool, begin);
+ return serializeStringIntoArena(StringRef(str.data(), str.size()), pool, begin);
+ }
+
+ case arrow::Type::TIMESTAMP:
+ return serializeNumberIntoArena(assert_cast<const ColumnTimestamp &>(column).Value(row), pool, begin);
+ case arrow::Type::DURATION:
+ return serializeNumberIntoArena(assert_cast<const ColumnDuration &>(column).Value(row), pool, begin);
+ case arrow::Type::DECIMAL:
+ {
+ auto str = assert_cast<const ColumnDecimal &>(column).GetView(row);
+ return serializeDecimalIntoArena(StringRef(str.data(), str.size()), pool, begin);
}
case arrow::Type::EXTENSION: // AggregateColumn
@@ -381,6 +398,13 @@ const char * deserializeAndInsertFromArena(MutableColumn& column, const char * p
case arrow::Type::BINARY:
return deserializeStringFromArena(assert_cast<MutableColumnBinary &>(column), pos);
+ case arrow::Type::TIMESTAMP:
+ return deserializeNumberFromArena(assert_cast<MutableColumnTimestamp &>(column), pos);
+ case arrow::Type::DURATION:
+ return deserializeNumberFromArena(assert_cast<MutableColumnDuration &>(column), pos);
+ case arrow::Type::DECIMAL:
+ return deserializeDecimalFromArena(assert_cast<MutableColumnDecimal &>(column), pos);
+
case arrow::Type::EXTENSION: // AggregateColumn
break; // TODO
@@ -430,6 +454,13 @@ void updateHashWithValue(const IColumn& column, size_t row, SipHash & hash)
return hash.update(str.data(), str.size());
}
+ case arrow::Type::TIMESTAMP:
+ return hash.update(assert_cast<const ColumnTimestamp &>(column).Value(row));
+ case arrow::Type::DURATION:
+ return hash.update(assert_cast<const ColumnDuration &>(column).Value(row));
+ case arrow::Type::DECIMAL:
+ return hash.update(assert_cast<const ColumnDecimal &>(column).Value(row));
+
case arrow::Type::EXTENSION: // AggregateColumn
break; // TODO
@@ -475,6 +506,13 @@ MutableColumnPtr createMutableColumn(const DataTypePtr & type)
case arrow::Type::STRING:
return std::make_shared<MutableColumnString>();
+ case arrow::Type::TIMESTAMP:
+ return std::make_shared<MutableColumnTimestamp>(type, arrow::default_memory_pool());
+ case arrow::Type::DURATION:
+ return std::make_shared<MutableColumnDuration>(type, arrow::default_memory_pool());
+ case arrow::Type::DECIMAL:
+ return std::make_shared<MutableColumnDecimal>(type, arrow::default_memory_pool());
+
case arrow::Type::EXTENSION: // AggregateColumn
break; // TODO: do we really need it here?
@@ -517,6 +555,13 @@ uint32_t fixedContiguousSize(const DataTypePtr & type)
case arrow::Type::BINARY:
break;
+ case arrow::Type::TIMESTAMP:
+ return 8;
+ case arrow::Type::DURATION:
+ return 8;
+ case arrow::Type::DECIMAL:
+ return std::static_pointer_cast<DataTypeDecimal>(type)->byte_width();
+
case arrow::Type::EXTENSION: // AggregateColumn
break;
diff --git a/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.h b/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.h
index 5eb633d15a0..e427867e494 100644
--- a/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.h
+++ b/ydb/library/arrow_clickhouse/Columns/ColumnsCommon.h
@@ -27,25 +27,29 @@ inline StringRef serializeNumberIntoArena(T value, Arena & arena, char const *&
return StringRef(pos, sizeof(T));
}
-template <bool fixed>
inline StringRef serializeStringIntoArena(const StringRef & str, Arena & arena, char const *& begin)
{
- if constexpr (fixed)
- {
- auto * pos = arena.allocContinue(str.size, begin);
- memcpy(pos, str.data, str.size);
- return StringRef(pos, str.size);
- }
- else
- {
- StringRef res;
- res.size = sizeof(str.size) + str.size;
- char * pos = arena.allocContinue(res.size, begin);
- memcpy(pos, &str.size, sizeof(str.size));
- memcpy(pos + sizeof(str.size), str.data, str.size);
- res.data = pos;
- return res;
- }
+ StringRef res;
+ res.size = sizeof(str.size) + str.size;
+ char * pos = arena.allocContinue(res.size, begin);
+ memcpy(pos, &str.size, sizeof(str.size));
+ memcpy(pos + sizeof(str.size), str.data, str.size);
+ res.data = pos;
+ return res;
+}
+
+inline StringRef serializeFixedStringIntoArena(const StringRef & str, Arena & arena, char const *& begin)
+{
+ auto * pos = arena.allocContinue(str.size, begin);
+ memcpy(pos, str.data, str.size);
+ return StringRef(pos, str.size);
+}
+
+inline StringRef serializeDecimalIntoArena(const StringRef & str, Arena & arena, char const *& begin)
+{
+ auto * pos = arena.allocContinue(str.size, begin);
+ memcpy(pos, str.data, str.size);
+ return StringRef(pos, str.size);
}
template <typename T>
@@ -85,6 +89,11 @@ inline bool insertFixedString(MutableColumn & column, const StringRef & value)
return assert_cast<MutableColumnFixedString &>(column).Append(arrow::util::string_view{value.data, value.size}).ok();
}
+inline bool insertDecimal(MutableColumn & column, const StringRef & value)
+{
+ return assert_cast<MutableColumnDecimal &>(column).Append(arrow::util::string_view{value.data, value.size}).ok();
+}
+
template <typename DataType>
inline const char * deserializeNumberFromArena(arrow::NumericBuilder<DataType> & column, const char * pos)
{
@@ -110,6 +119,12 @@ inline const char * deserializeStringFromArena(MutableColumnFixedString & column
return pos + column.byte_width();
}
+inline const char * deserializeDecimalFromArena(MutableColumnDecimal & column, const char * pos)
+{
+ column.Append(pos).ok();
+ return pos + column.byte_width();
+}
+
bool insertData(MutableColumn & column, const StringRef & value);
StringRef serializeValueIntoArena(const IColumn& column, size_t row, Arena & pool, char const *& begin);
const char * deserializeAndInsertFromArena(MutableColumn& column, const char * pos);
diff --git a/ydb/library/arrow_clickhouse/Common/Allocator.h b/ydb/library/arrow_clickhouse/Common/Allocator.h
index b9c1aa1247c..46fcc75719a 100644
--- a/ydb/library/arrow_clickhouse/Common/Allocator.h
+++ b/ydb/library/arrow_clickhouse/Common/Allocator.h
@@ -15,10 +15,12 @@
#if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <malloc.h>
#endif
+#if !defined(WIN32)
+#include <sys/mman.h>
+#endif
#include <cstdlib>
#include <algorithm>
-#include <sys/mman.h>
#include <common/mremap.h>
diff --git a/ydb/library/arrow_clickhouse/arrow_clickhouse_types.h b/ydb/library/arrow_clickhouse/arrow_clickhouse_types.h
index 698503007eb..57dee49a1fe 100644
--- a/ydb/library/arrow_clickhouse/arrow_clickhouse_types.h
+++ b/ydb/library/arrow_clickhouse/arrow_clickhouse_types.h
@@ -6,6 +6,8 @@
#include <map>
#include <stdexcept>
+#include <util/system/types.h>
+
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
@@ -88,6 +90,10 @@ using ColumnBinary = arrow::BinaryArray;
using ColumnString = arrow::StringArray;
using ColumnFixedString = arrow::FixedSizeBinaryArray;
+using ColumnTimestamp = arrow::TimestampArray;
+using ColumnDuration = arrow::DurationArray;
+using ColumnDecimal = arrow::DecimalArray;
+
using MutableColumnInt8 = arrow::Int8Builder;
using MutableColumnInt16 = arrow::Int16Builder;
using MutableColumnInt32 = arrow::Int32Builder;
@@ -105,6 +111,10 @@ using MutableColumnBinary = arrow::BinaryBuilder;
using MutableColumnString = arrow::StringBuilder;
using MutableColumnFixedString = arrow::FixedSizeBinaryBuilder;
+using MutableColumnTimestamp = arrow::TimestampBuilder;
+using MutableColumnDuration = arrow::DurationBuilder;
+using MutableColumnDecimal = arrow::DecimalBuilder;
+
using IDataType = arrow::DataType;
using DataTypePtr = std::shared_ptr<IDataType>;
using DataTypes = arrow::DataTypeVector;
@@ -119,8 +129,32 @@ using DataTypeUInt16 = arrow::UInt16Type;
using DataTypeUInt32 = arrow::UInt32Type;
using DataTypeUInt64 = arrow::UInt64Type;
+using DataTypeBinary = arrow::BinaryType;
+using DataTypeString = arrow::StringType;
using DataTypeFixedString = arrow::FixedSizeBinaryType;
+using DataTypeTimestamp = arrow::TimestampType;
+using DataTypeDuration = arrow::DurationType;
+using DataTypeDecimal = arrow::DecimalType;
+
+class IAggregateFunction;
+using AggregateFunctionPtr = std::shared_ptr<const IAggregateFunction>;
+
+struct AggregateDescription
+{
+ AggregateFunctionPtr function;
+ Array parameters; /// Parameters of the (parametric) aggregate function.
+ ColumnNumbers arguments;
+ Names argument_names; /// used if no `arguments` are specified.
+ String column_name; /// What name to use for a column with aggregate function values
+};
+
+using AggregateDescriptions = std::vector<AggregateDescription>;
+
+using AggregateColumnsData = std::vector<arrow::UInt64Builder *>;
+using AggregateColumnsConstData = std::vector<const arrow::UInt64Array *>;
+
+
inline Columns columnsFromHeader(const Header& schema, size_t num_rows = 0)
{
std::vector<std::shared_ptr<arrow::Array>> columns;