author     vvvv <vvvv@ydb.tech>    2022-11-05 16:17:38 +0300
committer  vvvv <vvvv@ydb.tech>    2022-11-05 16:17:38 +0300
commit     ed11e26937c18a86271556c8a515d2c6b22195d5 (patch)
tree       913db846997f5490c62c420974f2e6dc04fe09fe
parent     83e1a68166c7ef9f594922b976761844581124f4 (diff)
use native arrow parquet reader without CH conversions
A query of the form
%%
pragma UseBlocks;
SELECT
count(*)
-- EventDate
-- RemoteIP
FROM
`yq-clickbench-local`.`hits_*.parquet`
WITH
(
format=parquet,
SCHEMA
(
RemoteIP INTEGER NOT NULL,
CounterID INTEGER NOT NULL,
--EventDate Date NOT NULL,
--EventTime Timestamp NOT NULL
)
)
limit 100
;
%%
For the regular mode with the 100 ClickBench files (14 GB):
%%
Execution precomputes complete, duration: 5.962375s
%%
For UseBlocks:
%%
Execution precomputes complete, duration: 0.306933s
%%
For UseBlocks, but reading from S3:
%%
Execution precomputes complete, duration: 97.178274s
%%
Without UseBlocks, but reading from S3:
%%
Execution precomputes complete, duration: 111.921679s
%%
As a bonus, this removes the workaround of having to read at least one column: the parquet reader can read 0 columns and still report the number of rows.
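For illustration only (not code from this commit): a minimal sketch of obtaining the row count of a parquet file without decoding any column data, using the file metadata exposed by the Arrow parquet API; the file name is a placeholder.
%%
// Sketch, assuming a local parquet file; no column data is read.
#include <parquet/file_reader.h>
#include <iostream>

int main() {
    // OpenFile returns a std::unique_ptr<parquet::ParquetFileReader>.
    auto reader = parquet::ParquetFileReader::OpenFile("hits_0.parquet");
    // num_rows() comes straight from the footer metadata.
    std::cout << reader->metadata()->num_rows() << std::endl;
    return 0;
}
%%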
17 files changed, 518 insertions, 133 deletions
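The heart of the change is the new TArrowParquetBatchReader in yql_s3_read_actor.cpp, which walks parquet row groups and yields Arrow record batches without going through ClickHouse columns. Below is a condensed sketch of that loop, assuming the input is already available as an arrow::io::RandomAccessFile (the actor obtains one from its S3/local stream via NDB::asArrowFile); error handling and downstream delivery are simplified.
%%
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <stdexcept>

// Condensed from the diff below; not a drop-in replacement for the actor code.
void ReadAllBatches(const std::shared_ptr<arrow::io::RandomAccessFile>& file,
                    const std::vector<int>& columnIndices) {
    auto check = [](const arrow::Status& s) {
        if (!s.ok()) {
            throw std::runtime_error(s.ToString());
        }
    };

    std::unique_ptr<parquet::arrow::FileReader> fileReader;
    check(parquet::arrow::OpenFile(file, arrow::default_memory_pool(), &fileReader));

    for (int group = 0; group < fileReader->num_row_groups(); ++group) {
        // Materialize one row group (only the requested columns) as an arrow::Table.
        std::shared_ptr<arrow::Table> table;
        check(fileReader->ReadRowGroup(group, columnIndices, &table));

        // Slice the table into RecordBatches.
        arrow::TableBatchReader batchReader(*table);
        std::shared_ptr<arrow::RecordBatch> batch;
        while (true) {
            check(batchReader.ReadNext(&batch));
            if (!batch) {
                break;
            }
            // Hand the batch downstream (the read actor sends TEvNextRecordBatch here).
        }
    }
}
%%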
diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp index 4de60bd7025..4965894e565 100644 --- a/ydb/library/yql/providers/common/mkql/parser.cpp +++ b/ydb/library/yql/providers/common/mkql/parser.cpp @@ -146,12 +146,31 @@ TRuntimeNode BuildParseCall( TType* inputType, TType* parseItemType, TType* finalItemType, - NCommon::TMkqlBuildContext& ctx) + NCommon::TMkqlBuildContext& ctx, + bool useBlocks) { const auto* inputItemType = static_cast<TStreamType*>(inputType)->GetItemType(); const auto* parseItemStructType = static_cast<TStructType*>(parseItemType); const auto* finalItemStructType = static_cast<TStructType*>(finalItemType); + if (useBlocks) { + return ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) { + MKQL_ENSURE(!extraColumnsByPathIndex && metadataColumns.empty(), "TODO"); + + auto structObj = ctx.ProgramBuilder.Nth(item, 0); + auto length = ctx.ProgramBuilder.Nth(item, 1); + TRuntimeNode::TList fields; + + for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) { + TStringBuf name = finalItemStructType->GetMemberName(i); + fields.push_back(ctx.ProgramBuilder.Member(structObj, name)); + } + + fields.push_back(length); + return fields; + }); + } + if (!compression.empty()) { input = WrapWithDecompress(input, inputItemType, compression, ctx); } @@ -301,7 +320,7 @@ TRuntimeNode BuildParseCall( ); } -TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWideWrap& wrapper, NCommon::TMkqlBuildContext& ctx) { +TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWrapBase& wrapper, NCommon::TMkqlBuildContext& ctx, bool useBlocks) { const auto& format = GetFormat(wrapper.Settings().Cast().Ref()); if (!format.Content()) { return TMaybe<TRuntimeNode>(); @@ -358,7 +377,8 @@ TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWideWrap& wrapper, NCommon inputType, parseItemType, finalItemType, - ctx); + ctx, + useBlocks); } } diff --git a/ydb/library/yql/providers/common/mkql/parser.h b/ydb/library/yql/providers/common/mkql/parser.h index 5ddaaa31417..30d023b2a0a 100644 --- a/ydb/library/yql/providers/common/mkql/parser.h +++ b/ydb/library/yql/providers/common/mkql/parser.h @@ -23,8 +23,9 @@ NKikimr::NMiniKQL::TRuntimeNode BuildParseCall( NKikimr::NMiniKQL::TType* inputType, NKikimr::NMiniKQL::TType* parseItemType, NKikimr::NMiniKQL::TType* finalItemType, - NCommon::TMkqlBuildContext& ctx); + NCommon::TMkqlBuildContext& ctx, + bool useBlocks = false); -TMaybe<NKikimr::NMiniKQL::TRuntimeNode> TryWrapWithParser(const NYql::NNodes::TDqSourceWideWrap& wrapper, NCommon::TMkqlBuildContext& ctx); +TMaybe<NKikimr::NMiniKQL::TRuntimeNode> TryWrapWithParser(const NYql::NNodes::TDqSourceWrapBase& wrapper, NCommon::TMkqlBuildContext& ctx, bool useBlocks = false); } diff --git a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json index 566f8d05032..5d7d541b923 100644 --- a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json +++ b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json @@ -56,6 +56,11 @@ "Name": "TDqSourceWideWrap", "Base": "TDqSourceWrapBase", "Match": {"Type": "Callable", "Name": "DqSourceWideWrap"} + }, + { + "Name": "TDqSourceWideBlockWrap", + "Base": "TDqSourceWrapBase", + "Match": {"Type": "Callable", "Name": "DqSourceWideBlockWrap"} } ] } diff --git a/ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp b/ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp index 77820d1d5ed..2bb15a52fd2 
100644 --- a/ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp +++ b/ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp @@ -10,7 +10,7 @@ using namespace NKikimr::NMiniKQL; using namespace NNodes; void RegisterDqsMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TTypeAnnotationContext& ctx) { - compiler.AddCallable({TDqSourceWideWrap::CallableName(), TDqReadWideWrap::CallableName()}, + compiler.AddCallable({TDqSourceWideWrap::CallableName(), TDqSourceWideBlockWrap::CallableName(), TDqReadWideWrap::CallableName()}, [](const TExprNode& node, NCommon::TMkqlBuildContext&) { YQL_ENSURE(false, "Unsupported reader: " << node.Head().Content()); return TRuntimeNode(); diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 6ccb1e42a75..ed0761fece0 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -17,9 +17,10 @@ using namespace NYql::NNodes; class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { public: - TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) + TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks) : TOptimizeTransformerBase(typeCtx, NLog::EComponent::ProviderDq, {}) , Config(config) + , UseBlocks(useBlocks) { const bool enablePrecompute = Config->_EnablePrecompute.Get().GetOrElse(false); @@ -106,62 +107,65 @@ protected: .Build().Done(); } const auto& items = GetSeqItemType(wrap.Ref().GetTypeAnn())->Cast<TStructExprType>()->GetItems(); - auto narrow = wrap.Settings() ? - ctx.Builder(node.Pos()) - .Lambda() - .Param("source") - .Callable("NarrowMap") - .Callable(0, TDqSourceWideWrap::CallableName()) - .Arg(0, "source") - .Add(1, wrap.DataSource().Ptr()) - .Add(2, wrap.RowType().Ptr()) - .Add(3, wrap.Settings().Cast().Ptr()) - .Seal() - .Lambda(1) - .Params("fields", items.size()) - .Callable(TCoAsStruct::CallableName()) - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - ui32 i = 0U; - for (const auto& item : items) { - parent.List(i) - .Atom(0, item->GetName()) - .Arg(1, "fields", i) - .Seal(); - ++i; - } - return parent; - }) - .Seal() - .Seal() + auto sourceArg = ctx.NewArgument(node.Pos(), "source"); + bool supportsBlocks = false; + auto inputType = GetSeqItemType(wrap.Input().Ref().GetTypeAnn()); + while (inputType->GetKind() == ETypeAnnotationKind::Tuple) { + auto tupleType = inputType->Cast<TTupleExprType>(); + if (tupleType->GetSize() == 2 && tupleType->GetItems()[1]->GetKind() == ETypeAnnotationKind::Scalar) { + supportsBlocks = true; + break; + } + + inputType = tupleType->GetItems()[0]; + } + + auto wideWrap = ctx.Builder(node.Pos()) + .Callable(UseBlocks && supportsBlocks ? 
TDqSourceWideBlockWrap::CallableName() : TDqSourceWideWrap::CallableName()) + .Add(0, sourceArg) + .Add(1, wrap.DataSource().Ptr()) + .Add(2, wrap.RowType().Ptr()) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + if (wrap.Settings()) { + parent.Add(3, wrap.Settings().Cast().Ptr()); + } + + return parent; + }) + .Seal() + .Build(); + + if (UseBlocks && supportsBlocks) { + wideWrap = ctx.Builder(node.Pos()) + .Callable("WideFromBlocks") + .Add(0, wideWrap) .Seal() - .Seal().Build(): - ctx.Builder(node.Pos()) - .Lambda() - .Param("source") - .Callable("NarrowMap") - .Callable(0, TDqSourceWideWrap::CallableName()) - .Arg(0, "source") - .Add(1, wrap.DataSource().Ptr()) - .Add(2, wrap.RowType().Ptr()) - .Seal() - .Lambda(1) - .Params("fields", items.size()) - .Callable(TCoAsStruct::CallableName()) - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - ui32 i = 0U; - for (const auto& item : items) { - parent.List(i) - .Atom(0, item->GetName()) - .Arg(1, "fields", i) - .Seal(); - ++i; - } - return parent; - }) - .Seal() + .Build(); + } + + auto narrow = ctx.Builder(node.Pos()) + .Callable("NarrowMap") + .Add(0, wideWrap) + .Lambda(1) + .Params("fields", items.size()) + .Callable(TCoAsStruct::CallableName()) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 i = 0U; + for (const auto& item : items) { + parent.List(i) + .Atom(0, item->GetName()) + .Arg(1, "fields", i) + .Seal(); + ++i; + } + return parent; + }) .Seal() .Seal() - .Seal().Build(); + .Seal() + .Build(); + + auto program = ctx.NewLambda(node.Pos(), ctx.NewArguments(node.Pos(), { sourceArg }), std::move(narrow)); return Build<TDqCnUnionAll>(ctx, node.Pos()) .Output() @@ -172,7 +176,7 @@ protected: .Settings(wrap.Input()) .Build() .Build() - .Program(narrow) + .Program(program) .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) .Build() .Index().Build("0") @@ -353,10 +357,11 @@ protected: private: TDqConfiguration::TPtr Config; + const bool UseBlocks; }; -THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) { - return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config)); +THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks) { + return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config, useBlocks)); } } // NYql::NDqs diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.h b/ydb/library/yql/providers/dq/opt/physical_optimize.h index 5ed9da98807..ab20f334966 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.h +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.h @@ -8,6 +8,6 @@ namespace NYql::NDqs { -THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config); +THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 6ee4e054cbe..6e5b852c0ce 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -810,6 +810,7 @@ private: YQL_CLOG(DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback"; auto duration = TInstant::Now() - startTime; + YQL_CLOG(INFO, 
ProviderDq) << "Execution Result complete, duration: " << duration; if (state->Metrics) { state->Metrics->SetCounter("dq", "TotalExecutionTime", duration.MilliSeconds()); state->Metrics->SetCounter( @@ -1164,6 +1165,7 @@ private: // TODO: remove copy-paste return WrapFutureCallback<false>(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { auto duration = TInstant::Now() - startTime; + YQL_CLOG(INFO, ProviderDq) << "Execution Pull complete, duration: " << duration; if (state->Metrics) { state->Metrics->SetCounter("dq", "TotalExecutionTime", duration.MilliSeconds()); state->Metrics->SetCounter( @@ -1613,6 +1615,7 @@ private: MarkProgressFinished(publicIds->AllPublicIds, res.Success(), state->ProgressWriter); auto duration = TInstant::Now() - startTime; + YQL_CLOG(INFO, ProviderDq) << "Execution precomputes complete, duration: " << duration; if (state->Metrics) { state->Metrics->SetCounter("dq", "PrecomputeExecutionTime", duration.MilliSeconds()); } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp index b136bf2ab18..f8fdd0b63e1 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp @@ -28,7 +28,7 @@ public: TDqDataProviderSink(const TDqStatePtr& state) : State(state) , LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); }) - , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings); }) + , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings, state->TypeCtx->UseBlocks ); }) , PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); }) , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer( diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp index 6e03017ccb1..1fa68c32beb 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp @@ -18,8 +18,9 @@ public: TDqsDataSourceTypeAnnotationTransformer() : TVisitorTransformerBase(true) { - AddHandler({TDqSourceWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<false>)); - AddHandler({TDqSourceWideWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<true>)); + AddHandler({TDqSourceWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<false, false>)); + AddHandler({TDqSourceWideWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<true, false>)); + AddHandler({TDqSourceWideBlockWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<true, true>)); AddHandler({TDqReadWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleReadWrap)); AddHandler({TDqReadWideWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleWideReadWrap)); AddHandler({TCoConfigure::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleConfig)); @@ -58,7 +59,7 @@ private: return TStatus::Ok; } - template<bool Wide> + template<bool Wide, 
bool Blocks> TStatus HandleSourceWrap(const TExprNode::TPtr& input, TExprContext& ctx) { if (!EnsureMinMaxArgsCount(*input, 3U, 4U, ctx)) { return TStatus::Error; @@ -91,6 +92,14 @@ private: TTypeAnnotationNode::TListType types; types.reserve(items.size()); std::transform(items.cbegin(), items.cend(), std::back_inserter(types), std::bind(&TItemExprType::GetItemType, std::placeholders::_1)); + if constexpr (Blocks) { + for (auto& type : types) { + type = ctx.MakeType<TBlockExprType>(type); + } + + types.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))); + } + input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TMultiExprType>(types))); } else { input->SetTypeAnn(ctx.MakeType<TListExprType>(input->Child(TDqSourceWrapBase::idx_RowType)->GetTypeAnn()->Cast<TTypeExprType>()->GetType())); diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.txt index 4c579198440..3c4ff17e5e5 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.txt @@ -10,6 +10,7 @@ add_library(providers-s3-actors) target_compile_options(providers-s3-actors PRIVATE -DARCADIA_BUILD + -DUSE_PARQUET -DUSE_CURRENT_UDF_ABI_VERSION ) target_include_directories(providers-s3-actors PRIVATE diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index d5a6759d491..1da4a3809c1 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -20,6 +20,14 @@ #include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatFactory.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowBufferedStreams.h> + +#include <arrow/api.h> +#include <arrow/io/api.h> +#include <arrow/status.h> +#include <parquet/arrow/reader.h> +#include <parquet/file_reader.h> + #endif #include "yql_s3_read_actor.h" @@ -85,6 +93,13 @@ #define LOG_CORO_T(name, stream) \ LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". 
" << stream) +#define THROW_ARROW_NOT_OK(status) \ + do \ + { \ + if (::arrow::Status _s = (status); !_s.ok()) \ + throw yexception() << _s.ToString(); \ + } while (false) + namespace NYql::NDq { using namespace ::NActors; @@ -108,6 +123,7 @@ struct TEvPrivate { EvReadError, EvRetry, EvNextBlock, + EvNextRecordBatch, EvBlockProcessed, EvEnd @@ -161,6 +177,13 @@ struct TEvPrivate { std::function<void()> Functor; }; + struct TEvNextRecordBatch : public NActors::TEventLocal<TEvNextRecordBatch, EvNextRecordBatch> { + TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor = []() {}) : Batch(batch), PathIndex(pathInd), Functor(functor) { } + std::shared_ptr<arrow::RecordBatch> Batch; + const size_t PathIndex; + std::function<void()> Functor; + }; + struct TEvBlockProcessed : public NActors::TEventLocal<TEvBlockProcessed, EvBlockProcessed> { TEvBlockProcessed() {} }; @@ -332,7 +355,9 @@ private: struct TReadSpec { using TPtr = std::shared_ptr<TReadSpec>; - NDB::ColumnsWithTypeAndName Columns; + bool Arrow = false; + NDB::ColumnsWithTypeAndName CHColumns; + std::shared_ptr<arrow::Schema> ArrowSchema; NDB::FormatSettings Settings; TString Format, Compression; ui64 SizeLimit = 0; @@ -394,6 +419,68 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste std::bind(&OnDownloadFinished, actorSystem, self, parent, std::placeholders::_1)); } +template <typename T> +class IBatchReader { +public: + virtual ~IBatchReader() = default; + + virtual bool Next(T& value) = 0; +}; + +class TBlockReader : public IBatchReader<NDB::Block> { +public: + TBlockReader(std::unique_ptr<NDB::IBlockInputStream>&& stream) + : Stream(std::move(stream)) + {} + + bool Next(NDB::Block& value) final { + value = Stream->read(); + return !!value; + } + +private: + std::unique_ptr<NDB::IBlockInputStream> Stream; +}; + +class TArrowParquetBatchReader : public IBatchReader<std::shared_ptr<arrow::RecordBatch>> { +public: + TArrowParquetBatchReader(std::unique_ptr<parquet::arrow::FileReader>&& fileReader, std::vector<int>&& columnIndices) + : FileReader(std::move(fileReader)) + , ColumnIndices(std::move(columnIndices)) + , TotalGroups(FileReader->num_row_groups()) + , CurrentGroup(0) + {} + + bool Next(std::shared_ptr<arrow::RecordBatch>& value) final { + for (;;) { + if (CurrentGroup == TotalGroups) { + return false; + } + + if (!CurrentBatchReader) { + THROW_ARROW_NOT_OK(FileReader->ReadRowGroup(CurrentGroup++, ColumnIndices, &CurrentTable)); + CurrentBatchReader = std::make_unique<arrow::TableBatchReader>(*CurrentTable); + } + + THROW_ARROW_NOT_OK(CurrentBatchReader->ReadNext(&value)); + if (value) { + return true; + } + + CurrentBatchReader = nullptr; + CurrentTable = nullptr; + } + } + +private: + std::unique_ptr<parquet::arrow::FileReader> FileReader; + const std::vector<int> ColumnIndices; + const int TotalGroups; + int CurrentGroup; + std::shared_ptr<arrow::Table> CurrentTable; + std::unique_ptr<arrow::TableBatchReader> CurrentBatchReader; +}; + class TS3ReadCoroImpl : public TActorCoroImpl { private: class TReadBufferFromStream : public NDB::ReadBuffer { @@ -517,31 +604,35 @@ private: } else { buffer = std::make_unique<TReadBufferFromStream>(this); } + const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); - NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, 
decompress ? *decompress : *buffer, NDB::Block(ReadSpec->Columns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings)); - auto actorSystem = GetActorSystem(); - auto selfActorId = SelfActorId; - size_t cntBlocksInFly = 0; - if (isLocal) { - while (auto block = stream.read()) { - if (++cntBlocksInFly > MaxBlocksInFly) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); - --cntBlocksInFly; - } - Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex, [actorSystem, selfActorId]() { - actorSystem->Send(new IEventHandle(selfActorId, selfActorId, new TEvPrivate::TEvBlockProcessed())); - })); - } - while (cntBlocksInFly--) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + if (ReadSpec->Arrow) { + YQL_ENSURE(ReadSpec->Format == "parquet"); + std::unique_ptr<parquet::arrow::FileReader> fileReader; + auto arrowFile = NDB::asArrowFile(decompress ? *decompress : *buffer); + + THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(arrowFile, arrow::default_memory_pool(), &fileReader)); + std::shared_ptr<arrow::Schema> schema; + THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema)); + + std::vector<int> columnIndices; + for (int i = 0; i < ReadSpec->ArrowSchema->num_fields(); ++i) { + const auto& targetField = ReadSpec->ArrowSchema->field(i); + auto srcFieldIndex = schema->GetFieldIndex(targetField->name()); + YQL_ENSURE(srcFieldIndex != -1, "Missing field: " << targetField->name()); + YQL_ENSURE(targetField->type()->Equals(schema->field(srcFieldIndex)->type()), "Mismatch type for field: " << targetField->name() << ", expected: " + << targetField->type()->ToString() << ", got: " << schema->field(srcFieldIndex)->type()->ToString()); + columnIndices.push_back(srcFieldIndex); } + + TArrowParquetBatchReader reader(std::move(fileReader), std::move(columnIndices)); + ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal); } else { - while (auto block = stream.read()) { - Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex)); - } + auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings)); + TBlockReader reader(std::move(stream)); + ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal); } - } catch (const TS3ReadError&) { // Finish reading. Add error from server to issues } catch (const TDtorException&) { @@ -580,6 +671,41 @@ private: return; } + template <typename T, typename TEv> + void ProcessBatches(IBatchReader<T>& reader, bool isLocal) { + auto actorSystem = GetActorSystem(); + auto selfActorId = SelfActorId; + size_t cntBlocksInFly = 0; + if (isLocal) { + for (;;) { + T batch; + if (!reader.Next(batch)) { + break; + } + + if (++cntBlocksInFly > MaxBlocksInFly) { + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + --cntBlocksInFly; + } + Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() { + actorSystem->Send(new IEventHandle(selfActorId, selfActorId, new TEvPrivate::TEvBlockProcessed())); + })); + } + while (cntBlocksInFly--) { + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + } + } else { + for (;;) { + T batch; + if (!reader.Next(batch)) { + break; + } + + Send(ParentActorId, new TEv(batch, PathIndex)); + } + } + } + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final { TStringBuilder message; message << "S3 read. 
Unexpected message type " << Hex(ev->GetTypeRewrite()); @@ -666,6 +792,35 @@ private: const TRetryStuff::TPtr RetryStuff; }; +ui64 GetSizeOfData(const arrow::ArrayData& data) { + ui64 size = sizeof(data); + size += data.buffers.size() * sizeof(void*); + size += data.child_data.size() * sizeof(void*); + for (const auto& b : data.buffers) { + if (b) { + size += b->size(); + } + } + + for (const auto& c : data.child_data) { + if (c) { + size += GetSizeOfData(*c); + } + } + + return size; +} + +ui64 GetSizeOfBatch(const arrow::RecordBatch& batch) { + ui64 size = sizeof(batch); + size += batch.num_columns() * sizeof(void*); + for (int i = 0; i < batch.num_columns(); ++i) { + size += GetSizeOfData(*batch.column_data(i)); + } + + return size; +} + class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqComputeActorAsyncInput { public: TS3StreamReadActor( @@ -718,8 +873,8 @@ public: private: class TBoxedBlock : public TComputationValue<TBoxedBlock> { public: - TBoxedBlock(TMemoryUsageInfo* memInfo, NDB::Block& block, std::function<void()> functor) - : TComputationValue(memInfo), OnDestroyFunctor(functor) + TBoxedBlock(TMemoryUsageInfo* memInfo, NDB::Block& block) + : TComputationValue(memInfo) { Block.swap(block); } @@ -732,20 +887,15 @@ private: return &Block; } - ~TBoxedBlock() { - if (OnDestroyFunctor) { - OnDestroyFunctor(); - } - } - NDB::Block Block; - std::function<void()> OnDestroyFunctor; }; class TReadyBlock { public: TReadyBlock(TEvPrivate::TEvNextBlock::TPtr& event) : PathInd(event->Get()->PathIndex), Functor (std::move(event->Get()->Functor)) { Block.swap(event->Get()->Block); } + TReadyBlock(TEvPrivate::TEvNextRecordBatch::TPtr& event) : Batch(event->Get()->Batch), PathInd(event->Get()->PathIndex), Functor(std::move(event->Get()->Functor)) {} NDB::Block Block; + std::shared_ptr<arrow::RecordBatch> Batch; size_t PathInd; std::function<void()> Functor; }; @@ -758,9 +908,29 @@ private: i64 GetAsyncInputData(TUnboxedValueVector& output, TMaybe<TInstant>&, bool& finished, i64 free) final { i64 total = 0LL; if (!Blocks.empty()) do { - const i64 s = Blocks.front().Block.bytes(); + const i64 s = ReadSpec->Arrow ? GetSizeOfBatch(*Blocks.front().Batch) : Blocks.front().Block.bytes(); + + NUdf::TUnboxedValue value; + if (ReadSpec->Arrow) { + const auto& batch = *Blocks.front().Batch; + + NUdf::TUnboxedValue* structItems = nullptr; + auto structObj = ArrowRowContainerCache.NewArray(HolderFactory, batch.num_columns(), structItems); + for (int i = 0; i < batch.num_columns(); ++i) { + structItems[i] = HolderFactory.CreateArrowBlock(arrow::Datum(batch.column_data(i))); + } + + NUdf::TUnboxedValue* tupleItems = nullptr; + auto tuple = ArrowTupleContainerCache.NewArray(HolderFactory, 2, tupleItems); + *tupleItems++ = structObj; + *tupleItems++ = HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch.num_rows()))); + value = tuple; + } else { + value = HolderFactory.Create<TBoxedBlock>(Blocks.front().Block); + } + + Blocks.front().Functor(); - auto value = HolderFactory.Create<TBoxedBlock>(Blocks.front().Block, Blocks.front().Functor); if (AddPathIndex) { NUdf::TUnboxedValue* tupleItems = nullptr; auto tuple = ContainerCache.NewArray(HolderFactory, 2, tupleItems); @@ -773,11 +943,13 @@ private: total += s; output.emplace_back(std::move(value)); Blocks.pop_front(); - } while (!Blocks.empty() && free > 0LL && Blocks.front().Block.bytes() <= size_t(free)); + } while (!Blocks.empty() && free > 0LL && (ReadSpec->Arrow ? 
GetSizeOfBatch(*Blocks.front().Batch) : Blocks.front().Block.bytes()) <= size_t(free)); finished = Blocks.empty() && !Count; if (finished) { ContainerCache.Clear(); + ArrowTupleContainerCache.Clear(); + ArrowRowContainerCache.Clear(); } return total; } @@ -799,6 +971,7 @@ private: STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry); hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock); + hFunc(TEvPrivate::TEvNextRecordBatch, HandleNextRecordBatch); cFunc(TEvPrivate::EvReadFinished, HandleReadFinished); ) @@ -807,6 +980,13 @@ private: } void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) { + YQL_ENSURE(!ReadSpec->Arrow); + Blocks.emplace_back(next); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + } + + void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) { + YQL_ENSURE(ReadSpec->Arrow); Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); } @@ -829,6 +1009,8 @@ private: std::vector<TRetryStuff::TPtr> RetryStuffForFile; const THolderFactory& HolderFactory; TPlainContainerCache ContainerCache; + TPlainContainerCache ArrowTupleContainerCache; + TPlainContainerCache ArrowRowContainerCache; const ui64 InputIndex; const TTxId TxId; @@ -995,12 +1177,30 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const auto structType = static_cast<TStructType*>(outputItemType); const auto readSpec = std::make_shared<TReadSpec>(); - readSpec->Columns.resize(structType->GetMembersCount()); - for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) { - auto& column = readSpec->Columns[i]; - column.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit); - column.name = structType->GetMemberName(i); + readSpec->Arrow = params.GetArrow(); + if (readSpec->Arrow) { + arrow::SchemaBuilder builder; + for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) { + auto memberType = structType->GetMemberType(i); + bool isOptional; + std::shared_ptr<arrow::DataType> dataType; + + YQL_ENSURE(ConvertArrowType(memberType, isOptional, dataType), "Unsupported arrow type"); + THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(structType->GetMemberName(i)), dataType, isOptional))); + } + + auto res = builder.Finish(); + THROW_ARROW_NOT_OK(res.status()); + readSpec->ArrowSchema = std::move(res).ValueOrDie(); + } else { + readSpec->CHColumns.resize(structType->GetMembersCount()); + for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) { + auto& column = readSpec->CHColumns[i]; + column.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit); + column.name = structType->GetMemberName(i); + } } + readSpec->Format = params.GetFormat(); if (const auto it = settings.find("compression"); settings.cend() != it) diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index b3741f765fd..124e66bf037 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -67,6 +67,16 @@ ] }, { + "Name": "TS3ArrowSettings", + "Base": "TS3SourceSettingsBase", + "Match": {"Type": "Callable", "Name": "S3ArrowSettings"}, + "Children": [ + {"Index": 2, "Name": "Format", "Type": "TCoAtom"}, + {"Index": 3, "Name": "RowType", "Type": "TExprBase"}, + {"Index": 4, "Name": "Settings", "Type": "TExprBase", "Optional": true} + ] + }, + { "Name": 
"TS3Read", "Base": "TFreeArgCallable", "Match": {"Type": "Callable", "Name": "Read!"}, diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto index 232abb19cd1..3b01e90ec1e 100644 --- a/ydb/library/yql/providers/s3/proto/source.proto +++ b/ydb/library/yql/providers/s3/proto/source.proto @@ -15,4 +15,5 @@ message TSource { optional string RowType = 4; optional string Format = 5; map<string, string> Settings = 6; + bool Arrow = 7; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index be392ce739a..8592c05fb54 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -110,6 +110,7 @@ public: AddHandler({TS3Object::CallableName()}, Hndl(&TSelf::HandleObject)); AddHandler({TS3SourceSettings::CallableName()}, Hndl(&TSelf::HandleS3SourceSettings)); AddHandler({TS3ParseSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettings)); + AddHandler({TS3ArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ArrowSettings)); AddHandler({TCoConfigure::CallableName()}, Hndl(&TSelf::HandleConfig)); } @@ -181,6 +182,59 @@ public: return TStatus::Ok; } + TStatus HandleS3ArrowSettings(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureMinMaxArgsCount(*input, 4U, 5U, ctx)) { + return TStatus::Error; + } + + const TStructExprType* extraColumnsType = nullptr; + if (!ValidateS3Paths(*input->Child(TS3SourceSettings::idx_Paths), extraColumnsType, ctx)) { + return TStatus::Error; + } + + if (!TCoSecureParam::Match(input->Child(TS3ParseSettings::idx_Token))) { + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3ParseSettings::idx_Token)->Pos()), TStringBuilder() << "Expected " << TCoSecureParam::CallableName())); + return TStatus::Error; + } + + if (!EnsureAtom(*input->Child(TS3ParseSettings::idx_Format), ctx) || + !NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), ctx)) + { + return TStatus::Error; + } + + const auto& rowTypeNode = *input->Child(TS3ParseSettings::idx_RowType); + if (!EnsureType(rowTypeNode, ctx)) { + return TStatus::Error; + } + + const TTypeAnnotationNode* rowType = rowTypeNode.GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!EnsureStructType(rowTypeNode.Pos(), *rowType, ctx)) { + return TStatus::Error; + } + + if (input->ChildrenSize() > TS3ParseSettings::idx_Settings && !EnsureTuple(*input->Child(TS3ParseSettings::idx_Settings), ctx)) { + return TStatus::Error; + } + + TVector<const TItemExprType*> blockRowTypeItems; + for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) { + blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType()))); + } + + auto blockRowType = ctx.MakeType<TStructExprType>(blockRowTypeItems); + + const TTypeAnnotationNode* itemType = ctx.MakeType<TTupleExprType>( + TTypeAnnotationNode::TListType{ blockRowType, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)) }); // struct + block length + + if (extraColumnsType->GetSize()) { + itemType = ctx.MakeType<TTupleExprType>( + TTypeAnnotationNode::TListType{ itemType, ctx.MakeType<TDataExprType>(EDataSlot::Uint64) }); + } + input->SetTypeAnn(ctx.MakeType<TStreamExprType>(itemType)); + return TStatus::Ok; + } + TStatus HandleRead(const TExprNode::TPtr& input, TExprContext& ctx) { if (!EnsureMinMaxArgsCount(*input, 4U, 5U, 
ctx)) { return TStatus::Error; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index d13849455b3..816bead18e2 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -163,22 +163,50 @@ public: ); } - if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && !s3ReadObject.Object().Format().Ref().IsAtom({"raw", "json_list"})) - return Build<TDqSourceWrap>(ctx, read->Pos()) - .Input<TS3ParseSettings>() - .Paths(s3ReadObject.Object().Paths()) - .Token<TCoSecureParam>() - .Name().Build(token) + auto format = s3ReadObject.Object().Format().Ref().Content(); + if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && format != "raw" && format != "json_list") { + bool supportedArrowTypes = false; + if (State_->Types->UseBlocks && State_->Types->ArrowResolver) { + TVector<const TTypeAnnotationNode*> allTypes; + for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) { + allTypes.push_back(x->GetItemType()); + } + + YQL_ENSURE(State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(read->Pos()), allTypes, supportedArrowTypes, ctx)); + } + + if (supportedArrowTypes && format == "parquet") { + return Build<TDqSourceWrap>(ctx, read->Pos()) + .Input<TS3ArrowSettings>() + .Paths(s3ReadObject.Object().Paths()) + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Format(s3ReadObject.Object().Format()) + .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) + .Settings(s3ReadObject.Object().Settings()) .Build() - .Format(s3ReadObject.Object().Format()) .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) - .Settings(s3ReadObject.Object().Settings()) - .Build() - .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) - .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) - .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) - .Done().Ptr(); - else { + .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) + .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) + .Done().Ptr(); + } else { + return Build<TDqSourceWrap>(ctx, read->Pos()) + .Input<TS3ParseSettings>() + .Paths(s3ReadObject.Object().Paths()) + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Format(s3ReadObject.Object().Format()) + .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) + .Settings(s3ReadObject.Object().Settings()) + .Build() + .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) + .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) + .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) + .Done().Ptr(); + } + } else { if (const auto& objectSettings = s3ReadObject.Object().Settings()) { settings.emplace_back( ctx.Builder(objectSettings.Cast().Pos()) @@ -260,6 +288,27 @@ public: if (const auto maySettings = parseSettings.Settings()) { const auto& settings = maySettings.Cast(); for (auto i = 0U; i < settings.Ref().ChildrenSize(); ++i) { + srcDesc.MutableSettings()->insert({ TString(settings.Ref().Child(i)->Head().Content()), TString(settings.Ref().Child(i)->Tail().IsAtom() ? 
settings.Ref().Child(i)->Tail().Content() : settings.Ref().Child(i)->Tail().Head().Content()) }); + } + } + } else if (const auto mayArrowSettings = settings.Maybe<TS3ArrowSettings>()) { + const auto arrowSettings = mayArrowSettings.Cast(); + srcDesc.SetFormat(arrowSettings.Format().StringValue().c_str()); + srcDesc.SetArrow(true); + + const TStructExprType* fullRowType = arrowSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); + // exclude extra columns to get actual row type we need to read from input + auto rowTypeItems = fullRowType->GetItems(); + EraseIf(rowTypeItems, [extraColumnsType](const auto& item) { return extraColumnsType->FindItem(item->GetName()); }); + { + // TODO: pass context + TExprContext ctx; + srcDesc.SetRowType(NCommon::WriteTypeToYson(ctx.MakeType<TStructExprType>(rowTypeItems), NYT::NYson::EYsonFormat::Text)); + } + + if (const auto maySettings = arrowSettings.Settings()) { + const auto& settings = maySettings.Cast(); + for (auto i = 0U; i < settings.Ref().ChildrenSize(); ++i) { srcDesc.MutableSettings()->insert({TString(settings.Ref().Child(i)->Head().Content()), TString(settings.Ref().Child(i)->Tail().IsAtom() ? settings.Ref().Child(i)->Tail().Content() : settings.Ref().Child(i)->Tail().Head().Content())}); } } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index 015254f159e..73dc00a056d 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -466,13 +466,15 @@ public: .Done(); } + const bool isArrowSettings = maybeS3SourceSettings.Cast().CallableName() == TS3ArrowSettings::CallableName(); const TStructExprType* readRowType = - dqSource.Input().Maybe<TS3ParseSettings>().Cast().RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); + (isArrowSettings ? dqSource.Input().Maybe<TS3ArrowSettings>().Cast().RowType().Ref() : dqSource.Input().Maybe<TS3ParseSettings>().Cast().RowType().Ref()) + .GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); TVector<const TItemExprType*> readRowDataItems = readRowType->GetItems(); TVector<const TItemExprType*> outputRowDataItems = outputRowType->GetItems(); - if (auto settings = dqSource.Input().Maybe<TS3ParseSettings>().Cast().Settings()) { + if (auto settings = isArrowSettings ? 
dqSource.Input().Maybe<TS3ArrowSettings>().Cast().Settings() : dqSource.Input().Maybe<TS3ParseSettings>().Cast().Settings()) { if (auto ps = GetSetting(settings.Cast().Ref(), "partitionedby")) { THashSet<TStringBuf> cols; for (size_t i = 1; i < ps->ChildrenSize(); ++i) { @@ -485,7 +487,7 @@ public: } } - if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0) { + if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !isArrowSettings) { const TStructExprType* readRowDataType = ctx.MakeType<TStructExprType>(readRowDataItems); auto item = GetLightColumn(*readRowDataType); YQL_ENSURE(item); @@ -494,16 +496,29 @@ public: readRowType = outputRowType; } - return Build<TDqSourceWrap>(ctx, dqSource.Pos()) - .InitFrom(dqSource) - .Input<TS3ParseSettings>() - .InitFrom(dqSource.Input().Maybe<TS3ParseSettings>().Cast()) - .Paths(newPaths) - .RowType(ExpandType(dqSource.Input().Pos(), *readRowType, ctx)) - .Build() - .RowType(outputRowTypeNode) - .Settings(newSettings) - .Done(); + if (isArrowSettings) { + return Build<TDqSourceWrap>(ctx, dqSource.Pos()) + .InitFrom(dqSource) + .Input<TS3ArrowSettings>() + .InitFrom(dqSource.Input().Maybe<TS3ArrowSettings>().Cast()) + .Paths(newPaths) + .RowType(ExpandType(dqSource.Input().Pos(), *readRowType, ctx)) + .Build() + .RowType(outputRowTypeNode) + .Settings(newSettings) + .Done(); + } else { + return Build<TDqSourceWrap>(ctx, dqSource.Pos()) + .InitFrom(dqSource) + .Input<TS3ParseSettings>() + .InitFrom(dqSource.Input().Maybe<TS3ParseSettings>().Cast()) + .Paths(newPaths) + .RowType(ExpandType(dqSource.Input().Pos(), *readRowType, ctx)) + .Build() + .RowType(outputRowTypeNode) + .Settings(newSettings) + .Done(); + } } TMaybeNode<TExprBase> MergeS3Paths(TExprBase node, TExprContext& ctx) const { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp index 9ff07d6ca33..e6e16121491 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp @@ -135,6 +135,18 @@ void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, con return TRuntimeNode(); }); + compiler.ChainCallable(TDqSourceWideBlockWrap::CallableName(), + [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { + if (const auto wrapper = TDqSourceWideBlockWrap(&node); wrapper.DataSource().Category().Value() == S3ProviderName) { + const auto wrapped = TryWrapWithParser(wrapper, ctx, true); + if (wrapped) { + return *wrapped; + } + } + + return TRuntimeNode(); + }); + if (!compiler.HasCallable(TS3SinkOutput::CallableName())) compiler.AddCallable(TS3SinkOutput::CallableName(), [state](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { |