aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2022-11-05 16:17:38 +0300
committervvvv <vvvv@ydb.tech>2022-11-05 16:17:38 +0300
commited11e26937c18a86271556c8a515d2c6b22195d5 (patch)
tree913db846997f5490c62c420974f2e6dc04fe09fe
parent83e1a68166c7ef9f594922b976761844581124f4 (diff)
downloadydb-ed11e26937c18a86271556c8a515d2c6b22195d5.tar.gz
use native arrow parquet reader without CH conversions
Запрос вида
%%
pragma UseBlocks;
SELECT
    count(*)
    -- EventDate
    -- RemoteIP
FROM `yq-clickbench-local`.`hits_*.parquet`
WITH (
    format=parquet,
    SCHEMA (
        RemoteIP INTEGER NOT NULL,
        CounterID INTEGER NOT NULL,
        --EventDate Date NOT NULL,
        --EventTime Timestamp NOT NULL
    )
)
limit 100
;
%%
для обычного режима с 100 файлами clickbench 14Гб
%%
Execution precomputes complete, duration: 5.962375s
%%
для UseBlocks
%%
Execution precomputes complete, duration: 0.306933s
%%
Для UseBlocks, но при чтении с S3
%%
Execution precomputes complete, duration: 97.178274s
%%
Без UseBlocks, но при чтении с S3
%%
Execution precomputes complete, duration: 111.921679s
%%
Заодно пропадает костыль, что нужно читать хотя бы одну колонку - parquet reader умеет читать 0 колонок и при этом сообщать число строк
-rw-r--r--ydb/library/yql/providers/common/mkql/parser.cpp26
-rw-r--r--ydb/library/yql/providers/common/mkql/parser.h5
-rw-r--r--ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json5
-rw-r--r--ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp2
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp119
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp3
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp2
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp15
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp276
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json10
-rw-r--r--ydb/library/yql/providers/s3/proto/source.proto1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp54
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp77
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp41
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp12
17 files changed, 518 insertions, 133 deletions
diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp
index 4de60bd7025..4965894e565 100644
--- a/ydb/library/yql/providers/common/mkql/parser.cpp
+++ b/ydb/library/yql/providers/common/mkql/parser.cpp
@@ -146,12 +146,31 @@ TRuntimeNode BuildParseCall(
TType* inputType,
TType* parseItemType,
TType* finalItemType,
- NCommon::TMkqlBuildContext& ctx)
+ NCommon::TMkqlBuildContext& ctx,
+ bool useBlocks)
{
const auto* inputItemType = static_cast<TStreamType*>(inputType)->GetItemType();
const auto* parseItemStructType = static_cast<TStructType*>(parseItemType);
const auto* finalItemStructType = static_cast<TStructType*>(finalItemType);
+ if (useBlocks) {
+ return ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) {
+ MKQL_ENSURE(!extraColumnsByPathIndex && metadataColumns.empty(), "TODO");
+
+ auto structObj = ctx.ProgramBuilder.Nth(item, 0);
+ auto length = ctx.ProgramBuilder.Nth(item, 1);
+ TRuntimeNode::TList fields;
+
+ for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) {
+ TStringBuf name = finalItemStructType->GetMemberName(i);
+ fields.push_back(ctx.ProgramBuilder.Member(structObj, name));
+ }
+
+ fields.push_back(length);
+ return fields;
+ });
+ }
+
if (!compression.empty()) {
input = WrapWithDecompress(input, inputItemType, compression, ctx);
}
@@ -301,7 +320,7 @@ TRuntimeNode BuildParseCall(
);
}
-TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWideWrap& wrapper, NCommon::TMkqlBuildContext& ctx) {
+TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWrapBase& wrapper, NCommon::TMkqlBuildContext& ctx, bool useBlocks) {
const auto& format = GetFormat(wrapper.Settings().Cast().Ref());
if (!format.Content()) {
return TMaybe<TRuntimeNode>();
@@ -358,7 +377,8 @@ TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWideWrap& wrapper, NCommon
inputType,
parseItemType,
finalItemType,
- ctx);
+ ctx,
+ useBlocks);
}
}
diff --git a/ydb/library/yql/providers/common/mkql/parser.h b/ydb/library/yql/providers/common/mkql/parser.h
index 5ddaaa31417..30d023b2a0a 100644
--- a/ydb/library/yql/providers/common/mkql/parser.h
+++ b/ydb/library/yql/providers/common/mkql/parser.h
@@ -23,8 +23,9 @@ NKikimr::NMiniKQL::TRuntimeNode BuildParseCall(
NKikimr::NMiniKQL::TType* inputType,
NKikimr::NMiniKQL::TType* parseItemType,
NKikimr::NMiniKQL::TType* finalItemType,
- NCommon::TMkqlBuildContext& ctx);
+ NCommon::TMkqlBuildContext& ctx,
+ bool useBlocks = false);
-TMaybe<NKikimr::NMiniKQL::TRuntimeNode> TryWrapWithParser(const NYql::NNodes::TDqSourceWideWrap& wrapper, NCommon::TMkqlBuildContext& ctx);
+TMaybe<NKikimr::NMiniKQL::TRuntimeNode> TryWrapWithParser(const NYql::NNodes::TDqSourceWrapBase& wrapper, NCommon::TMkqlBuildContext& ctx, bool useBlocks = false);
}
diff --git a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json
index 566f8d05032..5d7d541b923 100644
--- a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json
+++ b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json
@@ -56,6 +56,11 @@
"Name": "TDqSourceWideWrap",
"Base": "TDqSourceWrapBase",
"Match": {"Type": "Callable", "Name": "DqSourceWideWrap"}
+ },
+ {
+ "Name": "TDqSourceWideBlockWrap",
+ "Base": "TDqSourceWrapBase",
+ "Match": {"Type": "Callable", "Name": "DqSourceWideBlockWrap"}
}
]
}
diff --git a/ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp b/ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp
index 77820d1d5ed..2bb15a52fd2 100644
--- a/ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp
+++ b/ydb/library/yql/providers/dq/mkql/dqs_mkql_compiler.cpp
@@ -10,7 +10,7 @@ using namespace NKikimr::NMiniKQL;
using namespace NNodes;
void RegisterDqsMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TTypeAnnotationContext& ctx) {
- compiler.AddCallable({TDqSourceWideWrap::CallableName(), TDqReadWideWrap::CallableName()},
+ compiler.AddCallable({TDqSourceWideWrap::CallableName(), TDqSourceWideBlockWrap::CallableName(), TDqReadWideWrap::CallableName()},
[](const TExprNode& node, NCommon::TMkqlBuildContext&) {
YQL_ENSURE(false, "Unsupported reader: " << node.Head().Content());
return TRuntimeNode();
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 6ccb1e42a75..ed0761fece0 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -17,9 +17,10 @@ using namespace NYql::NNodes;
class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
public:
- TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config)
+ TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks)
: TOptimizeTransformerBase(typeCtx, NLog::EComponent::ProviderDq, {})
, Config(config)
+ , UseBlocks(useBlocks)
{
const bool enablePrecompute = Config->_EnablePrecompute.Get().GetOrElse(false);
@@ -106,62 +107,65 @@ protected:
.Build().Done();
}
const auto& items = GetSeqItemType(wrap.Ref().GetTypeAnn())->Cast<TStructExprType>()->GetItems();
- auto narrow = wrap.Settings() ?
- ctx.Builder(node.Pos())
- .Lambda()
- .Param("source")
- .Callable("NarrowMap")
- .Callable(0, TDqSourceWideWrap::CallableName())
- .Arg(0, "source")
- .Add(1, wrap.DataSource().Ptr())
- .Add(2, wrap.RowType().Ptr())
- .Add(3, wrap.Settings().Cast().Ptr())
- .Seal()
- .Lambda(1)
- .Params("fields", items.size())
- .Callable(TCoAsStruct::CallableName())
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- ui32 i = 0U;
- for (const auto& item : items) {
- parent.List(i)
- .Atom(0, item->GetName())
- .Arg(1, "fields", i)
- .Seal();
- ++i;
- }
- return parent;
- })
- .Seal()
- .Seal()
+ auto sourceArg = ctx.NewArgument(node.Pos(), "source");
+ bool supportsBlocks = false;
+ auto inputType = GetSeqItemType(wrap.Input().Ref().GetTypeAnn());
+ while (inputType->GetKind() == ETypeAnnotationKind::Tuple) {
+ auto tupleType = inputType->Cast<TTupleExprType>();
+ if (tupleType->GetSize() == 2 && tupleType->GetItems()[1]->GetKind() == ETypeAnnotationKind::Scalar) {
+ supportsBlocks = true;
+ break;
+ }
+
+ inputType = tupleType->GetItems()[0];
+ }
+
+ auto wideWrap = ctx.Builder(node.Pos())
+ .Callable(UseBlocks && supportsBlocks ? TDqSourceWideBlockWrap::CallableName() : TDqSourceWideWrap::CallableName())
+ .Add(0, sourceArg)
+ .Add(1, wrap.DataSource().Ptr())
+ .Add(2, wrap.RowType().Ptr())
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ if (wrap.Settings()) {
+ parent.Add(3, wrap.Settings().Cast().Ptr());
+ }
+
+ return parent;
+ })
+ .Seal()
+ .Build();
+
+ if (UseBlocks && supportsBlocks) {
+ wideWrap = ctx.Builder(node.Pos())
+ .Callable("WideFromBlocks")
+ .Add(0, wideWrap)
.Seal()
- .Seal().Build():
- ctx.Builder(node.Pos())
- .Lambda()
- .Param("source")
- .Callable("NarrowMap")
- .Callable(0, TDqSourceWideWrap::CallableName())
- .Arg(0, "source")
- .Add(1, wrap.DataSource().Ptr())
- .Add(2, wrap.RowType().Ptr())
- .Seal()
- .Lambda(1)
- .Params("fields", items.size())
- .Callable(TCoAsStruct::CallableName())
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- ui32 i = 0U;
- for (const auto& item : items) {
- parent.List(i)
- .Atom(0, item->GetName())
- .Arg(1, "fields", i)
- .Seal();
- ++i;
- }
- return parent;
- })
- .Seal()
+ .Build();
+ }
+
+ auto narrow = ctx.Builder(node.Pos())
+ .Callable("NarrowMap")
+ .Add(0, wideWrap)
+ .Lambda(1)
+ .Params("fields", items.size())
+ .Callable(TCoAsStruct::CallableName())
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 i = 0U;
+ for (const auto& item : items) {
+ parent.List(i)
+ .Atom(0, item->GetName())
+ .Arg(1, "fields", i)
+ .Seal();
+ ++i;
+ }
+ return parent;
+ })
.Seal()
.Seal()
- .Seal().Build();
+ .Seal()
+ .Build();
+
+ auto program = ctx.NewLambda(node.Pos(), ctx.NewArguments(node.Pos(), { sourceArg }), std::move(narrow));
return Build<TDqCnUnionAll>(ctx, node.Pos())
.Output()
@@ -172,7 +176,7 @@ protected:
.Settings(wrap.Input())
.Build()
.Build()
- .Program(narrow)
+ .Program(program)
.Settings(TDqStageSettings().BuildNode(ctx, node.Pos()))
.Build()
.Index().Build("0")
@@ -353,10 +357,11 @@ protected:
private:
TDqConfiguration::TPtr Config;
+ const bool UseBlocks;
};
-THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) {
- return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config));
+THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks) {
+ return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config, useBlocks));
}
} // NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.h b/ydb/library/yql/providers/dq/opt/physical_optimize.h
index 5ed9da98807..ab20f334966 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.h
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.h
@@ -8,6 +8,6 @@
namespace NYql::NDqs {
-THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config);
+THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 6ee4e054cbe..6e5b852c0ce 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -810,6 +810,7 @@ private:
YQL_CLOG(DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback";
auto duration = TInstant::Now() - startTime;
+ YQL_CLOG(INFO, ProviderDq) << "Execution Result complete, duration: " << duration;
if (state->Metrics) {
state->Metrics->SetCounter("dq", "TotalExecutionTime", duration.MilliSeconds());
state->Metrics->SetCounter(
@@ -1164,6 +1165,7 @@ private:
// TODO: remove copy-paste
return WrapFutureCallback<false>(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
auto duration = TInstant::Now() - startTime;
+ YQL_CLOG(INFO, ProviderDq) << "Execution Pull complete, duration: " << duration;
if (state->Metrics) {
state->Metrics->SetCounter("dq", "TotalExecutionTime", duration.MilliSeconds());
state->Metrics->SetCounter(
@@ -1613,6 +1615,7 @@ private:
MarkProgressFinished(publicIds->AllPublicIds, res.Success(), state->ProgressWriter);
auto duration = TInstant::Now() - startTime;
+ YQL_CLOG(INFO, ProviderDq) << "Execution precomputes complete, duration: " << duration;
if (state->Metrics) {
state->Metrics->SetCounter("dq", "PrecomputeExecutionTime", duration.MilliSeconds());
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
index b136bf2ab18..f8fdd0b63e1 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
@@ -28,7 +28,7 @@ public:
TDqDataProviderSink(const TDqStatePtr& state)
: State(state)
, LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); })
- , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings); })
+ , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings, state->TypeCtx->UseBlocks ); })
, PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); })
, TypeAnnotationTransformer([state] () {
return CreateDqsDataSinkTypeAnnotationTransformer(
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp
index 6e03017ccb1..1fa68c32beb 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp
@@ -18,8 +18,9 @@ public:
TDqsDataSourceTypeAnnotationTransformer()
: TVisitorTransformerBase(true)
{
- AddHandler({TDqSourceWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<false>));
- AddHandler({TDqSourceWideWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<true>));
+ AddHandler({TDqSourceWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<false, false>));
+ AddHandler({TDqSourceWideWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<true, false>));
+ AddHandler({TDqSourceWideBlockWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleSourceWrap<true, true>));
AddHandler({TDqReadWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleReadWrap));
AddHandler({TDqReadWideWrap::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleWideReadWrap));
AddHandler({TCoConfigure::CallableName()}, Hndl(&TDqsDataSourceTypeAnnotationTransformer::HandleConfig));
@@ -58,7 +59,7 @@ private:
return TStatus::Ok;
}
- template<bool Wide>
+ template<bool Wide, bool Blocks>
TStatus HandleSourceWrap(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*input, 3U, 4U, ctx)) {
return TStatus::Error;
@@ -91,6 +92,14 @@ private:
TTypeAnnotationNode::TListType types;
types.reserve(items.size());
std::transform(items.cbegin(), items.cend(), std::back_inserter(types), std::bind(&TItemExprType::GetItemType, std::placeholders::_1));
+ if constexpr (Blocks) {
+ for (auto& type : types) {
+ type = ctx.MakeType<TBlockExprType>(type);
+ }
+
+ types.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)));
+ }
+
input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TMultiExprType>(types)));
} else {
input->SetTypeAnn(ctx.MakeType<TListExprType>(input->Child(TDqSourceWrapBase::idx_RowType)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()));
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.txt
index 4c579198440..3c4ff17e5e5 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.txt
@@ -10,6 +10,7 @@
add_library(providers-s3-actors)
target_compile_options(providers-s3-actors PRIVATE
-DARCADIA_BUILD
+ -DUSE_PARQUET
-DUSE_CURRENT_UDF_ABI_VERSION
)
target_include_directories(providers-s3-actors PRIVATE
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index d5a6759d491..1da4a3809c1 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -20,6 +20,14 @@
#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatFactory.h>
#include <ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/InputStreamFromInputFormat.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/Processors/Formats/Impl/ArrowBufferedStreams.h>
+
+#include <arrow/api.h>
+#include <arrow/io/api.h>
+#include <arrow/status.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/file_reader.h>
+
#endif
#include "yql_s3_read_actor.h"
@@ -85,6 +93,13 @@
#define LOG_CORO_T(name, stream) \
LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
+// Evaluates `status` exactly once; if the resulting arrow::Status is not OK,
+// throws yexception whose message is the status text.
+#define THROW_ARROW_NOT_OK(status)                   \
+    do {                                             \
+        const ::arrow::Status _s = (status);         \
+        if (!_s.ok()) {                              \
+            throw yexception() << _s.ToString();     \
+        }                                            \
+    } while (false)
+
namespace NYql::NDq {
using namespace ::NActors;
@@ -108,6 +123,7 @@ struct TEvPrivate {
EvReadError,
EvRetry,
EvNextBlock,
+ EvNextRecordBatch,
EvBlockProcessed,
EvEnd
@@ -161,6 +177,13 @@ struct TEvPrivate {
std::function<void()> Functor;
};
+    // Local event that hands one Arrow record batch to the read actor.
+    //   batch   - the record batch (shared ownership, never copied by value);
+    //   pathInd - index of the source path the batch was read from;
+    //   functor - callback stored alongside the batch for the receiver to invoke;
+    //             defaults to a no-op.
+    struct TEvNextRecordBatch : public NActors::TEventLocal<TEvNextRecordBatch, EvNextRecordBatch> {
+        TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor = []() {})
+            : Batch(batch)
+            , PathIndex(pathInd)
+            , Functor(std::move(functor)) // sink parameter: move it instead of copying the std::function again
+        {
+        }
+
+        std::shared_ptr<arrow::RecordBatch> Batch;
+        const size_t PathIndex;
+        std::function<void()> Functor;
+    };
+
struct TEvBlockProcessed : public NActors::TEventLocal<TEvBlockProcessed, EvBlockProcessed> {
TEvBlockProcessed() {}
};
@@ -332,7 +355,9 @@ private:
struct TReadSpec {
using TPtr = std::shared_ptr<TReadSpec>;
- NDB::ColumnsWithTypeAndName Columns;
+ bool Arrow = false;
+ NDB::ColumnsWithTypeAndName CHColumns;
+ std::shared_ptr<arrow::Schema> ArrowSchema;
NDB::FormatSettings Settings;
TString Format, Compression;
ui64 SizeLimit = 0;
@@ -394,6 +419,68 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste
std::bind(&OnDownloadFinished, actorSystem, self, parent, std::placeholders::_1));
}
+// Pull-style source of batches of type T.
+template <class T>
+class IBatchReader {
+public:
+    virtual ~IBatchReader() = default;
+
+    // Fetches the next batch into `value`.
+    // Returns true when a batch was produced and false when the source is exhausted.
+    virtual bool Next(T& value) = 0;
+};
+
+// IBatchReader adapter over a ClickHouse block input stream.
+class TBlockReader : public IBatchReader<NDB::Block> {
+public:
+    // Takes ownership of `stream`. Marked explicit so that a unique_ptr is never
+    // silently converted into a reader.
+    explicit TBlockReader(std::unique_ptr<NDB::IBlockInputStream>&& stream)
+        : Stream(std::move(stream))
+    {}
+
+    // Reads the next block from the stream into `value`.
+    // Returns false when the block read converts to false
+    // (presumably the empty block that marks end of stream - TODO confirm).
+    bool Next(NDB::Block& value) final {
+        value = Stream->read();
+        return !!value;
+    }
+
+private:
+    std::unique_ptr<NDB::IBlockInputStream> Stream;
+};
+
+// Reads a parquet file one row group at a time and exposes it as a sequence of
+// Arrow record batches. Only the columns whose indices are listed in
+// `columnIndices` are requested from the file reader.
+class TArrowParquetBatchReader : public IBatchReader<std::shared_ptr<arrow::RecordBatch>> {
+public:
+    // Takes ownership of both arguments. TotalGroups is initialised from FileReader,
+    // which relies on FileReader being declared before it in the member list.
+    TArrowParquetBatchReader(std::unique_ptr<parquet::arrow::FileReader>&& fileReader, std::vector<int>&& columnIndices)
+        : FileReader(std::move(fileReader))
+        , ColumnIndices(std::move(columnIndices))
+        , TotalGroups(FileReader->num_row_groups())
+        , CurrentGroup(0)
+    {}
+
+    // Stores the next record batch into `value` and returns true, or returns false
+    // once every row group has been fully drained.
+    // Throws yexception (through THROW_ARROW_NOT_OK) if Arrow reports an error.
+    bool Next(std::shared_ptr<arrow::RecordBatch>& value) final {
+        for (;;) {
+            if (!CurrentBatchReader) {
+                // Exhaustion may only be declared between row groups. CurrentGroup is
+                // advanced as soon as a group is loaded, so testing it while a batch
+                // reader is still active would drop the remaining batches of the last
+                // row group whenever its table is split into more than one batch.
+                if (CurrentGroup == TotalGroups) {
+                    return false;
+                }
+
+                THROW_ARROW_NOT_OK(FileReader->ReadRowGroup(CurrentGroup++, ColumnIndices, &CurrentTable));
+                CurrentBatchReader = std::make_unique<arrow::TableBatchReader>(*CurrentTable);
+            }
+
+            THROW_ARROW_NOT_OK(CurrentBatchReader->ReadNext(&value));
+            if (value) {
+                return true;
+            }
+
+            // Current row group is drained: release it and move on to the next one.
+            CurrentBatchReader = nullptr;
+            CurrentTable = nullptr;
+        }
+    }
+
+private:
+    std::unique_ptr<parquet::arrow::FileReader> FileReader;
+    const std::vector<int> ColumnIndices;
+    const int TotalGroups;
+    int CurrentGroup;
+    // The table must outlive the batch reader built on top of it.
+    std::shared_ptr<arrow::Table> CurrentTable;
+    std::unique_ptr<arrow::TableBatchReader> CurrentBatchReader;
+};
+
class TS3ReadCoroImpl : public TActorCoroImpl {
private:
class TReadBufferFromStream : public NDB::ReadBuffer {
@@ -517,31 +604,35 @@ private:
} else {
buffer = std::make_unique<TReadBufferFromStream>(this);
}
+
const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression));
YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
- NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->Columns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings));
- auto actorSystem = GetActorSystem();
- auto selfActorId = SelfActorId;
- size_t cntBlocksInFly = 0;
- if (isLocal) {
- while (auto block = stream.read()) {
- if (++cntBlocksInFly > MaxBlocksInFly) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
- --cntBlocksInFly;
- }
- Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex, [actorSystem, selfActorId]() {
- actorSystem->Send(new IEventHandle(selfActorId, selfActorId, new TEvPrivate::TEvBlockProcessed()));
- }));
- }
- while (cntBlocksInFly--) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ if (ReadSpec->Arrow) {
+ YQL_ENSURE(ReadSpec->Format == "parquet");
+ std::unique_ptr<parquet::arrow::FileReader> fileReader;
+ auto arrowFile = NDB::asArrowFile(decompress ? *decompress : *buffer);
+
+ THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(arrowFile, arrow::default_memory_pool(), &fileReader));
+ std::shared_ptr<arrow::Schema> schema;
+ THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema));
+
+ std::vector<int> columnIndices;
+ for (int i = 0; i < ReadSpec->ArrowSchema->num_fields(); ++i) {
+ const auto& targetField = ReadSpec->ArrowSchema->field(i);
+ auto srcFieldIndex = schema->GetFieldIndex(targetField->name());
+ YQL_ENSURE(srcFieldIndex != -1, "Missing field: " << targetField->name());
+ YQL_ENSURE(targetField->type()->Equals(schema->field(srcFieldIndex)->type()), "Mismatch type for field: " << targetField->name() << ", expected: "
+ << targetField->type()->ToString() << ", got: " << schema->field(srcFieldIndex)->type()->ToString());
+ columnIndices.push_back(srcFieldIndex);
}
+
+ TArrowParquetBatchReader reader(std::move(fileReader), std::move(columnIndices));
+ ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal);
} else {
- while (auto block = stream.read()) {
- Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex));
- }
+ auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings));
+ TBlockReader reader(std::move(stream));
+ ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal);
}
-
} catch (const TS3ReadError&) {
// Finish reading. Add error from server to issues
} catch (const TDtorException&) {
@@ -580,6 +671,41 @@ private:
return;
}
+    // Drains `reader` and forwards every batch to the parent actor as a TEv event.
+    // For a local read, each event carries a callback that posts TEvBlockProcessed
+    // back to this actor, and no more than MaxBlocksInFly events are left
+    // unacknowledged at any time; all outstanding acknowledgements are awaited
+    // before returning. For a non-local read, batches are sent without throttling.
+    template <typename T, typename TEv>
+    void ProcessBatches(IBatchReader<T>& reader, bool isLocal) {
+        auto actorSystem = GetActorSystem();
+        auto selfActorId = SelfActorId;
+
+        if (!isLocal) {
+            while (true) {
+                T item;
+                if (!reader.Next(item)) {
+                    return;
+                }
+
+                Send(ParentActorId, new TEv(item, PathIndex));
+            }
+        }
+
+        // Acknowledgement hook attached to every event sent on the local path.
+        auto notifyProcessed = [actorSystem, selfActorId]() {
+            actorSystem->Send(new IEventHandle(selfActorId, selfActorId, new TEvPrivate::TEvBlockProcessed()));
+        };
+
+        size_t pending = 0;
+        while (true) {
+            T item;
+            if (!reader.Next(item)) {
+                break;
+            }
+
+            // Window is full: wait for one acknowledgement before sending more.
+            if (++pending > MaxBlocksInFly) {
+                WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+                --pending;
+            }
+            Send(ParentActorId, new TEv(item, PathIndex, notifyProcessed));
+        }
+
+        // Collect the acknowledgements that are still outstanding.
+        for (; pending > 0; --pending) {
+            WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+        }
+    }
+
void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final {
TStringBuilder message;
message << "S3 read. Unexpected message type " << Hex(ev->GetTypeRewrite());
@@ -666,6 +792,35 @@ private:
const TRetryStuff::TPtr RetryStuff;
};
+// Estimates the memory footprint of an Arrow ArrayData in bytes: the object itself,
+// one pointer per buffer and per child slot, the size of every non-null buffer,
+// and (recursively) the footprint of every non-null child.
+ui64 GetSizeOfData(const arrow::ArrayData& data) {
+    ui64 total = sizeof(data);
+    total += data.buffers.size() * sizeof(void*);
+    total += data.child_data.size() * sizeof(void*);
+
+    for (const auto& buffer : data.buffers) {
+        if (!buffer) {
+            continue;
+        }
+        total += buffer->size();
+    }
+
+    for (const auto& child : data.child_data) {
+        if (!child) {
+            continue;
+        }
+        total += GetSizeOfData(*child);
+    }
+
+    return total;
+}
+
+// Estimates the memory footprint of a record batch in bytes: the batch object,
+// one pointer per column, plus GetSizeOfData() of every column.
+ui64 GetSizeOfBatch(const arrow::RecordBatch& batch) {
+    const int columns = batch.num_columns();
+
+    ui64 total = sizeof(batch);
+    total += columns * sizeof(void*);
+    for (int col = 0; col != columns; ++col) {
+        total += GetSizeOfData(*batch.column_data(col));
+    }
+
+    return total;
+}
+
class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqComputeActorAsyncInput {
public:
TS3StreamReadActor(
@@ -718,8 +873,8 @@ public:
private:
class TBoxedBlock : public TComputationValue<TBoxedBlock> {
public:
- TBoxedBlock(TMemoryUsageInfo* memInfo, NDB::Block& block, std::function<void()> functor)
- : TComputationValue(memInfo), OnDestroyFunctor(functor)
+ TBoxedBlock(TMemoryUsageInfo* memInfo, NDB::Block& block)
+ : TComputationValue(memInfo)
{
Block.swap(block);
}
@@ -732,20 +887,15 @@ private:
return &Block;
}
- ~TBoxedBlock() {
- if (OnDestroyFunctor) {
- OnDestroyFunctor();
- }
- }
-
NDB::Block Block;
- std::function<void()> OnDestroyFunctor;
};
class TReadyBlock {
public:
TReadyBlock(TEvPrivate::TEvNextBlock::TPtr& event) : PathInd(event->Get()->PathIndex), Functor (std::move(event->Get()->Functor)) { Block.swap(event->Get()->Block); }
+ TReadyBlock(TEvPrivate::TEvNextRecordBatch::TPtr& event) : Batch(event->Get()->Batch), PathInd(event->Get()->PathIndex), Functor(std::move(event->Get()->Functor)) {}
NDB::Block Block;
+ std::shared_ptr<arrow::RecordBatch> Batch;
size_t PathInd;
std::function<void()> Functor;
};
@@ -758,9 +908,29 @@ private:
i64 GetAsyncInputData(TUnboxedValueVector& output, TMaybe<TInstant>&, bool& finished, i64 free) final {
i64 total = 0LL;
if (!Blocks.empty()) do {
- const i64 s = Blocks.front().Block.bytes();
+ const i64 s = ReadSpec->Arrow ? GetSizeOfBatch(*Blocks.front().Batch) : Blocks.front().Block.bytes();
+
+ NUdf::TUnboxedValue value;
+ if (ReadSpec->Arrow) {
+ const auto& batch = *Blocks.front().Batch;
+
+ NUdf::TUnboxedValue* structItems = nullptr;
+ auto structObj = ArrowRowContainerCache.NewArray(HolderFactory, batch.num_columns(), structItems);
+ for (int i = 0; i < batch.num_columns(); ++i) {
+ structItems[i] = HolderFactory.CreateArrowBlock(arrow::Datum(batch.column_data(i)));
+ }
+
+ NUdf::TUnboxedValue* tupleItems = nullptr;
+ auto tuple = ArrowTupleContainerCache.NewArray(HolderFactory, 2, tupleItems);
+ *tupleItems++ = structObj;
+ *tupleItems++ = HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch.num_rows())));
+ value = tuple;
+ } else {
+ value = HolderFactory.Create<TBoxedBlock>(Blocks.front().Block);
+ }
+
+ Blocks.front().Functor();
- auto value = HolderFactory.Create<TBoxedBlock>(Blocks.front().Block, Blocks.front().Functor);
if (AddPathIndex) {
NUdf::TUnboxedValue* tupleItems = nullptr;
auto tuple = ContainerCache.NewArray(HolderFactory, 2, tupleItems);
@@ -773,11 +943,13 @@ private:
total += s;
output.emplace_back(std::move(value));
Blocks.pop_front();
- } while (!Blocks.empty() && free > 0LL && Blocks.front().Block.bytes() <= size_t(free));
+ } while (!Blocks.empty() && free > 0LL && (ReadSpec->Arrow ? GetSizeOfBatch(*Blocks.front().Batch) : Blocks.front().Block.bytes()) <= size_t(free));
finished = Blocks.empty() && !Count;
if (finished) {
ContainerCache.Clear();
+ ArrowTupleContainerCache.Clear();
+ ArrowRowContainerCache.Clear();
}
return total;
}
@@ -799,6 +971,7 @@ private:
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry);
hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock);
+ hFunc(TEvPrivate::TEvNextRecordBatch, HandleNextRecordBatch);
cFunc(TEvPrivate::EvReadFinished, HandleReadFinished);
)
@@ -807,6 +980,13 @@ private:
}
void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) {
+ YQL_ENSURE(!ReadSpec->Arrow);
+ Blocks.emplace_back(next);
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+ }
+
+ void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) {
+ YQL_ENSURE(ReadSpec->Arrow);
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
}
@@ -829,6 +1009,8 @@ private:
std::vector<TRetryStuff::TPtr> RetryStuffForFile;
const THolderFactory& HolderFactory;
TPlainContainerCache ContainerCache;
+ TPlainContainerCache ArrowTupleContainerCache;
+ TPlainContainerCache ArrowRowContainerCache;
const ui64 InputIndex;
const TTxId TxId;
@@ -995,12 +1177,30 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const auto structType = static_cast<TStructType*>(outputItemType);
const auto readSpec = std::make_shared<TReadSpec>();
- readSpec->Columns.resize(structType->GetMembersCount());
- for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) {
- auto& column = readSpec->Columns[i];
- column.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit);
- column.name = structType->GetMemberName(i);
+ readSpec->Arrow = params.GetArrow();
+ if (readSpec->Arrow) {
+ arrow::SchemaBuilder builder;
+ for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) {
+ auto memberType = structType->GetMemberType(i);
+ bool isOptional;
+ std::shared_ptr<arrow::DataType> dataType;
+
+ YQL_ENSURE(ConvertArrowType(memberType, isOptional, dataType), "Unsupported arrow type");
+ THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(structType->GetMemberName(i)), dataType, isOptional)));
+ }
+
+ auto res = builder.Finish();
+ THROW_ARROW_NOT_OK(res.status());
+ readSpec->ArrowSchema = std::move(res).ValueOrDie();
+ } else {
+ readSpec->CHColumns.resize(structType->GetMembersCount());
+ for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) {
+ auto& column = readSpec->CHColumns[i];
+ column.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit);
+ column.name = structType->GetMemberName(i);
+ }
}
+
readSpec->Format = params.GetFormat();
if (const auto it = settings.find("compression"); settings.cend() != it)
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index b3741f765fd..124e66bf037 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -67,6 +67,16 @@
]
},
{
+ "Name": "TS3ArrowSettings",
+ "Base": "TS3SourceSettingsBase",
+ "Match": {"Type": "Callable", "Name": "S3ArrowSettings"},
+ "Children": [
+ {"Index": 2, "Name": "Format", "Type": "TCoAtom"},
+ {"Index": 3, "Name": "RowType", "Type": "TExprBase"},
+ {"Index": 4, "Name": "Settings", "Type": "TExprBase", "Optional": true}
+ ]
+ },
+ {
"Name": "TS3Read",
"Base": "TFreeArgCallable",
"Match": {"Type": "Callable", "Name": "Read!"},
diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto
index 232abb19cd1..3b01e90ec1e 100644
--- a/ydb/library/yql/providers/s3/proto/source.proto
+++ b/ydb/library/yql/providers/s3/proto/source.proto
@@ -15,4 +15,5 @@ message TSource {
optional string RowType = 4;
optional string Format = 5;
map<string, string> Settings = 6;
+ bool Arrow = 7;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index be392ce739a..8592c05fb54 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -110,6 +110,7 @@ public:
AddHandler({TS3Object::CallableName()}, Hndl(&TSelf::HandleObject));
AddHandler({TS3SourceSettings::CallableName()}, Hndl(&TSelf::HandleS3SourceSettings));
AddHandler({TS3ParseSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettings));
+ AddHandler({TS3ArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ArrowSettings));
AddHandler({TCoConfigure::CallableName()}, Hndl(&TSelf::HandleConfig));
}
@@ -181,6 +182,59 @@ public:
return TStatus::Ok;
}
+    /// Type-annotates an S3ArrowSettings callable (the Arrow/block-mode S3 source settings).
+    ///
+    /// Checks, in order:
+    ///   - the node has 4 or 5 children;
+    ///   - the paths child passes ValidateS3Paths (which also fills extraColumnsType);
+    ///   - the token child is a TCoSecureParam;
+    ///   - the format child is an atom accepted by NCommon::ValidateFormatForInput;
+    ///   - the row type child is a type node wrapping a struct type;
+    ///   - the optional settings child, when present, is a tuple.
+    ///
+    /// On success the node is annotated as
+    ///   Stream<Tuple<Struct<name: Block<T>, ...>, Scalar<Uint64>>>
+    /// i.e. a struct of block columns plus the block length. When extraColumnsType is
+    /// non-empty every stream item is additionally wrapped as Tuple<item, Uint64>.
+    /// NOTE(review): the extra Uint64 presumably identifies the source path so that
+    /// extra columns can be attached later - confirm against the consumer of this stream.
+    ///
+    /// Child positions are taken from TS3ArrowSettings itself rather than from the
+    /// sibling TS3ParseSettings/TS3SourceSettings nodes, so this handler keeps working
+    /// if those other node layouts ever change.
+    ///
+    /// @return TStatus::Ok when the annotation was set, TStatus::Error if any check failed.
+    TStatus HandleS3ArrowSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
+        if (!EnsureMinMaxArgsCount(*input, 4U, 5U, ctx)) {
+            return TStatus::Error;
+        }
+
+        const TStructExprType* extraColumnsType = nullptr;
+        if (!ValidateS3Paths(*input->Child(TS3ArrowSettings::idx_Paths), extraColumnsType, ctx)) {
+            return TStatus::Error;
+        }
+
+        const auto tokenNode = input->Child(TS3ArrowSettings::idx_Token);
+        if (!TCoSecureParam::Match(tokenNode)) {
+            ctx.AddError(TIssue(ctx.GetPosition(tokenNode->Pos()), TStringBuilder() << "Expected " << TCoSecureParam::CallableName()));
+            return TStatus::Error;
+        }
+
+        const auto formatNode = input->Child(TS3ArrowSettings::idx_Format);
+        if (!EnsureAtom(*formatNode, ctx) ||
+            !NCommon::ValidateFormatForInput(formatNode->Content(), ctx))
+        {
+            return TStatus::Error;
+        }
+
+        const auto& rowTypeNode = *input->Child(TS3ArrowSettings::idx_RowType);
+        if (!EnsureType(rowTypeNode, ctx)) {
+            return TStatus::Error;
+        }
+
+        const TTypeAnnotationNode* rowType = rowTypeNode.GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+        if (!EnsureStructType(rowTypeNode.Pos(), *rowType, ctx)) {
+            return TStatus::Error;
+        }
+
+        // The settings child is optional; validate it only when it is actually present.
+        if (input->ChildrenSize() > TS3ArrowSettings::idx_Settings && !EnsureTuple(*input->Child(TS3ArrowSettings::idx_Settings), ctx)) {
+            return TStatus::Error;
+        }
+
+        // Wrap every member type of the requested row into a Block<> type.
+        const auto& rowItems = rowType->Cast<TStructExprType>()->GetItems();
+        TVector<const TItemExprType*> blockRowTypeItems;
+        blockRowTypeItems.reserve(rowItems.size());
+        for (const auto& x : rowItems) {
+            blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType())));
+        }
+
+        auto blockRowType = ctx.MakeType<TStructExprType>(blockRowTypeItems);
+
+        const TTypeAnnotationNode* itemType = ctx.MakeType<TTupleExprType>(
+            TTypeAnnotationNode::TListType{ blockRowType, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)) }); // struct + block length
+
+        if (extraColumnsType->GetSize()) {
+            itemType = ctx.MakeType<TTupleExprType>(
+                TTypeAnnotationNode::TListType{ itemType, ctx.MakeType<TDataExprType>(EDataSlot::Uint64) });
+        }
+        input->SetTypeAnn(ctx.MakeType<TStreamExprType>(itemType));
+        return TStatus::Ok;
+    }
+
TStatus HandleRead(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*input, 4U, 5U, ctx)) {
return TStatus::Error;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index d13849455b3..816bead18e2 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -163,22 +163,50 @@ public:
);
}
- if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && !s3ReadObject.Object().Format().Ref().IsAtom({"raw", "json_list"}))
- return Build<TDqSourceWrap>(ctx, read->Pos())
- .Input<TS3ParseSettings>()
- .Paths(s3ReadObject.Object().Paths())
- .Token<TCoSecureParam>()
- .Name().Build(token)
+ auto format = s3ReadObject.Object().Format().Ref().Content();
+ if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && format != "raw" && format != "json_list") {
+ bool supportedArrowTypes = false;
+ if (State_->Types->UseBlocks && State_->Types->ArrowResolver) {
+ TVector<const TTypeAnnotationNode*> allTypes;
+ for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) {
+ allTypes.push_back(x->GetItemType());
+ }
+
+ YQL_ENSURE(State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(read->Pos()), allTypes, supportedArrowTypes, ctx));
+ }
+
+ if (supportedArrowTypes && format == "parquet") {
+ return Build<TDqSourceWrap>(ctx, read->Pos())
+ .Input<TS3ArrowSettings>()
+ .Paths(s3ReadObject.Object().Paths())
+ .Token<TCoSecureParam>()
+ .Name().Build(token)
+ .Build()
+ .Format(s3ReadObject.Object().Format())
+ .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
+ .Settings(s3ReadObject.Object().Settings())
.Build()
- .Format(s3ReadObject.Object().Format())
.RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
- .Settings(s3ReadObject.Object().Settings())
- .Build()
- .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
- .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
- .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings)))
- .Done().Ptr();
- else {
+ .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
+ .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings)))
+ .Done().Ptr();
+ } else {
+ return Build<TDqSourceWrap>(ctx, read->Pos())
+ .Input<TS3ParseSettings>()
+ .Paths(s3ReadObject.Object().Paths())
+ .Token<TCoSecureParam>()
+ .Name().Build(token)
+ .Build()
+ .Format(s3ReadObject.Object().Format())
+ .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
+ .Settings(s3ReadObject.Object().Settings())
+ .Build()
+ .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
+ .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
+ .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings)))
+ .Done().Ptr();
+ }
+ } else {
if (const auto& objectSettings = s3ReadObject.Object().Settings()) {
settings.emplace_back(
ctx.Builder(objectSettings.Cast().Pos())
@@ -260,6 +288,27 @@ public:
if (const auto maySettings = parseSettings.Settings()) {
const auto& settings = maySettings.Cast();
for (auto i = 0U; i < settings.Ref().ChildrenSize(); ++i) {
+ srcDesc.MutableSettings()->insert({ TString(settings.Ref().Child(i)->Head().Content()), TString(settings.Ref().Child(i)->Tail().IsAtom() ? settings.Ref().Child(i)->Tail().Content() : settings.Ref().Child(i)->Tail().Head().Content()) });
+ }
+ }
+ } else if (const auto mayArrowSettings = settings.Maybe<TS3ArrowSettings>()) {
+ const auto arrowSettings = mayArrowSettings.Cast();
+ srcDesc.SetFormat(arrowSettings.Format().StringValue().c_str());
+ srcDesc.SetArrow(true);
+
+ const TStructExprType* fullRowType = arrowSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
+ // exclude extra columns to get actual row type we need to read from input
+ auto rowTypeItems = fullRowType->GetItems();
+ EraseIf(rowTypeItems, [extraColumnsType](const auto& item) { return extraColumnsType->FindItem(item->GetName()); });
+ {
+ // TODO: pass context
+ TExprContext ctx;
+ srcDesc.SetRowType(NCommon::WriteTypeToYson(ctx.MakeType<TStructExprType>(rowTypeItems), NYT::NYson::EYsonFormat::Text));
+ }
+
+ if (const auto maySettings = arrowSettings.Settings()) {
+ const auto& settings = maySettings.Cast();
+ for (auto i = 0U; i < settings.Ref().ChildrenSize(); ++i) {
srcDesc.MutableSettings()->insert({TString(settings.Ref().Child(i)->Head().Content()), TString(settings.Ref().Child(i)->Tail().IsAtom() ? settings.Ref().Child(i)->Tail().Content() : settings.Ref().Child(i)->Tail().Head().Content())});
}
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
index 015254f159e..73dc00a056d 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
@@ -466,13 +466,15 @@ public:
.Done();
}
+ const bool isArrowSettings = maybeS3SourceSettings.Cast().CallableName() == TS3ArrowSettings::CallableName();
const TStructExprType* readRowType =
- dqSource.Input().Maybe<TS3ParseSettings>().Cast().RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
+ (isArrowSettings ? dqSource.Input().Maybe<TS3ArrowSettings>().Cast().RowType().Ref() : dqSource.Input().Maybe<TS3ParseSettings>().Cast().RowType().Ref())
+ .GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
TVector<const TItemExprType*> readRowDataItems = readRowType->GetItems();
TVector<const TItemExprType*> outputRowDataItems = outputRowType->GetItems();
- if (auto settings = dqSource.Input().Maybe<TS3ParseSettings>().Cast().Settings()) {
+ if (auto settings = isArrowSettings ? dqSource.Input().Maybe<TS3ArrowSettings>().Cast().Settings() : dqSource.Input().Maybe<TS3ParseSettings>().Cast().Settings()) {
if (auto ps = GetSetting(settings.Cast().Ref(), "partitionedby")) {
THashSet<TStringBuf> cols;
for (size_t i = 1; i < ps->ChildrenSize(); ++i) {
@@ -485,7 +487,7 @@ public:
}
}
- if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0) {
+ if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !isArrowSettings) {
const TStructExprType* readRowDataType = ctx.MakeType<TStructExprType>(readRowDataItems);
auto item = GetLightColumn(*readRowDataType);
YQL_ENSURE(item);
@@ -494,16 +496,29 @@ public:
readRowType = outputRowType;
}
- return Build<TDqSourceWrap>(ctx, dqSource.Pos())
- .InitFrom(dqSource)
- .Input<TS3ParseSettings>()
- .InitFrom(dqSource.Input().Maybe<TS3ParseSettings>().Cast())
- .Paths(newPaths)
- .RowType(ExpandType(dqSource.Input().Pos(), *readRowType, ctx))
- .Build()
- .RowType(outputRowTypeNode)
- .Settings(newSettings)
- .Done();
+ if (isArrowSettings) {
+ return Build<TDqSourceWrap>(ctx, dqSource.Pos())
+ .InitFrom(dqSource)
+ .Input<TS3ArrowSettings>()
+ .InitFrom(dqSource.Input().Maybe<TS3ArrowSettings>().Cast())
+ .Paths(newPaths)
+ .RowType(ExpandType(dqSource.Input().Pos(), *readRowType, ctx))
+ .Build()
+ .RowType(outputRowTypeNode)
+ .Settings(newSettings)
+ .Done();
+ } else {
+ return Build<TDqSourceWrap>(ctx, dqSource.Pos())
+ .InitFrom(dqSource)
+ .Input<TS3ParseSettings>()
+ .InitFrom(dqSource.Input().Maybe<TS3ParseSettings>().Cast())
+ .Paths(newPaths)
+ .RowType(ExpandType(dqSource.Input().Pos(), *readRowType, ctx))
+ .Build()
+ .RowType(outputRowTypeNode)
+ .Settings(newSettings)
+ .Done();
+ }
}
TMaybeNode<TExprBase> MergeS3Paths(TExprBase node, TExprContext& ctx) const {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
index 9ff07d6ca33..e6e16121491 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
@@ -135,6 +135,18 @@ void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, con
return TRuntimeNode();
});
+    // Chain a compiler for TDqSourceWideBlockWrap nodes. Only nodes whose data source
+    // category equals S3ProviderName are handled here: they are passed to
+    // TryWrapWithParser and its result is returned when it is non-empty. In every
+    // other case an empty TRuntimeNode is returned.
+    // NOTE(review): the trailing `true` presumably selects the block (Arrow) variant of
+    // the parser wrapper, and the empty TRuntimeNode presumably tells the compiler
+    // chain that this handler did not take the node - confirm both against
+    // TryWrapWithParser and ChainCallable.
+    compiler.ChainCallable(TDqSourceWideBlockWrap::CallableName(),
+        [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
+            const auto blockWrap = TDqSourceWideBlockWrap(&node);
+            if (blockWrap.DataSource().Category().Value() != S3ProviderName) {
+                return TRuntimeNode();
+            }
+
+            if (const auto parsed = TryWrapWithParser(blockWrap, ctx, true)) {
+                return *parsed;
+            }
+
+            return TRuntimeNode();
+        });
+
if (!compiler.HasCallable(TS3SinkOutput::CallableName()))
compiler.AddCallable(TS3SinkOutput::CallableName(),
[state](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {