diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-15 18:11:43 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-15 18:11:43 +0300 |
commit | 269126dcced1cc8b53eb4398b4a33e5142f10290 (patch) | |
tree | 9cd7b8b2124d173ce9cba9c18d62902a3ed341e3 | |
parent | 8e75ef6ede12714180dcf561d8d51988cfb538c1 (diff) | |
download | ydb-269126dcced1cc8b53eb4398b4a33e5142f10290.tar.gz |
Utf8 possibility for JsonDocument type providing
-rw-r--r-- | ydb/core/formats/arrow_helpers.cpp | 35 | ||||
-rw-r--r-- | ydb/core/formats/arrow_helpers.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 9 |
3 files changed, 39 insertions, 15 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 7ff35617a5..fd8a2c8ba8 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -15,6 +15,7 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h> #include <library/cpp/containers/stack_vector/stack_vec.h> +#include <library/cpp/actors/core/log.h> #include <memory> #define Y_VERIFY_OK(status) Y_VERIFY(status.ok(), "%s", status.ToString().c_str()) @@ -1079,14 +1080,16 @@ static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryP return true; } -static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::Array>& column, +static bool ConvertColumn(std::shared_ptr<arrow::Array>& column, std::shared_ptr<arrow::Field>& field, NScheme::TTypeInfo colType) { if (colType.GetTypeId() == NScheme::NTypeIds::Decimal) { - return {}; + return false; } - if (column->type()->id() != arrow::Type::BINARY) { - return {}; + if (colType.GetTypeId() == NScheme::NTypeIds::JsonDocument && (column->type()->id() == arrow::Type::BINARY || column->type()->id() == arrow::Type::STRING)) { + + } else if (column->type()->id() != arrow::Type::BINARY) { + return false; } auto& binaryArray = static_cast<arrow::BinaryArray&>(*column); @@ -1100,7 +1103,7 @@ static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow:: auto value = binaryArray.Value(i); const auto dyNumber = NDyNumber::ParseDyNumberString(TStringBuf(value.data(), value.size())); if (!dyNumber.Defined() || !builder.Append((*dyNumber).data(), (*dyNumber).size()).ok()) { - return {}; + return false; } } } @@ -1109,7 +1112,8 @@ static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow:: auto value = binaryArray.Value(i); const auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(value.data(), value.size())); if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) { - return {}; + ALS_ERROR(0) << "NOT PARSED JSON: " << TStringBuf(value.data(), value.size()); + return false; } } } @@ -1119,26 +1123,33 @@ static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow:: std::shared_ptr<arrow::BinaryArray> result; if (!builder.Finish(&result).ok()) { - return {}; + return false; + } + + column = result; + if (colType.GetTypeId() == NScheme::NTypeIds::JsonDocument && field->type()->id() == arrow::Type::STRING) { + field = std::make_shared<arrow::Field>(field->name(), std::make_shared<arrow::BinaryType>()); } - return result; + + return true; } std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert) { std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns(); + std::vector<std::shared_ptr<arrow::Field>> fields = batch->schema()->fields(); + Y_VERIFY(columns.size() == fields.size()); for (i32 i = 0; i < batch->num_columns(); ++i) { auto& colName = batch->column_name(i); auto it = columnsToConvert.find(TString(colName.data(), colName.size())); if (it != columnsToConvert.end()) { - columns[i] = ConvertColumn(columns[i], it->second); - if (!columns[i]) { + if (!ConvertColumn(columns[i], fields[i], it->second)) { return {}; } } } - return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns); + return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), batch->num_rows(), columns); } static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<arrow::Array>& column, @@ -1154,7 +1165,7 @@ static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr< Y_VERIFY(arrow::bit_width(column->type()->id()) == 32); return std::make_shared<arrow::Date32Array>(column->data()); } - default: + default: return {}; } } diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index cfd6c8c4a1..b3cff21378 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -200,6 +200,16 @@ public: return false; } + static bool NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) { + switch (expectedType.GetTypeId()) { + case NScheme::NTypeIds::JsonDocument: + return typeInRequest.GetTypeId() == NScheme::NTypeIds::Utf8; + default: + break; + } + return false; + } + TArrowToYdbConverter(const TVector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter) : YdbSchema(ydbSchema) , RowWriter(rowWriter) diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 6f508342f7..30acfe2827 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -280,14 +280,17 @@ private: static bool SameDstType(NScheme::TTypeInfo type1, NScheme::TTypeInfo type2, bool allowConvert) { bool res = (type1 == type2); if (!res && allowConvert) { - res = NArrow::GetArrowType(type1)->id() == NArrow::GetArrowType(type2)->id(); + res = (NArrow::GetArrowType(type1)->id() == NArrow::GetArrowType(type2)->id()); } return res; } static bool SameOrConvertableDstType(NScheme::TTypeInfo type1, NScheme::TTypeInfo type2, bool allowConvert) { - bool ok = SameDstType(type1, type2, allowConvert); - return ok || NArrow::TArrowToYdbConverter::NeedInplaceConversion(type1, type2); + bool ok = SameDstType(type1, type2, allowConvert) || NArrow::TArrowToYdbConverter::NeedInplaceConversion(type1, type2); + if (!ok && allowConvert) { + ok = NArrow::TArrowToYdbConverter::NeedConversion(type1, type2); + } + return ok; } bool BuildSchema(const NActors::TActorContext& ctx, TString& errorMessage, bool makeYqbSchema) { |