about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: ivanmorozov <ivanmorozov@yandex-team.com> 2023-03-15 18:11:43 +0300
committer: ivanmorozov <ivanmorozov@yandex-team.com> 2023-03-15 18:11:43 +0300
commit: 269126dcced1cc8b53eb4398b4a33e5142f10290 (patch)
tree: 9cd7b8b2124d173ce9cba9c18d62902a3ed341e3
parent: 8e75ef6ede12714180dcf561d8d51988cfb538c1 (diff)
download: ydb-269126dcced1cc8b53eb4398b4a33e5142f10290.tar.gz
Utf8 possibility for JsonDocument type providing
-rw-r--r-- ydb/core/formats/arrow_helpers.cpp | 35
-rw-r--r-- ydb/core/formats/arrow_helpers.h | 10
-rw-r--r-- ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 9
3 files changed, 39 insertions, 15 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 7ff35617a5..fd8a2c8ba8 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -15,6 +15,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
+#include <library/cpp/actors/core/log.h>
#include <memory>
#define Y_VERIFY_OK(status) Y_VERIFY(status.ok(), "%s", status.ToString().c_str())
@@ -1079,14 +1080,16 @@ static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryP
return true;
}
-static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::Array>& column,
+static bool ConvertColumn(std::shared_ptr<arrow::Array>& column, std::shared_ptr<arrow::Field>& field,
NScheme::TTypeInfo colType) {
if (colType.GetTypeId() == NScheme::NTypeIds::Decimal) {
- return {};
+ return false;
}
- if (column->type()->id() != arrow::Type::BINARY) {
- return {};
+ if (colType.GetTypeId() == NScheme::NTypeIds::JsonDocument && (column->type()->id() == arrow::Type::BINARY || column->type()->id() == arrow::Type::STRING)) {
+
+ } else if (column->type()->id() != arrow::Type::BINARY) {
+ return false;
}
auto& binaryArray = static_cast<arrow::BinaryArray&>(*column);
@@ -1100,7 +1103,7 @@ static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::
auto value = binaryArray.Value(i);
const auto dyNumber = NDyNumber::ParseDyNumberString(TStringBuf(value.data(), value.size()));
if (!dyNumber.Defined() || !builder.Append((*dyNumber).data(), (*dyNumber).size()).ok()) {
- return {};
+ return false;
}
}
}
@@ -1109,7 +1112,8 @@ static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::
auto value = binaryArray.Value(i);
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(value.data(), value.size()));
if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) {
- return {};
+ ALS_ERROR(0) << "NOT PARSED JSON: " << TStringBuf(value.data(), value.size());
+ return false;
}
}
}
@@ -1119,26 +1123,33 @@ static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::
std::shared_ptr<arrow::BinaryArray> result;
if (!builder.Finish(&result).ok()) {
- return {};
+ return false;
+ }
+
+ column = result;
+ if (colType.GetTypeId() == NScheme::NTypeIds::JsonDocument && field->type()->id() == arrow::Type::STRING) {
+ field = std::make_shared<arrow::Field>(field->name(), std::make_shared<arrow::BinaryType>());
}
- return result;
+
+ return true;
}
std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert)
{
std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns();
+ std::vector<std::shared_ptr<arrow::Field>> fields = batch->schema()->fields();
+ Y_VERIFY(columns.size() == fields.size());
for (i32 i = 0; i < batch->num_columns(); ++i) {
auto& colName = batch->column_name(i);
auto it = columnsToConvert.find(TString(colName.data(), colName.size()));
if (it != columnsToConvert.end()) {
- columns[i] = ConvertColumn(columns[i], it->second);
- if (!columns[i]) {
+ if (!ConvertColumn(columns[i], fields[i], it->second)) {
return {};
}
}
}
- return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns);
+ return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), batch->num_rows(), columns);
}
static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<arrow::Array>& column,
@@ -1154,7 +1165,7 @@ static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<
Y_VERIFY(arrow::bit_width(column->type()->id()) == 32);
return std::make_shared<arrow::Date32Array>(column->data());
}
- default:
+ default:
return {};
}
}
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index cfd6c8c4a1..b3cff21378 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -200,6 +200,16 @@ public:
return false;
}
+ static bool NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) {
+ switch (expectedType.GetTypeId()) {
+ case NScheme::NTypeIds::JsonDocument:
+ return typeInRequest.GetTypeId() == NScheme::NTypeIds::Utf8;
+ default:
+ break;
+ }
+ return false;
+ }
+
TArrowToYdbConverter(const TVector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter)
: YdbSchema(ydbSchema)
, RowWriter(rowWriter)
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 6f508342f7..30acfe2827 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -280,14 +280,17 @@ private:
static bool SameDstType(NScheme::TTypeInfo type1, NScheme::TTypeInfo type2, bool allowConvert) {
bool res = (type1 == type2);
if (!res && allowConvert) {
- res = NArrow::GetArrowType(type1)->id() == NArrow::GetArrowType(type2)->id();
+ res = (NArrow::GetArrowType(type1)->id() == NArrow::GetArrowType(type2)->id());
}
return res;
}
static bool SameOrConvertableDstType(NScheme::TTypeInfo type1, NScheme::TTypeInfo type2, bool allowConvert) {
- bool ok = SameDstType(type1, type2, allowConvert);
- return ok || NArrow::TArrowToYdbConverter::NeedInplaceConversion(type1, type2);
+ bool ok = SameDstType(type1, type2, allowConvert) || NArrow::TArrowToYdbConverter::NeedInplaceConversion(type1, type2);
+ if (!ok && allowConvert) {
+ ok = NArrow::TArrowToYdbConverter::NeedConversion(type1, type2);
+ }
+ return ok;
}
bool BuildSchema(const NActors::TActorContext& ctx, TString& errorMessage, bool makeYqbSchema) {