aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2022-12-29 18:34:38 +0300
committervvvv <vvvv@ydb.tech>2022-12-29 18:34:38 +0300
commit73b247f4a0932b7d9fb693cfcc28965862abb20a (patch)
tree32a64dd51a010930c7781108a284e3edcd842177
parent52fd853619c9f751019975f7ce6a4fe2ce906fae (diff)
downloadydb-73b247f4a0932b7d9fb693cfcc28965862abb20a.tar.gz
nested optionals
-rw-r--r--ydb/library/yql/minikql/arrow/mkql_functions.cpp9
-rw-r--r--ydb/library/yql/minikql/arrow/mkql_functions.h2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp123
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp6
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp51
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp9
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.cpp40
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.h2
-rw-r--r--ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp5
11 files changed, 212 insertions, 42 deletions
diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.cpp b/ydb/library/yql/minikql/arrow/mkql_functions.cpp
index 2f9250f88a..92a553136d 100644
--- a/ydb/library/yql/minikql/arrow/mkql_functions.cpp
+++ b/ydb/library/yql/minikql/arrow/mkql_functions.cpp
@@ -12,10 +12,10 @@
namespace NKikimr::NMiniKQL {
-bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr) {
+bool ConvertInputArrowType(TType* blockType, arrow::ValueDescr& descr) {
auto asBlockType = AS_TYPE(TBlockType, blockType);
descr.shape = asBlockType->GetShape() == TBlockType::EShape::Scalar ? arrow::ValueDescr::SCALAR : arrow::ValueDescr::ARRAY;
- return ConvertArrowType(asBlockType->GetItemType(), isOptional, descr.type);
+ return ConvertArrowType(asBlockType->GetItemType(), descr.type);
}
class TOutputTypeVisitor : public arrow::TypeVisitor
@@ -175,13 +175,12 @@ bool FindArrowFunction(TStringBuf name, const TArrayRef<TType*>& inputTypes, TTy
}
bool HasArrowCast(TType* from, TType* to) {
- bool isOptional;
std::shared_ptr<arrow::DataType> fromArrowType, toArrowType;
- if (!ConvertArrowType(from, isOptional, fromArrowType)) {
+ if (!ConvertArrowType(from, fromArrowType)) {
return false;
}
- if (!ConvertArrowType(to, isOptional, toArrowType)) {
+ if (!ConvertArrowType(to, toArrowType)) {
return false;
}
diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.h b/ydb/library/yql/minikql/arrow/mkql_functions.h
index f33443debe..225822be3f 100644
--- a/ydb/library/yql/minikql/arrow/mkql_functions.h
+++ b/ydb/library/yql/minikql/arrow/mkql_functions.h
@@ -8,6 +8,6 @@ namespace NKikimr::NMiniKQL {
class IBuiltinFunctionRegistry;
bool FindArrowFunction(TStringBuf name, const TArrayRef<TType*>& inputTypes, TType* outputType, const IBuiltinFunctionRegistry& registry);
-bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr);
+bool ConvertInputArrowType(TType* blockType, arrow::ValueDescr& descr);
bool HasArrowCast(TType* from, TType* to);
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
index 9d93077092..8cce82cc5d 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
@@ -878,10 +878,10 @@ std::unique_ptr<typename TTag::TPreparedAggregator> PrepareAvgOverInput(TTupleTy
TVector<TType*> tupleElements = { doubleType, ui64Type };
auto avgRetType = TTupleType::Create(2, tupleElements.data(), env);
std::shared_ptr<arrow::DataType> builderDataType;
- bool isOptional;
- MKQL_ENSURE(ConvertArrowType(avgRetType, isOptional, builderDataType), "Unsupported builder type");
+ MKQL_ENSURE(ConvertArrowType(avgRetType, builderDataType), "Unsupported builder type");
auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn))->GetItemType();
+ bool isOptional;
auto dataType = UnpackOptionalData(argType, isOptional);
switch (*dataType->GetDataSlot()) {
case NUdf::EDataSlot::Int8:
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
index 3fcb0f1523..ef6914c379 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
@@ -19,7 +19,7 @@ namespace {
bool AlwaysUseChunks(const TType* type) {
if (type->IsOptional()) {
- type = AS_TYPE(TOptionalType, type)->GetItemType();
+ return AlwaysUseChunks(AS_TYPE(TOptionalType, type)->GetItemType());
}
if (type->IsTuple()) {
@@ -42,8 +42,7 @@ bool AlwaysUseChunks(const TType* type) {
std::shared_ptr<arrow::DataType> GetArrowType(TType* type) {
std::shared_ptr<arrow::DataType> result;
- bool isOptional;
- Y_VERIFY(ConvertArrowType(type, isOptional, result));
+ Y_VERIFY(ConvertArrowType(type, result));
return result;
}
@@ -629,6 +628,79 @@ private:
std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
};
+class TExternalOptionalBlockBuilder : public TBlockBuilderBase {
+public:
+ TExternalOptionalBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen, std::unique_ptr<TBlockBuilderBase>&& inner)
+ : TBlockBuilderBase(type, pool, maxLen)
+ , Inner(std::move(inner))
+ {
+ Reserve();
+ }
+
+ void DoAdd(NUdf::TUnboxedValuePod value) final {
+ if (!value) {
+ NullBuilder->UnsafeAppend(0);
+ Inner->AddDefault();
+ return;
+ }
+
+ NullBuilder->UnsafeAppend(1);
+ Inner->Add(value.GetOptionalValue());
+ }
+
+ void DoAddDefault() final {
+ NullBuilder->UnsafeAppend(1);
+ Inner->AddDefault();
+ }
+
+ void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
+ Y_VERIFY(!array.buffers.empty());
+ Y_VERIFY(array.child_data.size() == 1);
+
+ if (array.buffers.front()) {
+ ui8* dstBitmap = NullBuilder->End();
+ CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
+ NullBuilder->UnsafeAdvance(popCount);
+ } else {
+ NullBuilder->UnsafeAppend(popCount, 1);
+ }
+
+ Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
+ }
+
+ TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
+ TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
+
+ std::shared_ptr<arrow::Buffer> nullBitmap;
+ const size_t length = GetCurrLen();
+ MKQL_ENSURE(length == NullBuilder->Length(), "Unexpected NullBuilder length");
+ nullBitmap = NullBuilder->Finish();
+ nullBitmap = MakeDenseBitmap(nullBitmap->data(), length, Pool);
+
+ Y_VERIFY(length);
+ result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), length, { nullBitmap }));
+ result->Children.emplace_back(Inner->BuildTree(finish));
+
+ if (!finish) {
+ Reserve();
+ }
+
+ return result;
+ }
+
+private:
+ void Reserve() {
+ NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
+ NullBuilder->Reserve(MaxLen + 1);
+ }
+
+private:
+ std::unique_ptr<TBlockBuilderBase> Inner;
+ std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
+};
+
+std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength);
+
template<bool Nullable>
std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderImpl(TType* type, arrow::MemoryPool& pool, size_t maxLen) {
if constexpr (Nullable) {
@@ -640,9 +712,7 @@ std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderImpl(TType* type, arrow::Memo
TVector<std::unique_ptr<TBlockBuilderBase>> children;
for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
TType* childType = tupleType->GetElementType(i);
- auto childBuilder = childType->IsOptional() ?
- MakeBlockBuilderImpl<true>(childType, pool, maxLen) :
- MakeBlockBuilderImpl<false>(childType, pool, maxLen);
+ auto childBuilder = MakeBlockBuilderBase(childType, pool, maxLen);
children.push_back(std::move(childBuilder));
}
@@ -689,12 +759,46 @@ std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderImpl(TType* type, arrow::Memo
MKQL_ENSURE(false, "Unsupported type");
}
+std::unique_ptr<TBlockBuilderBase> MakeBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) {
+ TType* unpacked = type;
+ if (type->IsOptional()) {
+ unpacked = AS_TYPE(TOptionalType, type)->GetItemType();
+ }
+
+ if (unpacked->IsOptional()) {
+ // at least 2 levels of optionals
+ ui32 nestLevel = 0;
+ auto currentType = type;
+ auto previousType = type;
+ TVector<TType*> types;
+ do {
+ ++nestLevel;
+ types.push_back(currentType);
+ previousType = currentType;
+ currentType = AS_TYPE(TOptionalType, currentType)->GetItemType();
+ } while (currentType->IsOptional());
+
+ auto builder = MakeBlockBuilderBase(previousType, pool, maxBlockLength);
+ for (ui32 i = 1; i < nestLevel; ++i) {
+ builder = std::make_unique<TExternalOptionalBlockBuilder>(types[nestLevel - 1 - i], pool, maxBlockLength, std::move(builder));
+ }
+
+ return builder;
+ } else {
+ if (type->IsOptional()) {
+ return MakeBlockBuilderImpl<true>(type, pool, maxBlockLength);
+ } else {
+ return MakeBlockBuilderImpl<false>(type, pool, maxBlockLength);
+ }
+ }
+}
+
} // namespace
size_t CalcMaxBlockItemSize(const TType* type) {
// we do not count block bitmap size
if (type->IsOptional()) {
- type = AS_TYPE(TOptionalType, type)->GetItemType();
+ return CalcMaxBlockItemSize(AS_TYPE(TOptionalType, type)->GetItemType());
}
if (type->IsTuple()) {
@@ -743,10 +847,7 @@ size_t CalcMaxBlockItemSize(const TType* type) {
}
std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxBlockLength) {
- if (type->IsOptional()) {
- return MakeBlockBuilderImpl<true>(type, pool, maxBlockLength);
- }
- return MakeBlockBuilderImpl<false>(type, pool, maxBlockLength);
+ return MakeBlockBuilderBase(type, pool, maxBlockLength);
}
} // namespace NMiniKQL
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
index 7d13b9e828..d8b4aa8b43 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
@@ -22,9 +22,8 @@ namespace NMiniKQL {
namespace {
arrow::ValueDescr ToValueDescr(TType* type) {
- bool isOptional;
arrow::ValueDescr ret;
- MKQL_ENSURE(ConvertInputArrowType(type, isOptional, ret), "can't get arrow type");
+ MKQL_ENSURE(ConvertInputArrowType(type, ret), "can't get arrow type");
return ret;
}
@@ -197,9 +196,8 @@ private:
}
static const arrow::compute::Function& ResolveFunction(TType* to) {
- bool isOptional;
std::shared_ptr<arrow::DataType> type;
- MKQL_ENSURE(ConvertArrowType(to, isOptional, type), "can't get arrow type");
+ MKQL_ENSURE(ConvertArrowType(to, type), "can't get arrow type");
auto function = ARROW_RESULT(arrow::compute::GetCastFunction(type));
MKQL_ENSURE(function != nullptr, "missing function");
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
index a39c38d2cd..5c6e4585f0 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
@@ -150,9 +150,58 @@ private:
TPlainContainerCache Cache;
};
+class TExternalOptionalBlockReader : public TBlockReaderBase {
+public:
+ TExternalOptionalBlockReader(std::unique_ptr<TBlockReaderBase>&& inner)
+ : Inner(std::move(inner))
+ {}
+
+ NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
+ if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ return {};
+ }
+
+ return Inner->Get(*data.child_data[0], index).MakeOptional();
+ }
+
+ NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ if (!scalar.is_valid) {
+ return {};
+ }
+
+ const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
+ return Inner->GetScalar(*structScalar.value[0]).MakeOptional();
+ }
+
+private:
+ std::unique_ptr<TBlockReaderBase> Inner;
+};
+
std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolderFactory& holderFactory) {
+ TType* unpacked = type;
if (type->IsOptional()) {
- type = AS_TYPE(TOptionalType, type)->GetItemType();
+ unpacked = AS_TYPE(TOptionalType, type)->GetItemType();
+ }
+
+ if (unpacked->IsOptional()) {
+ // at least 2 levels of optionals
+ ui32 nestLevel = 0;
+ auto currentType = type;
+ auto previousType = type;
+ do {
+ ++nestLevel;
+ previousType = currentType;
+ currentType = AS_TYPE(TOptionalType, currentType)->GetItemType();
+ } while (currentType->IsOptional());
+
+ std::unique_ptr<TBlockReaderBase> reader = MakeBlockReaderBase(previousType, holderFactory);
+ for (ui32 i = 1; i < nestLevel; ++i) {
+ reader = std::make_unique<TExternalOptionalBlockReader>(std::move(reader));
+ }
+
+ return reader;
+ } else {
+ type = unpacked;
}
if (type->IsTuple()) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 0ced6726b6..55dc51100f 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -321,9 +321,8 @@ public:
, Arg_(arg)
, Type_(type)
{
- bool isOptional;
std::shared_ptr<arrow::DataType> arrowType;
- MKQL_ENSURE(ConvertArrowType(Type_, isOptional, arrowType), "Unsupported type of scalar");
+ MKQL_ENSURE(ConvertArrowType(Type_, arrowType), "Unsupported type of scalar");
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
@@ -339,9 +338,8 @@ private:
arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, TComputationContext& ctx) const {
if (!value) {
- bool isOptional;
std::shared_ptr<arrow::DataType> arrowType;
- MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type of scalar");
+ MKQL_ENSURE(ConvertArrowType(type, arrowType), "Unsupported type of scalar");
return arrow::MakeNullScalar(arrowType);
}
@@ -351,9 +349,8 @@ private:
if (type->IsTuple()) {
auto tupleType = AS_TYPE(TTupleType, type);
- bool isOptional;
std::shared_ptr<arrow::DataType> arrowType;
- MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type of scalar");
+ MKQL_ENSURE(ConvertArrowType(type, arrowType), "Unsupported type of scalar");
std::vector<std::shared_ptr<arrow::Scalar>> arrowValue;
for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp
index dba908a706..badbe461ea 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_type_builder.cpp
@@ -1361,19 +1361,48 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty
}
}
-bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type) {
+bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type) {
+ bool isOptional;
auto unpacked = UnpackOptional(itemType, isOptional);
+ if (unpacked->IsOptional()) {
+ // at least 2 levels of optionals
+ ui32 nestLevel = 0;
+ auto currentType = itemType;
+ auto previousType = itemType;
+ do {
+ ++nestLevel;
+ previousType = currentType;
+ currentType = AS_TYPE(TOptionalType, currentType)->GetItemType();
+ } while (currentType->IsOptional());
+
+ // previousType is always Optional
+ std::shared_ptr<arrow::DataType> innerArrowType;
+ if (!ConvertArrowType(previousType, innerArrowType)) {
+ return false;
+ }
+
+ for (ui32 i = 1; i < nestLevel; ++i) {
+ // wrap as one nullable field in struct
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ fields.emplace_back(std::make_shared<arrow::Field>("opt", innerArrowType, true));
+ innerArrowType = std::make_shared<arrow::StructType>(fields);
+ }
+
+ type = innerArrowType;
+ return true;
+ }
+
if (unpacked->IsTuple()) {
auto tupleType = AS_TYPE(TTupleType, unpacked);
std::vector<std::shared_ptr<arrow::Field>> fields;
for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
std::shared_ptr<arrow::DataType> childType;
- bool isChildOptional;
- if (!ConvertArrowType(tupleType->GetElementType(i), isChildOptional, childType)) {
+ auto elementType = tupleType->GetElementType(i);
+ if (!ConvertArrowType(elementType, childType)) {
return false;
}
- fields.emplace_back(std::make_shared<arrow::Field>("field" + ToString(i), childType, isChildOptional));
+ fields.emplace_back(std::make_shared<arrow::Field>("field" + ToString(i), childType, elementType->IsOptional()));
}
type = std::make_shared<arrow::StructType>(fields);
@@ -1805,9 +1834,8 @@ const NYql::NUdf::TPgTypeDescription* TTypeInfoHelper::FindPgTypeDescription(ui3
}
NUdf::IArrowType::TPtr TTypeInfoHelper::MakeArrowType(const NUdf::TType* type) const {
- bool isOptional;
std::shared_ptr<arrow::DataType> arrowType;
- if (!ConvertArrowType(const_cast<TType*>(static_cast<const TType*>(type)), isOptional, arrowType)) {
+ if (!ConvertArrowType(const_cast<TType*>(static_cast<const TType*>(type)), arrowType)) {
return nullptr;
}
diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h
index 7e5e95bcba..af6ba7e315 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.h
+++ b/ydb/library/yql/minikql/mkql_type_builder.h
@@ -10,7 +10,7 @@
namespace NKikimr {
namespace NMiniKQL {
-bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type);
+bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type);
bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type);
class TArrowType : public NUdf::IArrowType {
diff --git a/ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp b/ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp
index 418acc486d..4341422d2b 100644
--- a/ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp
+++ b/ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp
@@ -65,9 +65,8 @@ private:
for (const auto& type : types) {
TNullOutput null;
auto mkqlType = NCommon::BuildType(*type, pgmBuilder, null);
- bool isOptional;
std::shared_ptr<arrow::DataType> arrowType;
- if (!ConvertArrowType(mkqlType, isOptional, arrowType)) {
+ if (!ConvertArrowType(mkqlType, arrowType)) {
return EStatus::NOT_FOUND;
}
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 9db30c0043..532667ab41 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1486,11 +1486,10 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
}
auto memberType = extraStructType->GetMemberType(i);
- bool isOptional;
std::shared_ptr<arrow::DataType> dataType;
- YQL_ENSURE(ConvertArrowType(memberType, isOptional, dataType), "Unsupported arrow type");
- THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(extraStructType->GetMemberName(i)), dataType, isOptional)));
+ YQL_ENSURE(ConvertArrowType(memberType, dataType), "Unsupported arrow type");
+ THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(extraStructType->GetMemberName(i)), dataType, memberType->IsOptional())));
readSpec->ColumnReorder.push_back(i);
}