diff options
author | vvvv <vvvv@ydb.tech> | 2023-06-06 17:35:16 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-06-06 17:35:16 +0300 |
commit | 80cf01289fdbfba3645a843027d570645d547991 (patch) | |
tree | a9ca7df5bf9bae0c76226065d306f18bde2a75d6 | |
parent | d0be5aa18d38add1dd73a3279bc23a1d4a3ad6a6 (diff) | |
download | ydb-80cf01289fdbfba3645a843027d570645d547991.tar.gz |
Initial implementation of PG column converters
PG bindings:
```
pragma UseBlocks;
select
URL,
CounterID,
EventDate,
EventTime
from bindings.hits limit 1
"Data"=[["http://liver.ru/belgorod/page/1006.j\xD0\xBA\xD0\xB8/\xD0\xB4\xD0\xBE\xD0\xBF_\xD0\xBF\xD1\x80\xD0\xB8\xD0\xB1\xD0\xBE\xD1\x80\xD1\x8B";"225510";"2013-07-05";"2013-07-05 17:42:42"]]}]}]
```
YQL bindings:
```
select
URL,
CounterID,
EventDate,
EventTime
from bindings.hits limit 1
"Data"=[["http://liver.ru/belgorod/page/1006.j\xD0\xBA\xD0\xB8/\xD0\xB4\xD0\xBE\xD0\xBF_\xD0\xBF\xD1\x80\xD0\xB8\xD0\xB1\xD0\xBE\xD1\x80\xD1\x8B";"225510";"15891";"1373046162"]]}]}]
```
15891 (days since the Unix epoch) = 2013-07-05
1373046162 (seconds since the Unix epoch) = 2013-07-05T17:42:42Z
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/arrow.cpp | 183 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/arrow.h | 8 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/interface/arrow.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp | 6 |
5 files changed, 208 insertions, 6 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.cpp b/ydb/library/yql/parser/pg_wrapper/arrow.cpp index e8494512235..7846d3c4a4e 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.cpp +++ b/ydb/library/yql/parser/pg_wrapper/arrow.cpp @@ -1,8 +1,19 @@ #include "arrow.h" +#include <ydb/library/yql/parser/pg_wrapper/interface/arrow.h> #include <ydb/library/yql/parser/pg_wrapper/interface/utils.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/arrow/arrow_util.h> #include <util/generic/singleton.h> +#include <arrow/compute/cast.h> +#include <arrow/array.h> +#include <arrow/array/builder_binary.h> + +extern "C" { +#include "utils/date.h" +#include "utils/timestamp.h" +} + namespace NYql { extern "C" { @@ -60,4 +71,176 @@ const NPg::TAggregateDesc& ResolveAggregation(const TString& name, NKikimr::NMin } } +std::shared_ptr<arrow::Array> PgConvertBool(const std::shared_ptr<arrow::Array>& value) { + const auto& data = value->data(); + size_t length = data->length; + NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length); + auto input = data->GetValues<ui8>(1, 0); + builder.UnsafeReserve(length); + auto output = builder.MutableData(); + for (size_t i = 0; i < length; ++i) { + auto fullIndex = i + data->offset; + output[i] = BoolGetDatum(arrow::BitUtil::GetBit(input, fullIndex)); + } + + auto dataBuffer = builder.Build(true).array()->buffers[1]; + return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { data->buffers[0], dataBuffer })); +} + +template <typename T, typename F> +std::shared_ptr<arrow::Array> PgConvertFixed(const std::shared_ptr<arrow::Array>& value, const F& f) { + const auto& data = value->data(); + size_t length = data->length; + NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length); + auto input = data->GetValues<T>(1); 
+ builder.UnsafeReserve(length); + auto output = builder.MutableData(); + for (size_t i = 0; i < length; ++i) { + output[i] = f(input[i]); + } + + auto dataBuffer = builder.Build(true).array()->buffers[1]; + return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { data->buffers[0], dataBuffer })); +} + +template <bool IsCString> +std::shared_ptr<arrow::Array> PgConvertString(const std::shared_ptr<arrow::Array>& value) { + const auto& data = value->data(); + size_t length = data->length; + arrow::BinaryBuilder builder; + ARROW_OK(builder.Reserve(length)); + auto inputDataSize = arrow::BinaryArray(data).total_values_length(); + ARROW_OK(builder.ReserveData(inputDataSize + length * (sizeof(void*) + (IsCString ? 1 : VARHDRSZ)))); + NUdf::TStringBlockReader<arrow::BinaryType, true> reader; + std::vector<char> tmp; + for (size_t i = 0; i < length; ++i) { + auto item = reader.GetItem(*data, i); + if (!item) { + builder.AppendNull(); + continue; + } + + auto originalLen = item.AsStringRef().Size(); + ui32 len; + if constexpr (IsCString) { + len = sizeof(void*) + 1 + originalLen; + } else { + len = sizeof(void*) + VARHDRSZ + originalLen; + } + + if (Y_UNLIKELY(len < originalLen)) { + ythrow yexception() << "Too long string"; + } + + if (tmp.capacity() < len) { + tmp.reserve(Max<ui64>(len, tmp.capacity() * 2)); + } + + tmp.resize(len); + NUdf::ZeroMemoryContext(tmp.data() + sizeof(void*)); + if constexpr (IsCString) { + memcpy(tmp.data() + sizeof(void*), item.AsStringRef().Data(), originalLen); + tmp[len - 1] = 0; + } else { + memcpy(tmp.data() + sizeof(void*) + VARHDRSZ, item.AsStringRef().Data(), originalLen); + UpdateCleanVarSize((text*)(tmp.data() + sizeof(void*)), originalLen); + } + + ARROW_OK(builder.Append(tmp.data(), len)); + } + + std::shared_ptr<arrow::BinaryArray> ret; + ARROW_OK(builder.Finish(&ret)); + return ret; +} + +template <typename T, typename F> +TColumnConverter BuildPgFixedColumnConverter(const std::shared_ptr<arrow::DataType>& 
originalType, const F& f) { + auto primaryType = NKikimr::NMiniKQL::GetPrimitiveDataType<T>(); + if (!originalType->Equals(*primaryType) && !arrow::compute::CanCast(*originalType, *primaryType)) { + return {}; + } + + return [primaryType, originalType, f](const std::shared_ptr<arrow::Array>& value) { + auto res = originalType->Equals(*primaryType) ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType)); + return PgConvertFixed<T, F>(res, f); + }; +} + +Datum MakePgDateFromUint16(ui16 value) { + return DatumGetDateADT(UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE + value); +} + +Datum MakePgTimestampFromInt64(i64 value) { + return DatumGetTimestamp(USECS_PER_SEC * ((UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE) * SECS_PER_DAY + value)); +} + +TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType) { + switch (targetType->GetTypeId()) { + case BOOLOID: { + auto primaryType = arrow::boolean(); + if (!originalType->Equals(*primaryType) && !arrow::compute::CanCast(*originalType, *primaryType)) { + return {}; + } + + return [primaryType, originalType](const std::shared_ptr<arrow::Array>& value) { + auto res = originalType->Equals(*primaryType) ? 
value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType)); + return PgConvertBool(res); + }; + } + case INT2OID: { + return BuildPgFixedColumnConverter<i16>(originalType, [](auto value){ return Int16GetDatum(value); }); + } + case INT4OID: { + return BuildPgFixedColumnConverter<i32>(originalType, [](auto value){ return Int32GetDatum(value); }); + } + case INT8OID: { + return BuildPgFixedColumnConverter<i64>(originalType, [](auto value){ return Int64GetDatum(value); }); + } + case FLOAT4OID: { + return BuildPgFixedColumnConverter<float>(originalType, [](auto value){ return Float4GetDatum(value); }); + } + case FLOAT8OID: { + return BuildPgFixedColumnConverter<double>(originalType, [](auto value){ return Float8GetDatum(value); }); + } + case BYTEAOID: + case VARCHAROID: + case TEXTOID: + case CSTRINGOID: { + auto primaryType = (targetType->GetTypeId() == BYTEAOID) ? arrow::binary() : arrow::utf8(); + if (!arrow::compute::CanCast(*originalType, *primaryType)) { + return {}; + } + + return [primaryType, originalType, isCString = targetType->GetTypeId() == CSTRINGOID](const std::shared_ptr<arrow::Array>& value) { + auto res = originalType->Equals(*primaryType) ? 
value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType)); + if (isCString) { + return PgConvertString<true>(res); + } else { + return PgConvertString<false>(res); + } + }; + } + case DATEOID: { + if (originalType->Equals(arrow::uint16())) { + return [](const std::shared_ptr<arrow::Array>& value) { + return PgConvertFixed<ui16>(value, [](auto value){ return MakePgDateFromUint16(value); }); + }; + } else { + return {}; + } + } + case TIMESTAMPOID: { + if (originalType->Equals(arrow::int64())) { + return [](const std::shared_ptr<arrow::Array>& value) { + return PgConvertFixed<i64>(value, [](auto value){ return MakePgTimestampFromInt64(value); }); + }; + } else { + return {}; + } + } + } + return {}; +} + } diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h index 72f37124710..aaa8fe082e7 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.h +++ b/ydb/library/yql/parser/pg_wrapper/arrow.h @@ -286,7 +286,7 @@ struct TGenericExec { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); *res = Dispatch2<true, true>(batch, length, state, builder); } else { - NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length); *res = Dispatch2<true, true>(batch, length, state, builder); } } else { @@ -294,7 +294,7 @@ struct TGenericExec { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); *res = Dispatch2<true, false>(batch, length, state, builder); } else { - NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + 
NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length); *res = Dispatch2<true, false>(batch, length, state, builder); } } @@ -304,7 +304,7 @@ struct TGenericExec { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); *res = Dispatch2<false, true>(batch, length, state, builder); } else { - NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length); *res = Dispatch2<false, true>(batch, length, state, builder); } } else { @@ -312,7 +312,7 @@ struct TGenericExec { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); *res = Dispatch2<false, false>(batch, length, state, builder); } else { - NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length); *res = Dispatch2<false, false>(batch, length, state, builder); } } diff --git a/ydb/library/yql/parser/pg_wrapper/interface/arrow.h b/ydb/library/yql/parser/pg_wrapper/interface/arrow.h index b16329c2ced..2c42f33992e 100644 --- a/ydb/library/yql/parser/pg_wrapper/interface/arrow.h +++ b/ydb/library/yql/parser/pg_wrapper/interface/arrow.h @@ -6,6 +6,9 @@ namespace NYql { arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool); +using TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std::shared_ptr<arrow::Array>&)>; +TColumnConverter 
BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType); + } // NYql namespace NKikimr { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index ad9bcdca985..48a24c7091f 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -54,6 +54,7 @@ #include <ydb/library/yql/public/udf/arrow/block_reader.h> #include <ydb/library/yql/public/udf/arrow/util.h> #include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/arrow.h> #include <ydb/library/yql/providers/s3/common/util.h> #include <ydb/library/yql/providers/s3/compressors/factory.h> @@ -1087,8 +1088,6 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste inflightCounter); } -using TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std::shared_ptr<arrow::Array>&)>; - template <bool isOptional> std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) { ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length()); @@ -1114,6 +1113,17 @@ std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow:: } TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType) { + if (yqlType->IsPg()) { + auto pgType = AS_TYPE(TPgType, yqlType); + auto conv = BuildPgColumnConverter(originalType, pgType); + if (!conv) { + ythrow yexception() << "Arrow type: " << originalType->ToString() << + " of field: " << columnName << " isn't compatible to PG type: " << NPg::LookupType(pgType->GetTypeId()).Name; + 
} + + return conv; + } + if (originalType->id() == arrow::Type::DATE32) { // TODO: support more than 1 optional level bool isOptional = false; diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index c29c58c0c8b..d493c1bcaea 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -240,6 +240,12 @@ arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf: return arrow::Datum(); } +TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType) { + Y_UNUSED(originalType); + Y_UNUSED(targetType); + return {}; +} + TMaybe<ui32> ConvertToPgType(NKikimr::NUdf::EDataSlot slot) { Y_UNUSED(slot); return Nothing(); |