diff options
12 files changed, 608 insertions, 158 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index ba3db690f87..ef3adf31c1d 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -273,6 +273,7 @@ TProgram::TProgram( , Modules_(modules) , ExprRoot_(nullptr) , SessionId_(sessionId) + , ResultType_(IDataProvider::EResultFormat::Yson) , ResultFormat_(NYson::EYsonFormat::Binary) , OutputFormat_(NYson::EYsonFormat::Pretty) , EnableRangeComputeFor_(enableRangeComputeFor) @@ -1566,7 +1567,7 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us auto resultFormat = ResultFormat_; auto writerFactory = [resultFormat] () { return CreateYsonResultWriter(resultFormat); }; ResultProviderConfig_ = MakeIntrusive<TResultProviderConfig>(*typeAnnotationContext, - *FunctionRegistry_, IDataProvider::EResultFormat::Yson, ToString((ui32)resultFormat), writerFactory); + *FunctionRegistry_, ResultType_, ToString((ui32)resultFormat), writerFactory); ResultProviderConfig_->SupportsResultPosition = SupportsResultPosition_; auto resultProvider = CreateResultProvider(ResultProviderConfig_); typeAnnotationContext->AddDataSink(ResultProviderName, resultProvider); diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h index d68f36162aa..3f1fdb0f285 100644 --- a/ydb/library/yql/core/facade/yql_facade.h +++ b/ydb/library/yql/core/facade/yql_facade.h @@ -240,6 +240,10 @@ public: DiagnosticFormat_ = format; } + void SetResultType(IDataProvider::EResultFormat type) { + ResultType_ = type; + } + TMaybe<TString> GetDiagnostics(); IGraphTransformer::TStatistics GetRawDiagnostics(); @@ -409,6 +413,7 @@ private: TAutoPtr<IGraphTransformer> Transformer_; TIntrusivePtr<TResultProviderConfig> ResultProviderConfig_; bool SupportsResultPosition_ = false; + IDataProvider::EResultFormat ResultType_; NYson::EYsonFormat ResultFormat_; NYson::EYsonFormat OutputFormat_; TMaybe<NYson::EYsonFormat> DiagnosticFormat_; diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h index 95d7aee773d..259d1817bbb 100644 --- a/ydb/library/yql/core/yql_data_provider.h +++ b/ydb/library/yql/core/yql_data_provider.h @@ -87,7 +87,8 @@ public: enum class EResultFormat { Yson, - Custom + Custom, + Skiff }; // settings for result data provider diff --git a/ydb/library/yql/providers/yt/codec/yt_codec_io.cpp b/ydb/library/yql/providers/yt/codec/yt_codec_io.cpp index 450ade03b6e..34b9fc29839 100644 --- a/ydb/library/yql/providers/yt/codec/yt_codec_io.cpp +++ b/ydb/library/yql/providers/yt/codec/yt_codec_io.cpp @@ -1689,17 +1689,32 @@ protected: bool Optional; }; - static TVector<TField> GetFields(TStructType* type) { + static TVector<TField> GetFields(TStructType* type, const TVector<TString>& columns = {}) { TVector<TField> res; - res.reserve(type->GetMembersCount()); + + THashMap<TString, ui32> columnsOrder; + if (columns.size() > 0) { + for (ui32 index = 0; index < columns.size(); index++) { + columnsOrder[columns[index]] = index; + } + res.resize(columns.size()); + } else { + res.reserve(type->GetMembersCount()); + } + for (ui32 index = 0; index < type->GetMembersCount(); ++index) { + auto name = type->GetMemberName(index); auto fieldType = type->GetMemberType(index); const bool isOptional = fieldType->IsOptional(); if (isOptional) { fieldType = static_cast<TOptionalType*>(fieldType)->GetItemType(); } - auto name = type->GetMemberName(index); - res.push_back(TField{name, fieldType, isOptional}); + + if (!columnsOrder.empty() && columnsOrder.contains(name)) { + res[columnsOrder[name]] = TField{name, fieldType, isOptional}; + } else { + res.push_back(TField{name, fieldType, isOptional}); + } } return res; } @@ -1802,10 +1817,10 @@ protected: class TSkiffEncoder: public TSkiffEncoderBase { public: - TSkiffEncoder(TOutputBuf& buf, const TMkqlIOSpecs& specs, size_t tableIndex) + TSkiffEncoder(TOutputBuf& buf, const TMkqlIOSpecs& specs, size_t tableIndex, const TVector<TString>& columns) : TSkiffEncoderBase(buf, specs) { - Fields_ = GetFields(Specs_.Outputs[tableIndex].RowType); + Fields_ = GetFields(Specs_.Outputs[tableIndex].RowType, columns); NativeYtTypeFlags_ = Specs_.Outputs[tableIndex].NativeYtTypeFlags; } @@ -1952,7 +1967,10 @@ TMkqlWriterImpl::TMkqlWriterImpl(NYT::TRawTableWriterPtr stream, size_t blockSiz TMkqlWriterImpl::~TMkqlWriterImpl() { } -void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs) { +void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs, const TVector<TString>& columns) { + // In the case of the "skiff" format, the ordering of columns in data and in spec are assumed to be identical during encoding + // To provide this, a "columns" field is used to change the alphabetical order of columns in spec + Specs_ = &specs; JobStats_ = specs.JobStats_; @@ -1969,8 +1987,14 @@ void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs) { const auto writer1 = MakeYtCodecCgWriter<false>(Codegen_, rowType); const auto writer2 = MakeYtCodecCgWriter<true>(Codegen_, rowType); + for (ui32 index = 0; index < rowType->GetMembersCount(); ++index) { - auto fieldType = rowType->GetMemberType(index); + ui32 column = index; + if (columns) { + column = rowType->GetMemberIndex(columns[index]); + } + + auto fieldType = rowType->GetMemberType(column); writer1->AddField(fieldType, Specs_->Outputs[i].NativeYtTypeFlags); writer2->AddField(fieldType, Specs_->Outputs[i].NativeYtTypeFlags); } @@ -1994,6 +2018,7 @@ void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs) { out->Buf_.SetStats(JobStats_); if (Specs_->UseSkiff_) { if (Specs_->Outputs[i].RowType->GetMembersCount() == 0) { + YQL_ENSURE(columns.empty()); Encoders_.emplace_back(new TSkiffEmptySchemaEncoder(out->Buf_, *Specs_)); } #ifndef MKQL_DISABLE_CODEGEN @@ -2005,9 +2030,10 @@ void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs) { } #endif else { - Encoders_.emplace_back(new TSkiffEncoder(out->Buf_, *Specs_, i)); + Encoders_.emplace_back(new TSkiffEncoder(out->Buf_, *Specs_, i, columns)); } } else { + YQL_ENSURE(columns.empty()); Encoders_.emplace_back(new TYsonEncoder(out->Buf_, *Specs_, i)); } } diff --git a/ydb/library/yql/providers/yt/codec/yt_codec_io.h b/ydb/library/yql/providers/yt/codec/yt_codec_io.h index bdc5af326b5..bef3620aed0 100644 --- a/ydb/library/yql/providers/yt/codec/yt_codec_io.h +++ b/ydb/library/yql/providers/yt/codec/yt_codec_io.h @@ -107,7 +107,7 @@ public: ~TMkqlWriterImpl(); - void SetSpecs(const TMkqlIOSpecs& specs); + void SetSpecs(const TMkqlIOSpecs& specs, const TVector<TString>& columns = {}); void AddRow(const NKikimr::NUdf::TUnboxedValuePod row) override; void AddFlatRow(const NUdf::TUnboxedValuePod* row) override; diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp index 4ba3dc13634..071ac579d2e 100644 --- a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp +++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp @@ -1190,7 +1190,7 @@ private: const TBindTerminator bind(compGraph->GetTerminator()); compGraph->Prepare(); - TExecuteResOrPull resultData(options.FillSettings().RowsLimitPerWrite, + TYsonExecuteResOrPull resultData(options.FillSettings().RowsLimitPerWrite, options.FillSettings().AllResultsBytesLimit, MakeMaybe(columns)); resultData.WriteValue(compGraph->GetValue(), data.GetStaticType()); diff --git a/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.cpp b/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.cpp index 316a8803069..2ff27baaef7 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.cpp +++ b/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.cpp @@ -287,7 +287,7 @@ static bool IterateRows(NYT::ITransactionPtr tx, NYT::TRichYPath path, ui32 tableIndex, TMkqlIOCache& specsCache, - TExecuteResOrPull& exec, + IExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling) { @@ -350,7 +350,7 @@ bool IterateYamredRows(NYT::ITransactionPtr tx, const NYT::TRichYPath& table, ui32 tableIndex, TMkqlIOCache& specsCache, - TExecuteResOrPull& exec, + IExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling) { @@ -361,7 +361,7 @@ bool IterateYsonRows(NYT::ITransactionPtr tx, const NYT::TRichYPath& table, ui32 tableIndex, TMkqlIOCache& specsCache, - TExecuteResOrPull& exec, + IExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling) { @@ -372,7 +372,7 @@ bool SelectRows(NYT::IClientPtr client, const TString& table, ui32 tableIndex, TMkqlIOCache& specsCache, - TExecuteResOrPull& exec, + IExecuteResOrPull& exec, TTableLimiter& limiter) { ui64 startRecordInTable = limiter.GetTableStart(); diff --git a/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h b/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h index 7c5c3cce638..75ad22ddf43 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h +++ b/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h @@ -21,7 +21,7 @@ namespace NYql { class TMkqlIOCache; -class TExecuteResOrPull; +class IExecuteResOrPull; class TTableLimiter; struct TYqlOperationOptions; @@ -41,11 +41,11 @@ void TransferTableAttributes(const NYT::TNode& attributes, const std::function<v NYT::TNode FilterYqlAttributes(const NYT::TNode& attributes); bool IterateYamredRows(NYT::ITransactionPtr tx, const NYT::TRichYPath& table, ui32 tableIndex, TMkqlIOCache& specsCache, - TExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling = {}); + IExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling = {}); bool IterateYsonRows(NYT::ITransactionPtr tx, const NYT::TRichYPath& table, ui32 tableIndex, TMkqlIOCache& specsCache, - TExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling = {}); + IExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling = {}); bool SelectRows(NYT::IClientPtr client, const TString& table, ui32 tableIndex, TMkqlIOCache& specsCache, - TExecuteResOrPull& exec, TTableLimiter& limiter); + IExecuteResOrPull& exec, TTableLimiter& limiter); NYT::TNode YqlOpOptionsToSpec(const TYqlOperationOptions& opOpts, const TString& userName, const TVector<std::pair<TString, TString>>& code = {}); NYT::TNode YqlOpOptionsToAttrs(const TYqlOperationOptions& opOpts); diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp index b25b9c7370b..e84e1db8cfb 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -18,6 +18,7 @@ #include <ydb/library/yql/providers/yt/lib/infer_schema/infer_schema.h> #include <ydb/library/yql/providers/yt/lib/infer_schema/infer_schema_rpc.h> #include <ydb/library/yql/providers/yt/lib/schema/schema.h> +#include <ydb/library/yql/providers/yt/lib/skiff/yql_skiff_schema.h> #include <ydb/library/yql/providers/yt/common/yql_names.h> #include <ydb/library/yql/providers/yt/common/yql_configuration.h> #include <ydb/library/yql/providers/yt/job/yql_job_base.h> @@ -1245,36 +1246,68 @@ private: return false; } - THolder<TNodeResultBuilder> operator()() const { + THolder<TNodeResultBuilder> Create(const TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory) const { + Y_UNUSED(codecCtx); + Y_UNUSED(holderFactory); + + return Create(); + } + + THolder<TNodeResultBuilder> Create() const { return MakeHolder<TNodeResultBuilder>(); } }; - class TExprResultBuilder: public TExecuteResOrPull { + class TYsonExprResultFactory { public: - TExprResultBuilder(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TVector<TString>& columns) - : TExecuteResOrPull(rowLimit, byteLimit, MakeMaybe(columns)) + using TResult = std::pair<TString, bool>; + + TYsonExprResultFactory(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TVector<TString>& columns, bool hasListResult) + : RowLimit_(rowLimit) + , ByteLimit_(byteLimit) + , Columns_(columns) + , HasListResult_(hasListResult) { } - bool WriteNext(const NYT::TNode& item) { - return TExecuteResOrPull::WriteNext(NYT::NodeToYsonString(item, NYT::NYson::EYsonFormat::Binary)); + bool UseResultYson() const { + return true; + } + + THolder<TYsonExecuteResOrPull> Create(TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory) const { + Y_UNUSED(codecCtx); + Y_UNUSED(holderFactory); + + return Create(); } - std::pair<TString, bool> Make() { - return {Finish(), IsTruncated()}; + THolder<TYsonExecuteResOrPull> Create() const { + THolder<TYsonExecuteResOrPull> res; + + res = MakeHolder<TYsonExecuteResOrPull>(RowLimit_, ByteLimit_, Columns_); + if (HasListResult_) { + res->SetListResult(); + } + return res; } + + private: + const TMaybe<ui64> RowLimit_; + const TMaybe<ui64> ByteLimit_; + const TVector<TString> Columns_; + const bool HasListResult_; }; - class TExprResultFactory { + class TSkiffExprResultFactory { public: using TResult = std::pair<TString, bool>; - TExprResultFactory(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TVector<TString>& columns, bool hasListResult) + TSkiffExprResultFactory(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, bool hasListResult, const NYT::TNode& attrs, const TString& optLLVM) : RowLimit_(rowLimit) , ByteLimit_(byteLimit) - , Columns_(columns) , HasListResult_(hasListResult) + , Attrs_(attrs) + , OptLLVM_(optLLVM) { } @@ -1282,18 +1315,26 @@ private: return true; } - THolder<TExprResultBuilder> operator()() const { - auto res = MakeHolder<TExprResultBuilder>(RowLimit_, ByteLimit_, Columns_); + THolder<TSkiffExecuteResOrPull> Create(TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory) const { + THolder<TSkiffExecuteResOrPull> res; + + res = MakeHolder<TSkiffExecuteResOrPull>(RowLimit_, ByteLimit_, codecCtx, holderFactory, Attrs_, OptLLVM_); if (HasListResult_) { res->SetListResult(); } + return res; } + + THolder<TSkiffExecuteResOrPull> Create() const { + YQL_ENSURE(false, "Unexpected skiff result builder creation"); + } private: const TMaybe<ui64> RowLimit_; const TMaybe<ui64> ByteLimit_; - const TVector<TString> Columns_; const bool HasListResult_; + const NYT::TNode Attrs_; + const TString OptLLVM_; }; static TFinalizeResult ExecFinalize(const TSession::TPtr& session, bool abort) { @@ -2595,6 +2636,26 @@ private: return result; } + static std::pair<TString, NYT::TNode> ParseYTType(const TExprNode& node, + TExprContext& ctx, + const TExecContext<TResOrPullOptions>::TPtr& execCtx, + const TMaybe<NYql::TColumnOrder>& columns = Nothing()) + { + const auto sequenceItemType = GetSequenceItemType(node.Pos(), node.GetTypeAnn(), false, ctx); + + auto rowSpecInfo = MakeIntrusive<TYqlRowSpecInfo>(); + rowSpecInfo->SetType(sequenceItemType->Cast<TStructExprType>(), execCtx->Options_.Config()->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE); + if (columns) { + rowSpecInfo->SetColumnOrder(columns); + } + + NYT::TNode tableSpec = NYT::TNode::CreateMap(); + rowSpecInfo->FillCodecNode(tableSpec[YqlRowSpecAttribute]); + + auto resultYTType = NodeToYsonString(RowSpecToYTSchema(tableSpec[YqlRowSpecAttribute], execCtx->Options_.Config()->NativeYtTypeCompatibility.Get(execCtx->Cluster_).GetOrElse(NTCF_LEGACY)).ToNode()); + auto resultRowSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(tableSpec)); + return {resultYTType, resultRowSpec}; + } TFuture<TResOrPullResult> DoPull(const TSession::TPtr& session, NNodes::TPull pull, TExprContext& ctx, TResOrPullOptions&& options) { if (options.FillSettings().Discard) { @@ -2628,7 +2689,13 @@ private: } TString type; - if (NCommon::HasResOrPullOption(pull.Ref(), "type")) { + NYT::TNode rowSpec; + if (execCtx->Options_.FillSettings().Format == IDataProvider::EResultFormat::Skiff) { + auto ytType = ParseYTType(pull.Input().Ref(), ctx, execCtx, columns); + + type = ytType.first; + rowSpec = ytType.second; + } else if (NCommon::HasResOrPullOption(pull.Ref(), "type")) { TStringStream typeYson; ::NYson::TYsonWriter typeWriter(&typeYson); NCommon::WriteResOrPullType(typeWriter, pull.Input().Ref().GetTypeAnn(), columns); @@ -2637,14 +2704,19 @@ private: auto pos = ctx.GetPosition(pull.Pos()); - return session->Queue_->Async([type, ref, range, autoRef, execCtx, columns, pos] () { + return session->Queue_->Async([rowSpec, type, ref, range, autoRef, execCtx, columns, pos] () { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); execCtx->MakeUserFiles(); try { TResOrPullResult res; TStringStream out; - ::NYson::TYsonWriter writer(&out, NCommon::GetYsonFormat(execCtx->Options_.FillSettings()), ::NYson::EYsonType::Node, false); + + auto fillSettings = execCtx->Options_.FillSettings(); + fillSettings.Format = IDataProvider::EResultFormat::Yson; + + ::NYson::TYsonWriter writer(&out, NCommon::GetYsonFormat(fillSettings), ::NYson::EYsonType::Node, false); writer.OnBeginMap(); + if (type) { writer.OnKeyedItem("Type"); writer.OnRaw(type); @@ -2652,8 +2724,9 @@ private: bool truncated = false; if (!ref) { - truncated = ExecPull(execCtx, writer, range, columns); + truncated = ExecPull(execCtx, writer, range, rowSpec, columns); } + if (ref || (truncated && autoRef)) { writer.OnKeyedItem("Ref"); writer.OnBeginList(); @@ -2692,6 +2765,7 @@ private: static bool ExecPull(const TExecContext<TResOrPullOptions>::TPtr& execCtx, ::NYson::TYsonWriter& writer, const TRecordsRange& range, + const NYT::TNode& rowSpec, const TVector<TString>& columns) { TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), @@ -2719,67 +2793,109 @@ private: const auto nativeTypeCompat = execCtx->Options_.Config()->NativeYtTypeCompatibility.Get(execCtx->Cluster_).GetOrElse(NTCF_LEGACY); specs.Init(codecCtx, execCtx->GetInputSpec(!useSkiff, nativeTypeCompat, false), tables, columns); - TExecuteResOrPull pullData(execCtx->Options_.FillSettings().RowsLimitPerWrite, - execCtx->Options_.FillSettings().AllResultsBytesLimit, MakeMaybe(columns)); - TMkqlIOCache specsCache(specs, holderFactory); + auto run = [&] (IExecuteResOrPull& pullData) { + TMkqlIOCache specsCache(specs, holderFactory); - if (testRun) { - YQL_ENSURE(execCtx->InputTables_.size() == 1U, "Support single input only."); - const auto itI = TestTables.find(execCtx->InputTables_.front().Path.Path_); - YQL_ENSURE(TestTables.cend() != itI); + if (testRun) { + YQL_ENSURE(execCtx->InputTables_.size() == 1U, "Support single input only."); + const auto itI = TestTables.find(execCtx->InputTables_.front().Path.Path_); + YQL_ENSURE(TestTables.cend() != itI); - TMkqlInput input(MakeStringInput(std::move(itI->second.second), false)); - TMkqlReaderImpl reader(input, 0, 4 << 10, 0); - reader.SetSpecs(specs, holderFactory); - for (reader.Next(); reader.IsValid(); reader.Next()) { - if (!pullData.WriteNext(specsCache, reader.GetRow(), 0)) { - return true; - } - } - } else if (auto limiter = TTableLimiter(range)) { - auto entry = execCtx->GetEntry(); - bool stop = false; - for (size_t i = 0; i < execCtx->InputTables_.size(); ++i) { - TString srcTableName = execCtx->InputTables_[i].Name; - NYT::TRichYPath srcTable = execCtx->InputTables_[i].Path; - bool isDynamic = execCtx->InputTables_[i].Dynamic; - ui64 recordsCount = execCtx->InputTables_[i].Records; - if (!isDynamic) { - if (!limiter.NextTable(recordsCount)) { - continue; + TMkqlInput input(MakeStringInput(std::move(itI->second.second), false)); + TMkqlReaderImpl reader(input, 0, 4 << 10, 0); + reader.SetSpecs(specs, holderFactory); + for (reader.Next(); reader.IsValid(); reader.Next()) { + if (!pullData.WriteNext(specsCache, reader.GetRow(), 0)) { + return true; } - } else { - limiter.NextDynamicTable(); } - - if (isDynamic) { - YQL_ENSURE(srcTable.GetRanges().Empty()); - stop = NYql::SelectRows(entry->Client, srcTableName, i, specsCache, pullData, limiter); - } else { - auto readTx = entry->Tx; - if (srcTable.TransactionId_) { - readTx = entry->GetSnapshotTx(*srcTable.TransactionId_); - srcTable.TransactionId_.Clear(); + } else if (auto limiter = TTableLimiter(range)) { + auto entry = execCtx->GetEntry(); + bool stop = false; + for (size_t i = 0; i < execCtx->InputTables_.size(); ++i) { + TString srcTableName = execCtx->InputTables_[i].Name; + NYT::TRichYPath srcTable = execCtx->InputTables_[i].Path; + bool isDynamic = execCtx->InputTables_[i].Dynamic; + ui64 recordsCount = execCtx->InputTables_[i].Records; + if (!isDynamic) { + if (!limiter.NextTable(recordsCount)) { + continue; + } + } else { + limiter.NextDynamicTable(); } - if (execCtx->YamrInput) { - stop = NYql::IterateYamredRows(readTx, srcTable, i, specsCache, pullData, limiter, execCtx->Sampling); + + if (isDynamic) { + YQL_ENSURE(srcTable.GetRanges().Empty()); + stop = NYql::SelectRows(entry->Client, srcTableName, i, specsCache, pullData, limiter); } else { - stop = NYql::IterateYsonRows(readTx, srcTable, i, specsCache, pullData, limiter, execCtx->Sampling); + auto readTx = entry->Tx; + if (srcTable.TransactionId_) { + readTx = entry->GetSnapshotTx(*srcTable.TransactionId_); + srcTable.TransactionId_.Clear(); + } + if (execCtx->YamrInput) { + stop = NYql::IterateYamredRows(readTx, srcTable, i, specsCache, pullData, limiter, execCtx->Sampling); + } else { + stop = NYql::IterateYsonRows(readTx, srcTable, i, specsCache, pullData, limiter, execCtx->Sampling); + } + } + if (stop || limiter.Exceed()) { + break; } } - if (stop || limiter.Exceed()) { - break; + } + return false; + }; + + switch (execCtx->Options_.FillSettings().Format) { + case IDataProvider::EResultFormat::Yson: { + TYsonExecuteResOrPull pullData(execCtx->Options_.FillSettings().RowsLimitPerWrite, + execCtx->Options_.FillSettings().AllResultsBytesLimit, MakeMaybe(columns)); + + if (run(pullData)) { + return true; } + specs.Clear(); + + writer.OnKeyedItem("Data"); + writer.OnBeginList(); + writer.OnRaw(pullData.Finish(), ::NYson::EYsonType::ListFragment); + writer.OnEndList(); + return pullData.IsTruncated(); } - } + case IDataProvider::EResultFormat::Skiff: { + THashMap<TString, ui32> structColumns; + for (size_t index = 0; index < columns.size(); index++) { + structColumns.emplace(columns[index], index); + } + + auto skiffNode = SingleTableSpecToInputSkiff(rowSpec[YqlIOSpecTables][0], structColumns, false, false, false); - specs.Clear(); - writer.OnKeyedItem("Data"); - writer.OnBeginList(); // Pull returns list fragment - writer.OnRaw(pullData.Finish(), ::NYson::EYsonType::ListFragment); - writer.OnEndList(); + writer.OnKeyedItem("SkiffType"); + writer.OnRaw(NodeToYsonString(skiffNode), ::NYson::EYsonType::Node); - return pullData.IsTruncated(); + TSkiffExecuteResOrPull pullData(execCtx->Options_.FillSettings().RowsLimitPerWrite, + execCtx->Options_.FillSettings().AllResultsBytesLimit, + codecCtx, + holderFactory, + rowSpec, + execCtx->Options_.OptLLVM(), + columns); + + if (run(pullData)) { + return true; + } + specs.Clear(); + + writer.OnKeyedItem("Data"); + writer.OnStringScalar(pullData.Finish()); + + return pullData.IsTruncated(); + } + default: + YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << execCtx->Options_.FillSettings().Format; + } } TFuture<TResOrPullResult> DoResult(const TSession::TPtr& session, NNodes::TResult result, TExprContext& ctx, TResOrPullOptions&& options) { @@ -2799,15 +2915,8 @@ private: hasListResult = rootNode.GetStaticType()->IsList(); lambda = SerializeRuntimeNode(rootNode, builder.GetTypeEnvironment()); } - auto extraUsage = ScanExtraResourceUsage(result.Input().Ref(), *options.Config()); - TString type; - if (NCommon::HasResOrPullOption(result.Ref(), "type")) { - TStringStream typeYson; - ::NYson::TYsonWriter typeWriter(&typeYson); - NCommon::WriteResOrPullType(typeWriter, result.Input().Ref().GetTypeAnn(), columns); - type = typeYson.Str(); - } + auto extraUsage = ScanExtraResourceUsage(result.Input().Ref(), *options.Config()); TString cluster = options.UsedCluster(); if (cluster.empty()) { @@ -2823,30 +2932,92 @@ private: auto execCtx = MakeExecCtx(std::move(options), session, cluster, result.Input().Raw(), &ctx); auto pos = ctx.GetPosition(result.Pos()); - return session->Queue_->Async([lambda, hasListResult, extraUsage, tmpTablePath, execCtx, columns] () { + TString type, skiffType; + NYT::TNode rowSpec; + if (execCtx->Options_.FillSettings().Format == IDataProvider::EResultFormat::Skiff) { + auto ytType = ParseYTType(result.Input().Ref(), ctx, execCtx); + + type = ytType.first; + rowSpec = ytType.second; + skiffType = NodeToYsonString(TablesSpecToOutputSkiff(rowSpec)); + } else if (NCommon::HasResOrPullOption(result.Ref(), "type")) { + TStringStream typeYson; + ::NYson::TYsonWriter typeWriter(&typeYson); + NCommon::WriteResOrPullType(typeWriter, result.Input().Ref().GetTypeAnn(), columns); + type = typeYson.Str(); + } + + return session->Queue_->Async([lambda, hasListResult, extraUsage, tmpTablePath, execCtx, columns, rowSpec] () { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); execCtx->MakeUserFiles(); - return ExecCalc(lambda, extraUsage, tmpTablePath, execCtx, {}, - TExprResultFactory(execCtx->Options_.FillSettings().RowsLimitPerWrite, - execCtx->Options_.FillSettings().AllResultsBytesLimit, columns, hasListResult), - &columns); + + switch (execCtx->Options_.FillSettings().Format) { + case IDataProvider::EResultFormat::Yson: + return ExecCalc(lambda, extraUsage, tmpTablePath, execCtx, {}, + TYsonExprResultFactory(execCtx->Options_.FillSettings().RowsLimitPerWrite, + execCtx->Options_.FillSettings().AllResultsBytesLimit, + columns, + hasListResult), + &columns, + execCtx->Options_.FillSettings().Format); + case IDataProvider::EResultFormat::Skiff: + return ExecCalc(lambda, extraUsage, tmpTablePath, execCtx, {}, + TSkiffExprResultFactory(execCtx->Options_.FillSettings().RowsLimitPerWrite, + execCtx->Options_.FillSettings().AllResultsBytesLimit, + hasListResult, + rowSpec, + execCtx->Options_.OptLLVM()), + &columns, + execCtx->Options_.FillSettings().Format); + break; + default: + YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << execCtx->Options_.FillSettings().Format; + } }) - .Apply([type, execCtx, discard, pos] (const TFuture<std::pair<TString, bool>>& f) { + .Apply([skiffType, type, execCtx, discard, pos, columns] (const TFuture<std::pair<TString, bool>>& f) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); try { const std::pair<TString, bool>& value = f.GetValue(); TResOrPullResult res; TStringStream out; - ::NYson::TYsonWriter writer(discard ? (IOutputStream*)&Cnull : (IOutputStream*)&out, NCommon::GetYsonFormat(execCtx->Options_.FillSettings()), ::NYson::EYsonType::Node, true); + + auto fillSettings = execCtx->Options_.FillSettings(); + fillSettings.Format = IDataProvider::EResultFormat::Yson; + + ::NYson::TYsonWriter writer(discard ? (IOutputStream*)&Cnull : (IOutputStream*)&out, NCommon::GetYsonFormat(fillSettings), ::NYson::EYsonType::Node, true); writer.OnBeginMap(); + + if (skiffType) { + writer.OnKeyedItem("SkiffType"); + writer.OnRaw(skiffType, ::NYson::EYsonType::Node); + + + writer.OnKeyedItem("Columns"); + writer.OnBeginList(); + for (auto& column: columns) { + writer.OnListItem(); + writer.OnStringScalar(column); + } + writer.OnEndList(); + } + if (type) { writer.OnKeyedItem("Type"); writer.OnRaw(type); } writer.OnKeyedItem("Data"); - writer.OnRaw(value.first); + switch (execCtx->Options_.FillSettings().Format) { + case IDataProvider::EResultFormat::Yson: + writer.OnRaw(value.first); + break; + case IDataProvider::EResultFormat::Skiff: + writer.OnStringScalar(value.first); + break; + default: + YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << execCtx->Options_.FillSettings().Format; + } if (value.second) { writer.OnKeyedItem("Truncated"); @@ -4504,7 +4675,7 @@ private: LocalRawMapReduce(spec, mapper, &in, &out); - const auto& builder = factory(); + const auto& builder = factory.Create(); for (const auto& reader = NYT::CreateTableReader<NYT::TNode>(&out, NYT::TTableReaderOptions()); reader->IsValid(); reader->Next()) { auto& row = reader->GetRow(); if (!builder->WriteNext(row["output"])) { @@ -4534,7 +4705,8 @@ private: const TExecParamsPtr& execCtx, TTransactionCache::TEntry::TPtr entry, TResultFactory&& factory, - const TVector<TString>* columns = nullptr + const TVector<TString>* columns = nullptr, + IDataProvider::EResultFormat format = IDataProvider::EResultFormat::Yson ) { TRawMapOperationSpec mapOpSpec; @@ -4574,9 +4746,25 @@ private: const TBindTerminator bind(graph->GetTerminator()); graph->Prepare(); auto value = graph->GetValue(); - auto builder = factory(); - builder->WriteValue(value, root.GetStaticType()); - return MakeFuture(builder->Make()); + + switch (format) { + case IDataProvider::EResultFormat::Skiff: { + TMemoryUsageInfo memInfo("Calc"); + THolderFactory holderFactory(alloc.Ref(), memInfo, execCtx->FunctionRegistry_); + TCodecContext codecCtx(builder.GetTypeEnvironment(), *execCtx->FunctionRegistry_, &holderFactory); + + auto skiffBuilder = factory.Create(codecCtx, holderFactory); + skiffBuilder->WriteValue(value, root.GetStaticType()); + return MakeFuture(skiffBuilder->Make()); + } + case IDataProvider::EResultFormat::Yson: { + auto ysonBuilder = factory.Create(); + ysonBuilder->WriteValue(value, root.GetStaticType()); + return MakeFuture(ysonBuilder->Make()); + } + default: + YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << format; + } } localRun = localRun && transform.CanExecuteLocally(); { @@ -4645,17 +4833,44 @@ private: return entry->Tx->RawMap(mapOpSpec, job, opOpts); }); }) - .Apply([tmpTable, entry, factory = std::move(factory)](const auto& f) { + .Apply([execCtx, tmpTable, entry, factory = std::move(factory), format](const auto& f) { f.GetValue(); - auto builder = factory(); + auto reader = entry->Tx->CreateTableReader<NYT::TNode>(tmpTable); - for (; reader->IsValid(); reader->Next()) { - auto& row = reader->GetRow(); - if (!builder->WriteNext(row["output"])) { - break; + + TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), + execCtx->FunctionRegistry_->SupportsSizedAllocators()); + alloc.SetLimit(execCtx->Options_.Config()->DefaultCalcMemoryLimit.Get().GetOrElse(0)); + TMemoryUsageInfo memInfo("Calc"); + TTypeEnvironment env(alloc); + THolderFactory holderFactory(alloc.Ref(), memInfo, execCtx->FunctionRegistry_); + + switch (format) { + case IDataProvider::EResultFormat::Skiff: { + TCodecContext codecCtx(env, *execCtx->FunctionRegistry_, &holderFactory); + + auto skiffBuilder = factory.Create(codecCtx, holderFactory); + for (; reader->IsValid(); reader->Next()) { + auto& row = reader->GetRow(); + if (!skiffBuilder->WriteNext(row["output"])) { + break; + } + } + return skiffBuilder->Make(); + } + case IDataProvider::EResultFormat::Yson: { + auto ysonBuilder = factory.Create(); + for (; reader->IsValid(); reader->Next()) { + auto& row = reader->GetRow(); + if (!ysonBuilder->WriteNext(row["output"])) { + break; + } + } + return ysonBuilder->Make(); } + default: + YQL_LOG_CTX_THROW yexception() << "Unexpected result type: " << format; } - return builder->Make(); }) .Apply([tmpTable, execCtx, entry](const TFuture<typename TResultFactory::TResult>& f) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); diff --git a/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.cpp b/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.cpp index bab2518be2e..ada1131e5b7 100644 --- a/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.cpp +++ b/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.cpp @@ -13,24 +13,15 @@ namespace NYql { using namespace NKikimr; using namespace NKikimr::NMiniKQL; -TExecuteResOrPull::TExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns) - : Rows(rowLimit) - , Bytes(byteLimit) - , Columns(columns) - , Out(new THoldingStream<TCountingOutput>(THolder(new TStringOutput(Result)))) +////////////////////////////////////////////////////////////////////////////////////////////////////////// + +TYsonExecuteResOrPull::TYsonExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns) + : IExecuteResOrPull(rowLimit, byteLimit, columns) , Writer(new NYson::TYsonWriter(Out.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true)) - , IsList(false) - , Truncated(false) - , Row(0) { } -ui64 TExecuteResOrPull::GetWrittenSize() const { - YQL_ENSURE(Out, "GetWritten() must be callled before Finish()"); - return Out->Counter(); -} - -TString TExecuteResOrPull::Finish() { +TString TYsonExecuteResOrPull::Finish() { if (IsList) { Writer->OnEndList(); } @@ -39,7 +30,7 @@ TString TExecuteResOrPull::Finish() { return Result; } -bool TExecuteResOrPull::WriteNext(TStringBuf val) { +bool TYsonExecuteResOrPull::WriteNext(const NYT::TNode& item) { if (IsList) { if (!HasCapacity()) { Truncated = true; @@ -47,12 +38,14 @@ bool TExecuteResOrPull::WriteNext(TStringBuf val) { } Writer->OnListItem(); } - Writer->OnRaw(val, ::NYson::EYsonType::Node); + + Writer->OnRaw(NYT::NodeToYsonString(item, NYT::NYson::EYsonFormat::Binary), ::NYson::EYsonType::Node); ++Row; + return IsList; } -void TExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type) { +void TYsonExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type) { if (type->IsList()) { auto inputType = AS_TYPE(TListType, type)->GetItemType(); TMaybe<TVector<ui32>> structPositions = NCommon::CreateStructPositions(inputType, Columns.Defined() ? Columns.Get() : nullptr); @@ -63,6 +56,7 @@ void TExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type Truncated = true; break; } + Writer->OnListItem(); NCommon::WriteYsonValue(*Writer, item, inputType, structPositions.Get()); } @@ -71,4 +65,153 @@ void TExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type } } +bool TYsonExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) { + if (!HasCapacity()) { + Truncated = true; + return false; + } + + NYql::DecodeToYson(specsCache, tableIndex, rec, *Out); + ++Row; + return true; } + +bool TYsonExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) { + if (!HasCapacity()) { + Truncated = true; + return false; + } + + NYql::DecodeToYson(specsCache, tableIndex, rec, *Out); + ++Row; + return true; +} + +bool TYsonExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) { + if (!HasCapacity()) { + Truncated = true; + return false; + } + + NYql::DecodeToYson(specsCache, tableIndex, rec, *Out); + ++Row; + return true; +} + +void TYsonExecuteResOrPull::SetListResult() { + if (!IsList) { + IsList = true; + Writer->OnBeginList(); + } +} + +////////////////////////////////////////////////////////////////////////////////////////////////////////// + +TSkiffExecuteResOrPull::TSkiffExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, NCommon::TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory, const NYT::TNode& attrs, const TString& optLLVM, const TVector<TString>& columns) + : IExecuteResOrPull(rowLimit, byteLimit, columns) + , HolderFactory(holderFactory) + , SkiffWriter(*Out.Get(), 0, 4_MB) +{ + Specs.SetUseSkiff(optLLVM); + Specs.Init(codecCtx, attrs); + + SkiffWriter.SetSpecs(Specs, columns); +} + +TString TSkiffExecuteResOrPull::Finish() { + SkiffWriter.Finish(); + Out.Destroy(); + Specs.Clear(); + return Result; +} + +bool TSkiffExecuteResOrPull::WriteNext(const NYT::TNode& item) { + if (IsList) { + if (!HasCapacity()) { + Truncated = true; + return false; + } + } + + TStringStream err; + auto value = NCommon::ParseYsonNode(HolderFactory, item, Specs.Outputs[0].RowType, &err); + if (!value) { + throw yexception() << "Could not parse yson node"; + } + SkiffWriter.AddRow(*value); + + return IsList; +} + +void TSkiffExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type) { + if (type->IsList()) { + SetListResult(); + const auto it = value.GetListIterator(); + for (NUdf::TUnboxedValue item; it.Next(item); ++Row) { + if (!HasCapacity()) { + Truncated = true; + break; + } + + SkiffWriter.AddRow(item); + } + } else { + SkiffWriter.AddRow(value); + } +} + +bool TSkiffExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) { + if (!HasCapacity()) { + Truncated = true; + return false; + } + + TStringStream err; + auto value = NCommon::ParseYsonNode(specsCache.GetHolderFactory(), rec, Specs.Outputs[tableIndex].RowType, &err); + if (!value) { + throw yexception() << "Could not parse yson node"; + } + SkiffWriter.AddRow(*value); + + ++Row; + return true; +} + +bool TSkiffExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) { + if (!HasCapacity()) { + Truncated = true; + return false; + } + + NUdf::TUnboxedValue node; + node = DecodeYamr(specsCache, tableIndex, rec); + SkiffWriter.AddRow(node); + + ++Row; + return true; +} + +bool TSkiffExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) { + Y_UNUSED(specsCache); + Y_UNUSED(tableIndex); + + if (!HasCapacity()) { + Truncated = true; + return false; + } + + SkiffWriter.AddRow(rec); + + ++Row; + return true; +} + +void TSkiffExecuteResOrPull::SetListResult() { + if (!IsList) { + IsList = true; + } +} + +////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} // NYql diff --git a/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.h b/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.h index d5d7fbcd7bf..d913a96a29c 100644 --- a/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.h +++ b/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.h @@ -5,6 +5,8 @@ #include <ydb/library/yql/minikql/mkql_node.h> +#include <library/cpp/yson/node/node_io.h> + #include <library/cpp/yson/writer.h> #include <util/stream/length.h> @@ -17,9 +19,22 @@ namespace NYql { -class TExecuteResOrPull : public TNonCopyable { +/////////////////////////////////////////////////////////////////////////////// +// IExecuteResOrPull +/////////////////////////////////////////////////////////////////////////////// +class IExecuteResOrPull : public TNonCopyable { public: - TExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns); + IExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns) + : Rows(rowLimit) + , Bytes(byteLimit) + , Columns(columns) + , Out(new THoldingStream<TCountingOutput>(THolder(new TStringOutput(Result)))) + , IsList(false) + , Truncated(false) + , Row(0) + { + } + virtual ~IExecuteResOrPull() = default; bool HasCapacity() const { return (!Rows || Row < *Rows) && (!Bytes || Out->Counter() < *Bytes); @@ -29,42 +44,38 @@ public: return Truncated; } - ui64 GetWrittenSize() const; + ui64 GetWrittenSize() const { + YQL_ENSURE(Out, "GetWritten() must be callled before Finish()"); + return Out->Counter(); + } ui64 GetWrittenRows() const { return Row; } - TString Finish(); TMaybe<ui64> GetRowsLimit() const { return Rows; } - void SetListResult() { - if (!IsList) { - IsList = true; - Writer->OnBeginList(); - } - } - const TMaybe<TVector<TString>>& GetColumns() const { return Columns; } - bool WriteNext(TStringBuf val); - - template <class TRec> - bool WriteNext(TMkqlIOCache& specCache, const TRec& rec, ui32 tableIndex) { - if (!HasCapacity()) { - Truncated = true; - return false; - } - NYql::DecodeToYson(specCache, tableIndex, rec, *Out); - ++Row; - return true; + std::pair<TString, bool> Make() { + return {Finish(), IsTruncated()}; } - void WriteValue(const NKikimr::NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type); + virtual TString Finish() = 0; + + virtual void SetListResult() = 0; + + virtual bool WriteNext(const NYT::TNode& item) = 0; + + virtual bool WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) = 0; + virtual bool WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) = 0; + virtual bool WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) = 0; + + virtual void WriteValue(const NKikimr::NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type) = 0; protected: const TMaybe<ui64> Rows; @@ -72,10 +83,58 @@ protected: const TMaybe<TVector<TString>> Columns; TString Result; THolder<TCountingOutput> Out; - THolder<NYson::TYsonWriter> Writer; bool IsList; bool Truncated; ui64 Row; }; +/////////////////////////////////////////////////////////////////////////////// +// TYsonExecuteResOrPull +/////////////////////////////////////////////////////////////////////////////// +class TYsonExecuteResOrPull : public IExecuteResOrPull { +public: + TYsonExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns); + ~TYsonExecuteResOrPull() = default; + + TString Finish() override; + + void SetListResult() override; + + bool WriteNext(const NYT::TNode& item) override; + + bool WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) override; + bool WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) override; + bool WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) override; + + void WriteValue(const NKikimr::NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type) override; +protected: + THolder<NYson::TYsonWriter> Writer; +}; + +/////////////////////////////////////////////////////////////////////////////// +// TSkiffExecuteResOrPull +/////////////////////////////////////////////////////////////////////////////// +class TSkiffExecuteResOrPull : public IExecuteResOrPull { +public: + TSkiffExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, NCommon::TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory, const NYT::TNode& attrs, const TString& optLLVM, const TVector<TString>& columns = {}); + ~TSkiffExecuteResOrPull() = default; + + TString Finish() override; + + void SetListResult() override; + + bool WriteNext(const NYT::TNode& item) override; + + bool WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) override; + bool WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) override; + bool WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) override; + + void WriteValue(const NKikimr::NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type) override; +protected: + const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + + TMkqlIOSpecs Specs; + TMkqlWriterImpl SkiffWriter; +}; + } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp index 032c396598a..923b0954178 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp @@ -78,7 +78,7 @@ protected: TResOrPullBase resOrPull(input); IDataProvider::TFillSettings fillSettings = NCommon::GetFillSettings(resOrPull.Ref()); - YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Yson); + YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Yson || fillSettings.Format == IDataProvider::EResultFormat::Skiff); auto data = resOrPull.Input(); if (auto maybePull = resOrPull.Maybe<TPull>()) { |
