aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-06-06 17:35:16 +0300
committervvvv <vvvv@ydb.tech>2023-06-06 17:35:16 +0300
commit80cf01289fdbfba3645a843027d570645d547991 (patch)
treea9ca7df5bf9bae0c76226065d306f18bde2a75d6
parentd0be5aa18d38add1dd73a3279bc23a1d4a3ad6a6 (diff)
downloadydb-80cf01289fdbfba3645a843027d570645d547991.tar.gz
Initial implementation of PG column converters
pg биндинги ``` pragma UseBlocks; select URL, CounterID, EventDate, EventTime from bindings.hits limit 1 "Data"=[["http://liver.ru/belgorod/page/1006.j\xD0\xBA\xD0\xB8/\xD0\xB4\xD0\xBE\xD0\xBF_\xD0\xBF\xD1\x80\xD0\xB8\xD0\xB1\xD0\xBE\xD1\x80\xD1\x8B";"225510";"2013-07-05";"2013-07-05 17:42:42"]]}]}] ``` yql биндинги ``` select URL, CounterID, EventDate, EventTime from bindings.hits limit 1 "Data"=[["http://liver.ru/belgorod/page/1006.j\xD0\xBA\xD0\xB8/\xD0\xB4\xD0\xBE\xD0\xBF_\xD0\xBF\xD1\x80\xD0\xB8\xD0\xB1\xD0\xBE\xD1\x80\xD1\x8B";"225510";"15891";"1373046162"]]}]}] ``` 15891 = 2013-07-05 1373046162 = 2013-07-05T17:42:42Z
-rw-r--r--ydb/library/yql/parser/pg_wrapper/arrow.cpp183
-rw-r--r--ydb/library/yql/parser/pg_wrapper/arrow.h8
-rw-r--r--ydb/library/yql/parser/pg_wrapper/interface/arrow.h3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp14
-rw-r--r--ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp6
5 files changed, 208 insertions, 6 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.cpp b/ydb/library/yql/parser/pg_wrapper/arrow.cpp
index e8494512235..7846d3c4a4e 100644
--- a/ydb/library/yql/parser/pg_wrapper/arrow.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/arrow.cpp
@@ -1,8 +1,19 @@
#include "arrow.h"
+#include <ydb/library/yql/parser/pg_wrapper/interface/arrow.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/utils.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/arrow/arrow_util.h>
#include <util/generic/singleton.h>
+#include <arrow/compute/cast.h>
+#include <arrow/array.h>
+#include <arrow/array/builder_binary.h>
+#include <arrow/util/bitmap_ops.h>
+
+extern "C" {
+#include "utils/date.h"
+#include "utils/timestamp.h"
+}
+
namespace NYql {
extern "C" {
@@ -60,4 +71,176 @@ const NPg::TAggregateDesc& ResolveAggregation(const TString& name, NKikimr::NMin
}
}
+// Converts an Arrow boolean array into an arrow::uint64() array of PostgreSQL
+// Datums of the same length.
+//
+// Every input bit is expanded into one 64-bit slot via BoolGetDatum. The result
+// always starts at offset 0; null slots are carried over through the validity
+// bitmap.
+std::shared_ptr<arrow::Array> PgConvertBool(const std::shared_ptr<arrow::Array>& value) {
+    const auto& data = value->data();
+    size_t length = data->length;
+    NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length);
+    // Take the value bitmap from its physical start; the logical offset is
+    // applied per bit in the loop below.
+    auto input = data->GetValues<ui8>(1, 0);
+    builder.UnsafeReserve(length);
+    auto output = builder.MutableData();
+    for (size_t i = 0; i < length; ++i) {
+        auto fullIndex = i + data->offset;
+        output[i] = BoolGetDatum(arrow::BitUtil::GetBit(input, fullIndex));
+    }
+
+    // The result has offset 0, so the validity bitmap of a sliced input
+    // (offset != 0) must be re-based; reusing it as-is would read the null
+    // flags from the wrong bit positions.
+    auto validity = data->buffers[0];
+    if (validity && data->offset != 0) {
+        validity = ARROW_RESULT(arrow::internal::CopyBitmap(arrow::default_memory_pool(), validity->data(), data->offset, length));
+    }
+
+    auto dataBuffer = builder.Build(true).array()->buffers[1];
+    return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { validity, dataBuffer }));
+}
+
+// Converts an Arrow array of fixed-size values of type T into an
+// arrow::uint64() array of PostgreSQL Datums of the same length.
+//
+// T - element type read from the input value buffer.
+// f - callable applied to every element; its result is stored in the 64-bit
+//     output slot.
+//
+// The result always starts at offset 0; null slots are carried over through the
+// validity bitmap. f is applied to every slot regardless of validity.
+template <typename T, typename F>
+std::shared_ptr<arrow::Array> PgConvertFixed(const std::shared_ptr<arrow::Array>& value, const F& f) {
+    const auto& data = value->data();
+    size_t length = data->length;
+    NUdf::TFixedSizeArrayBuilder<ui64, false> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length);
+    // GetValues<T>(1) already points at the first logical element (offset applied).
+    auto input = data->GetValues<T>(1);
+    builder.UnsafeReserve(length);
+    auto output = builder.MutableData();
+    for (size_t i = 0; i < length; ++i) {
+        output[i] = f(input[i]);
+    }
+
+    // The result has offset 0, so the validity bitmap of a sliced input
+    // (offset != 0) must be re-based; reusing it as-is would read the null
+    // flags from the wrong bit positions.
+    auto validity = data->buffers[0];
+    if (validity && data->offset != 0) {
+        validity = ARROW_RESULT(arrow::internal::CopyBitmap(arrow::default_memory_pool(), validity->data(), data->offset, length));
+    }
+
+    auto dataBuffer = builder.Build(true).array()->buffers[1];
+    return arrow::MakeArray(arrow::ArrayData::Make(arrow::uint64(), length, { validity, dataBuffer }));
+}
+
+// Converts an Arrow binary/string array into an arrow::binary() array whose
+// items hold PostgreSQL string values.
+//
+// Layout of every non-null output item:
+//   [sizeof(void*) bytes prefix][payload]
+// where payload is
+//   IsCString == true  : the original bytes followed by a terminating zero byte;
+//   IsCString == false : a VARHDRSZ-byte header followed by the original bytes.
+// Null input items are appended as nulls.
+//
+// Throws yexception if the resulting item length does not fit into ui32.
+template <bool IsCString>
+std::shared_ptr<arrow::Array> PgConvertString(const std::shared_ptr<arrow::Array>& value) {
+    const auto& data = value->data();
+    size_t length = data->length;
+    arrow::BinaryBuilder builder;
+    ARROW_OK(builder.Reserve(length));
+    // Reserve room for all payload bytes plus the per-item overhead up front.
+    auto inputDataSize = arrow::BinaryArray(data).total_values_length();
+    ARROW_OK(builder.ReserveData(inputDataSize + length * (sizeof(void*) + (IsCString ? 1 : VARHDRSZ))));
+    NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
+    // Scratch buffer reused for every item to avoid per-item allocations.
+    std::vector<char> tmp;
+    for (size_t i = 0; i < length; ++i) {
+        auto item = reader.GetItem(*data, i);
+        if (!item) {
+            // AppendNull returns a Status; check it like every other builder call.
+            ARROW_OK(builder.AppendNull());
+            continue;
+        }
+
+        const auto str = item.AsStringRef();
+        auto originalLen = str.Size();
+        ui32 len;
+        if constexpr (IsCString) {
+            len = sizeof(void*) + 1 + originalLen;
+        } else {
+            len = sizeof(void*) + VARHDRSZ + originalLen;
+        }
+
+        // len is 32-bit: a value smaller than the source length means it wrapped around.
+        if (Y_UNLIKELY(len < originalLen)) {
+            ythrow yexception() << "Too long string";
+        }
+
+        if (tmp.capacity() < len) {
+            tmp.reserve(Max<ui64>(len, tmp.capacity() * 2));
+        }
+
+        tmp.resize(len);
+        // NOTE(review): presumably resets the memory-context slot kept in the
+        // pointer-sized prefix in front of the payload - confirm against NUdf::ZeroMemoryContext.
+        NUdf::ZeroMemoryContext(tmp.data() + sizeof(void*));
+        if constexpr (IsCString) {
+            memcpy(tmp.data() + sizeof(void*), str.Data(), originalLen);
+            tmp[len - 1] = 0;
+        } else {
+            memcpy(tmp.data() + sizeof(void*) + VARHDRSZ, str.Data(), originalLen);
+            UpdateCleanVarSize((text*)(tmp.data() + sizeof(void*)), originalLen);
+        }
+
+        ARROW_OK(builder.Append(tmp.data(), len));
+    }
+
+    std::shared_ptr<arrow::BinaryArray> ret;
+    ARROW_OK(builder.Finish(&ret));
+    return ret;
+}
+
+// Builds a converter from arrays of originalType to PostgreSQL Datums for a
+// fixed-size target whose Arrow counterpart is the primitive type of T.
+//
+// Returns an empty converter when originalType is neither that primitive type
+// nor castable to it. Otherwise the returned converter casts the input when the
+// types differ and then applies f to every element through PgConvertFixed.
+template <typename T, typename F>
+TColumnConverter BuildPgFixedColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, const F& f) {
+    auto primaryType = NKikimr::NMiniKQL::GetPrimitiveDataType<T>();
+    const bool isSameType = originalType->Equals(*primaryType);
+    if (!isSameType && !arrow::compute::CanCast(*originalType, *primaryType)) {
+        return {};
+    }
+
+    return [primaryType, isSameType, f](const std::shared_ptr<arrow::Array>& value) {
+        if (isSameType) {
+            // Input already has the expected physical type: convert in place.
+            return PgConvertFixed<T, F>(value, f);
+        }
+
+        const std::shared_ptr<arrow::Array> casted = ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
+        return PgConvertFixed<T, F>(casted, f);
+    };
+}
+
+// Builds a PostgreSQL date Datum from a day count relative to the Unix epoch.
+// The value is shifted by the difference between the Unix and PostgreSQL epoch
+// Julian dates and then packed with DateADTGetDatum (value -> Datum), not with
+// DatumGetDateADT, which converts in the opposite direction.
+Datum MakePgDateFromUint16(ui16 value) {
+    return DateADTGetDatum(UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE + value);
+}
+
+// Builds a PostgreSQL timestamp Datum from a count of seconds relative to the
+// Unix epoch: the value is shifted by the epoch difference expressed in seconds
+// and scaled by USECS_PER_SEC. The result is packed with TimestampGetDatum
+// (value -> Datum), not with DatumGetTimestamp, which converts in the opposite
+// direction.
+Datum MakePgTimestampFromInt64(i64 value) {
+    return TimestampGetDatum(USECS_PER_SEC * ((UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE) * SECS_PER_DAY + value));
+}
+
+// Builds a converter that turns Arrow arrays of originalType into the block
+// representation of the PostgreSQL type targetType.
+//
+// Supported targets: bool, int2/int4/int8, float4/float8, bytea/varchar/text/
+// cstring, date (only from arrow::uint16()) and timestamp (only from
+// arrow::int64()).
+//
+// Returns an empty converter when the target type is not handled or when
+// originalType can be neither used directly nor cast to the Arrow type expected
+// for the target.
+TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType) {
+    switch (targetType->GetTypeId()) {
+    case BOOLOID: {
+        auto primaryType = arrow::boolean();
+        const bool isSameType = originalType->Equals(*primaryType);
+        if (!isSameType && !arrow::compute::CanCast(*originalType, *primaryType)) {
+            return {};
+        }
+
+        return [primaryType, isSameType](const std::shared_ptr<arrow::Array>& value) {
+            auto res = isSameType ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
+            return PgConvertBool(res);
+        };
+    }
+    case INT2OID: {
+        return BuildPgFixedColumnConverter<i16>(originalType, [](auto value){ return Int16GetDatum(value); });
+    }
+    case INT4OID: {
+        return BuildPgFixedColumnConverter<i32>(originalType, [](auto value){ return Int32GetDatum(value); });
+    }
+    case INT8OID: {
+        return BuildPgFixedColumnConverter<i64>(originalType, [](auto value){ return Int64GetDatum(value); });
+    }
+    case FLOAT4OID: {
+        return BuildPgFixedColumnConverter<float>(originalType, [](auto value){ return Float4GetDatum(value); });
+    }
+    case FLOAT8OID: {
+        return BuildPgFixedColumnConverter<double>(originalType, [](auto value){ return Float8GetDatum(value); });
+    }
+    case BYTEAOID:
+    case VARCHAROID:
+    case TEXTOID:
+    case CSTRINGOID: {
+        // bytea is fed from raw binary, every other string-like target from UTF-8.
+        auto primaryType = (targetType->GetTypeId() == BYTEAOID) ? arrow::binary() : arrow::utf8();
+        // Same acceptance rule as the bool/fixed branches: an identical type
+        // needs no cast, so it must not depend on CanCast.
+        const bool isSameType = originalType->Equals(*primaryType);
+        if (!isSameType && !arrow::compute::CanCast(*originalType, *primaryType)) {
+            return {};
+        }
+
+        return [primaryType, isSameType, isCString = targetType->GetTypeId() == CSTRINGOID](const std::shared_ptr<arrow::Array>& value) {
+            auto res = isSameType ? value : ARROW_RESULT(arrow::compute::Cast(*value, primaryType));
+            if (isCString) {
+                return PgConvertString<true>(res);
+            } else {
+                return PgConvertString<false>(res);
+            }
+        };
+    }
+    case DATEOID: {
+        // Only the uint16 day-count representation is accepted; no cast is attempted.
+        if (originalType->Equals(arrow::uint16())) {
+            return [](const std::shared_ptr<arrow::Array>& value) {
+                return PgConvertFixed<ui16>(value, [](auto days){ return MakePgDateFromUint16(days); });
+            };
+        } else {
+            return {};
+        }
+    }
+    case TIMESTAMPOID: {
+        // Only the int64 representation is accepted; no cast is attempted.
+        if (originalType->Equals(arrow::int64())) {
+            return [](const std::shared_ptr<arrow::Array>& value) {
+                return PgConvertFixed<i64>(value, [](auto seconds){ return MakePgTimestampFromInt64(seconds); });
+            };
+        } else {
+            return {};
+        }
+    }
+    }
+    return {};
+}
+
}
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h
index 72f37124710..aaa8fe082e7 100644
--- a/ydb/library/yql/parser/pg_wrapper/arrow.h
+++ b/ydb/library/yql/parser/pg_wrapper/arrow.h
@@ -286,7 +286,7 @@ struct TGenericExec {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
*res = Dispatch2<true, true>(batch, length, state, builder);
} else {
- NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+ NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length);
*res = Dispatch2<true, true>(batch, length, state, builder);
}
} else {
@@ -294,7 +294,7 @@ struct TGenericExec {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
*res = Dispatch2<true, false>(batch, length, state, builder);
} else {
- NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+ NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length);
*res = Dispatch2<true, false>(batch, length, state, builder);
}
}
@@ -304,7 +304,7 @@ struct TGenericExec {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
*res = Dispatch2<false, true>(batch, length, state, builder);
} else {
- NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+ NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length);
*res = Dispatch2<false, true>(batch, length, state, builder);
}
} else {
@@ -312,7 +312,7 @@ struct TGenericExec {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
*res = Dispatch2<false, false>(batch, length, state, builder);
} else {
- NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+ NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length);
*res = Dispatch2<false, false>(batch, length, state, builder);
}
}
diff --git a/ydb/library/yql/parser/pg_wrapper/interface/arrow.h b/ydb/library/yql/parser/pg_wrapper/interface/arrow.h
index b16329c2ced..2c42f33992e 100644
--- a/ydb/library/yql/parser/pg_wrapper/interface/arrow.h
+++ b/ydb/library/yql/parser/pg_wrapper/interface/arrow.h
@@ -6,6 +6,9 @@ namespace NYql {
arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool);
+// A column converter maps one Arrow array to another Arrow array.
+// An empty (default-constructed) converter means "no conversion available".
+using TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std::shared_ptr<arrow::Array>&)>;
+// Builds a converter from Arrow arrays of originalType to the representation
+// used for the PostgreSQL type targetType. Returns an empty converter when the
+// combination is not supported; callers are expected to check the result.
+TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType);
+
} // NYql
namespace NKikimr {
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index ad9bcdca985..48a24c7091f 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -54,6 +54,7 @@
#include <ydb/library/yql/public/udf/arrow/block_reader.h>
#include <ydb/library/yql/public/udf/arrow/util.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/parser/pg_wrapper/interface/arrow.h>
#include <ydb/library/yql/providers/s3/common/util.h>
#include <ydb/library/yql/providers/s3/compressors/factory.h>
@@ -1087,8 +1088,6 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste
inflightCounter);
}
-using TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std::shared_ptr<arrow::Array>&)>;
-
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
@@ -1114,6 +1113,17 @@ std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::
}
TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType) {
+ if (yqlType->IsPg()) {
+ auto pgType = AS_TYPE(TPgType, yqlType);
+ auto conv = BuildPgColumnConverter(originalType, pgType);
+ if (!conv) {
+ ythrow yexception() << "Arrow type: " << originalType->ToString() <<
+ " of field: " << columnName << " isn't compatible to PG type: " << NPg::LookupType(pgType->GetTypeId()).Name;
+ }
+
+ return conv;
+ }
+
if (originalType->id() == arrow::Type::DATE32) {
// TODO: support more than 1 optional level
bool isOptional = false;
diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
index c29c58c0c8b..d493c1bcaea 100644
--- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
@@ -240,6 +240,12 @@ arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf:
return arrow::Datum();
}
+// Dummy implementation: both arguments are ignored and an empty converter is
+// always returned, i.e. no Arrow -> PG column conversion is available here.
+TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, NKikimr::NMiniKQL::TPgType* targetType) {
+    Y_UNUSED(targetType);
+    Y_UNUSED(originalType);
+    return TColumnConverter();
+}
+
TMaybe<ui32> ConvertToPgType(NKikimr::NUdf::EDataSlot slot) {
Y_UNUSED(slot);
return Nothing();