summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/library/yql/core/facade/yql_facade.cpp3
-rw-r--r--ydb/library/yql/core/facade/yql_facade.h5
-rw-r--r--ydb/library/yql/core/yql_data_provider.h3
-rw-r--r--ydb/library/yql/providers/yt/codec/yt_codec_io.cpp44
-rw-r--r--ydb/library/yql/providers/yt/codec/yt_codec_io.h2
-rw-r--r--ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp2
-rw-r--r--ydb/library/yql/providers/yt/gateway/lib/yt_helpers.cpp8
-rw-r--r--ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h8
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp405
-rw-r--r--ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.cpp177
-rw-r--r--ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.h107
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp2
12 files changed, 608 insertions, 158 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp
index ba3db690f87..ef3adf31c1d 100644
--- a/ydb/library/yql/core/facade/yql_facade.cpp
+++ b/ydb/library/yql/core/facade/yql_facade.cpp
@@ -273,6 +273,7 @@ TProgram::TProgram(
, Modules_(modules)
, ExprRoot_(nullptr)
, SessionId_(sessionId)
+ , ResultType_(IDataProvider::EResultFormat::Yson)
, ResultFormat_(NYson::EYsonFormat::Binary)
, OutputFormat_(NYson::EYsonFormat::Pretty)
, EnableRangeComputeFor_(enableRangeComputeFor)
@@ -1566,7 +1567,7 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us
auto resultFormat = ResultFormat_;
auto writerFactory = [resultFormat] () { return CreateYsonResultWriter(resultFormat); };
ResultProviderConfig_ = MakeIntrusive<TResultProviderConfig>(*typeAnnotationContext,
- *FunctionRegistry_, IDataProvider::EResultFormat::Yson, ToString((ui32)resultFormat), writerFactory);
+ *FunctionRegistry_, ResultType_, ToString((ui32)resultFormat), writerFactory);
ResultProviderConfig_->SupportsResultPosition = SupportsResultPosition_;
auto resultProvider = CreateResultProvider(ResultProviderConfig_);
typeAnnotationContext->AddDataSink(ResultProviderName, resultProvider);
diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h
index d68f36162aa..3f1fdb0f285 100644
--- a/ydb/library/yql/core/facade/yql_facade.h
+++ b/ydb/library/yql/core/facade/yql_facade.h
@@ -240,6 +240,10 @@ public:
DiagnosticFormat_ = format;
}
+ void SetResultType(IDataProvider::EResultFormat type) {
+ ResultType_ = type;
+ }
+
TMaybe<TString> GetDiagnostics();
IGraphTransformer::TStatistics GetRawDiagnostics();
@@ -409,6 +413,7 @@ private:
TAutoPtr<IGraphTransformer> Transformer_;
TIntrusivePtr<TResultProviderConfig> ResultProviderConfig_;
bool SupportsResultPosition_ = false;
+ IDataProvider::EResultFormat ResultType_;
NYson::EYsonFormat ResultFormat_;
NYson::EYsonFormat OutputFormat_;
TMaybe<NYson::EYsonFormat> DiagnosticFormat_;
diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h
index 95d7aee773d..259d1817bbb 100644
--- a/ydb/library/yql/core/yql_data_provider.h
+++ b/ydb/library/yql/core/yql_data_provider.h
@@ -87,7 +87,8 @@ public:
enum class EResultFormat {
Yson,
- Custom
+ Custom,
+ Skiff
};
// settings for result data provider
diff --git a/ydb/library/yql/providers/yt/codec/yt_codec_io.cpp b/ydb/library/yql/providers/yt/codec/yt_codec_io.cpp
index 450ade03b6e..34b9fc29839 100644
--- a/ydb/library/yql/providers/yt/codec/yt_codec_io.cpp
+++ b/ydb/library/yql/providers/yt/codec/yt_codec_io.cpp
@@ -1689,17 +1689,32 @@ protected:
bool Optional;
};
- static TVector<TField> GetFields(TStructType* type) {
+ static TVector<TField> GetFields(TStructType* type, const TVector<TString>& columns = {}) {
TVector<TField> res;
- res.reserve(type->GetMembersCount());
+
+ THashMap<TString, ui32> columnsOrder;
+ if (columns.size() > 0) {
+ for (ui32 index = 0; index < columns.size(); index++) {
+ columnsOrder[columns[index]] = index;
+ }
+ res.resize(columns.size());
+ } else {
+ res.reserve(type->GetMembersCount());
+ }
+
for (ui32 index = 0; index < type->GetMembersCount(); ++index) {
+ auto name = type->GetMemberName(index);
auto fieldType = type->GetMemberType(index);
const bool isOptional = fieldType->IsOptional();
if (isOptional) {
fieldType = static_cast<TOptionalType*>(fieldType)->GetItemType();
}
- auto name = type->GetMemberName(index);
- res.push_back(TField{name, fieldType, isOptional});
+
+ if (!columnsOrder.empty() && columnsOrder.contains(name)) {
+ res[columnsOrder[name]] = TField{name, fieldType, isOptional};
+ } else {
+ res.push_back(TField{name, fieldType, isOptional});
+ }
}
return res;
}
@@ -1802,10 +1817,10 @@ protected:
class TSkiffEncoder: public TSkiffEncoderBase {
public:
- TSkiffEncoder(TOutputBuf& buf, const TMkqlIOSpecs& specs, size_t tableIndex)
+ TSkiffEncoder(TOutputBuf& buf, const TMkqlIOSpecs& specs, size_t tableIndex, const TVector<TString>& columns)
: TSkiffEncoderBase(buf, specs)
{
- Fields_ = GetFields(Specs_.Outputs[tableIndex].RowType);
+ Fields_ = GetFields(Specs_.Outputs[tableIndex].RowType, columns);
NativeYtTypeFlags_ = Specs_.Outputs[tableIndex].NativeYtTypeFlags;
}
@@ -1952,7 +1967,10 @@ TMkqlWriterImpl::TMkqlWriterImpl(NYT::TRawTableWriterPtr stream, size_t blockSiz
TMkqlWriterImpl::~TMkqlWriterImpl() {
}
-void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs) {
+void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs, const TVector<TString>& columns) {
+ // In the case of the "skiff" format, the ordering of columns in data and in spec are assumed to be identical during encoding
+ // To provide this, a "columns" field is used to change the alphabetical order of columns in spec
+
Specs_ = &specs;
JobStats_ = specs.JobStats_;
@@ -1969,8 +1987,14 @@ void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs) {
const auto writer1 = MakeYtCodecCgWriter<false>(Codegen_, rowType);
const auto writer2 = MakeYtCodecCgWriter<true>(Codegen_, rowType);
+
for (ui32 index = 0; index < rowType->GetMembersCount(); ++index) {
- auto fieldType = rowType->GetMemberType(index);
+ ui32 column = index;
+ if (columns) {
+ column = rowType->GetMemberIndex(columns[index]);
+ }
+
+ auto fieldType = rowType->GetMemberType(column);
writer1->AddField(fieldType, Specs_->Outputs[i].NativeYtTypeFlags);
writer2->AddField(fieldType, Specs_->Outputs[i].NativeYtTypeFlags);
}
@@ -1994,6 +2018,7 @@ void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs) {
out->Buf_.SetStats(JobStats_);
if (Specs_->UseSkiff_) {
if (Specs_->Outputs[i].RowType->GetMembersCount() == 0) {
+ YQL_ENSURE(columns.empty());
Encoders_.emplace_back(new TSkiffEmptySchemaEncoder(out->Buf_, *Specs_));
}
#ifndef MKQL_DISABLE_CODEGEN
@@ -2005,9 +2030,10 @@ void TMkqlWriterImpl::SetSpecs(const TMkqlIOSpecs& specs) {
}
#endif
else {
- Encoders_.emplace_back(new TSkiffEncoder(out->Buf_, *Specs_, i));
+ Encoders_.emplace_back(new TSkiffEncoder(out->Buf_, *Specs_, i, columns));
}
} else {
+ YQL_ENSURE(columns.empty());
Encoders_.emplace_back(new TYsonEncoder(out->Buf_, *Specs_, i));
}
}
diff --git a/ydb/library/yql/providers/yt/codec/yt_codec_io.h b/ydb/library/yql/providers/yt/codec/yt_codec_io.h
index bdc5af326b5..bef3620aed0 100644
--- a/ydb/library/yql/providers/yt/codec/yt_codec_io.h
+++ b/ydb/library/yql/providers/yt/codec/yt_codec_io.h
@@ -107,7 +107,7 @@ public:
~TMkqlWriterImpl();
- void SetSpecs(const TMkqlIOSpecs& specs);
+ void SetSpecs(const TMkqlIOSpecs& specs, const TVector<TString>& columns = {});
void AddRow(const NKikimr::NUdf::TUnboxedValuePod row) override;
void AddFlatRow(const NUdf::TUnboxedValuePod* row) override;
diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
index 4ba3dc13634..071ac579d2e 100644
--- a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
+++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
@@ -1190,7 +1190,7 @@ private:
const TBindTerminator bind(compGraph->GetTerminator());
compGraph->Prepare();
- TExecuteResOrPull resultData(options.FillSettings().RowsLimitPerWrite,
+ TYsonExecuteResOrPull resultData(options.FillSettings().RowsLimitPerWrite,
options.FillSettings().AllResultsBytesLimit, MakeMaybe(columns));
resultData.WriteValue(compGraph->GetValue(), data.GetStaticType());
diff --git a/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.cpp b/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.cpp
index 316a8803069..2ff27baaef7 100644
--- a/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.cpp
+++ b/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.cpp
@@ -287,7 +287,7 @@ static bool IterateRows(NYT::ITransactionPtr tx,
NYT::TRichYPath path,
ui32 tableIndex,
TMkqlIOCache& specsCache,
- TExecuteResOrPull& exec,
+ IExecuteResOrPull& exec,
const TTableLimiter& limiter,
const TMaybe<TSampleParams>& sampling)
{
@@ -350,7 +350,7 @@ bool IterateYamredRows(NYT::ITransactionPtr tx,
const NYT::TRichYPath& table,
ui32 tableIndex,
TMkqlIOCache& specsCache,
- TExecuteResOrPull& exec,
+ IExecuteResOrPull& exec,
const TTableLimiter& limiter,
const TMaybe<TSampleParams>& sampling)
{
@@ -361,7 +361,7 @@ bool IterateYsonRows(NYT::ITransactionPtr tx,
const NYT::TRichYPath& table,
ui32 tableIndex,
TMkqlIOCache& specsCache,
- TExecuteResOrPull& exec,
+ IExecuteResOrPull& exec,
const TTableLimiter& limiter,
const TMaybe<TSampleParams>& sampling)
{
@@ -372,7 +372,7 @@ bool SelectRows(NYT::IClientPtr client,
const TString& table,
ui32 tableIndex,
TMkqlIOCache& specsCache,
- TExecuteResOrPull& exec,
+ IExecuteResOrPull& exec,
TTableLimiter& limiter)
{
ui64 startRecordInTable = limiter.GetTableStart();
diff --git a/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h b/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h
index 7c5c3cce638..75ad22ddf43 100644
--- a/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h
+++ b/ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h
@@ -21,7 +21,7 @@
namespace NYql {
class TMkqlIOCache;
-class TExecuteResOrPull;
+class IExecuteResOrPull;
class TTableLimiter;
struct TYqlOperationOptions;
@@ -41,11 +41,11 @@ void TransferTableAttributes(const NYT::TNode& attributes, const std::function<v
NYT::TNode FilterYqlAttributes(const NYT::TNode& attributes);
bool IterateYamredRows(NYT::ITransactionPtr tx, const NYT::TRichYPath& table, ui32 tableIndex, TMkqlIOCache& specsCache,
- TExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling = {});
+ IExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling = {});
bool IterateYsonRows(NYT::ITransactionPtr tx, const NYT::TRichYPath& table, ui32 tableIndex, TMkqlIOCache& specsCache,
- TExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling = {});
+ IExecuteResOrPull& exec, const TTableLimiter& limiter, const TMaybe<TSampleParams>& sampling = {});
bool SelectRows(NYT::IClientPtr client, const TString& table, ui32 tableIndex, TMkqlIOCache& specsCache,
- TExecuteResOrPull& exec, TTableLimiter& limiter);
+ IExecuteResOrPull& exec, TTableLimiter& limiter);
NYT::TNode YqlOpOptionsToSpec(const TYqlOperationOptions& opOpts, const TString& userName, const TVector<std::pair<TString, TString>>& code = {});
NYT::TNode YqlOpOptionsToAttrs(const TYqlOperationOptions& opOpts);
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
index b25b9c7370b..e84e1db8cfb 100644
--- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -18,6 +18,7 @@
#include <ydb/library/yql/providers/yt/lib/infer_schema/infer_schema.h>
#include <ydb/library/yql/providers/yt/lib/infer_schema/infer_schema_rpc.h>
#include <ydb/library/yql/providers/yt/lib/schema/schema.h>
+#include <ydb/library/yql/providers/yt/lib/skiff/yql_skiff_schema.h>
#include <ydb/library/yql/providers/yt/common/yql_names.h>
#include <ydb/library/yql/providers/yt/common/yql_configuration.h>
#include <ydb/library/yql/providers/yt/job/yql_job_base.h>
@@ -1245,36 +1246,68 @@ private:
return false;
}
- THolder<TNodeResultBuilder> operator()() const {
+ THolder<TNodeResultBuilder> Create(const TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory) const {
+ Y_UNUSED(codecCtx);
+ Y_UNUSED(holderFactory);
+
+ return Create();
+ }
+
+ THolder<TNodeResultBuilder> Create() const {
return MakeHolder<TNodeResultBuilder>();
}
};
- class TExprResultBuilder: public TExecuteResOrPull {
+ class TYsonExprResultFactory {
public:
- TExprResultBuilder(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TVector<TString>& columns)
- : TExecuteResOrPull(rowLimit, byteLimit, MakeMaybe(columns))
+ using TResult = std::pair<TString, bool>;
+
+ TYsonExprResultFactory(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TVector<TString>& columns, bool hasListResult)
+ : RowLimit_(rowLimit)
+ , ByteLimit_(byteLimit)
+ , Columns_(columns)
+ , HasListResult_(hasListResult)
{
}
- bool WriteNext(const NYT::TNode& item) {
- return TExecuteResOrPull::WriteNext(NYT::NodeToYsonString(item, NYT::NYson::EYsonFormat::Binary));
+ bool UseResultYson() const {
+ return true;
+ }
+
+ THolder<TYsonExecuteResOrPull> Create(TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory) const {
+ Y_UNUSED(codecCtx);
+ Y_UNUSED(holderFactory);
+
+ return Create();
}
- std::pair<TString, bool> Make() {
- return {Finish(), IsTruncated()};
+ THolder<TYsonExecuteResOrPull> Create() const {
+ THolder<TYsonExecuteResOrPull> res;
+
+ res = MakeHolder<TYsonExecuteResOrPull>(RowLimit_, ByteLimit_, Columns_);
+ if (HasListResult_) {
+ res->SetListResult();
+ }
+ return res;
}
+
+ private:
+ const TMaybe<ui64> RowLimit_;
+ const TMaybe<ui64> ByteLimit_;
+ const TVector<TString> Columns_;
+ const bool HasListResult_;
};
- class TExprResultFactory {
+ class TSkiffExprResultFactory {
public:
using TResult = std::pair<TString, bool>;
- TExprResultFactory(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TVector<TString>& columns, bool hasListResult)
+ TSkiffExprResultFactory(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, bool hasListResult, const NYT::TNode& attrs, const TString& optLLVM)
: RowLimit_(rowLimit)
, ByteLimit_(byteLimit)
- , Columns_(columns)
, HasListResult_(hasListResult)
+ , Attrs_(attrs)
+ , OptLLVM_(optLLVM)
{
}
@@ -1282,18 +1315,26 @@ private:
return true;
}
- THolder<TExprResultBuilder> operator()() const {
- auto res = MakeHolder<TExprResultBuilder>(RowLimit_, ByteLimit_, Columns_);
+ THolder<TSkiffExecuteResOrPull> Create(TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory) const {
+ THolder<TSkiffExecuteResOrPull> res;
+
+ res = MakeHolder<TSkiffExecuteResOrPull>(RowLimit_, ByteLimit_, codecCtx, holderFactory, Attrs_, OptLLVM_);
if (HasListResult_) {
res->SetListResult();
}
+
return res;
}
+
+ THolder<TSkiffExecuteResOrPull> Create() const {
+ YQL_ENSURE(false, "Unexpected skiff result builder creation");
+ }
private:
const TMaybe<ui64> RowLimit_;
const TMaybe<ui64> ByteLimit_;
- const TVector<TString> Columns_;
const bool HasListResult_;
+ const NYT::TNode Attrs_;
+ const TString OptLLVM_;
};
static TFinalizeResult ExecFinalize(const TSession::TPtr& session, bool abort) {
@@ -2595,6 +2636,26 @@ private:
return result;
}
+ static std::pair<TString, NYT::TNode> ParseYTType(const TExprNode& node,
+ TExprContext& ctx,
+ const TExecContext<TResOrPullOptions>::TPtr& execCtx,
+ const TMaybe<NYql::TColumnOrder>& columns = Nothing())
+ {
+ const auto sequenceItemType = GetSequenceItemType(node.Pos(), node.GetTypeAnn(), false, ctx);
+
+ auto rowSpecInfo = MakeIntrusive<TYqlRowSpecInfo>();
+ rowSpecInfo->SetType(sequenceItemType->Cast<TStructExprType>(), execCtx->Options_.Config()->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE);
+ if (columns) {
+ rowSpecInfo->SetColumnOrder(columns);
+ }
+
+ NYT::TNode tableSpec = NYT::TNode::CreateMap();
+ rowSpecInfo->FillCodecNode(tableSpec[YqlRowSpecAttribute]);
+
+ auto resultYTType = NodeToYsonString(RowSpecToYTSchema(tableSpec[YqlRowSpecAttribute], execCtx->Options_.Config()->NativeYtTypeCompatibility.Get(execCtx->Cluster_).GetOrElse(NTCF_LEGACY)).ToNode());
+ auto resultRowSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(tableSpec));
+ return {resultYTType, resultRowSpec};
+ }
TFuture<TResOrPullResult> DoPull(const TSession::TPtr& session, NNodes::TPull pull, TExprContext& ctx, TResOrPullOptions&& options) {
if (options.FillSettings().Discard) {
@@ -2628,7 +2689,13 @@ private:
}
TString type;
- if (NCommon::HasResOrPullOption(pull.Ref(), "type")) {
+ NYT::TNode rowSpec;
+ if (execCtx->Options_.FillSettings().Format == IDataProvider::EResultFormat::Skiff) {
+ auto ytType = ParseYTType(pull.Input().Ref(), ctx, execCtx, columns);
+
+ type = ytType.first;
+ rowSpec = ytType.second;
+ } else if (NCommon::HasResOrPullOption(pull.Ref(), "type")) {
TStringStream typeYson;
::NYson::TYsonWriter typeWriter(&typeYson);
NCommon::WriteResOrPullType(typeWriter, pull.Input().Ref().GetTypeAnn(), columns);
@@ -2637,14 +2704,19 @@ private:
auto pos = ctx.GetPosition(pull.Pos());
- return session->Queue_->Async([type, ref, range, autoRef, execCtx, columns, pos] () {
+ return session->Queue_->Async([rowSpec, type, ref, range, autoRef, execCtx, columns, pos] () {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
execCtx->MakeUserFiles();
try {
TResOrPullResult res;
TStringStream out;
- ::NYson::TYsonWriter writer(&out, NCommon::GetYsonFormat(execCtx->Options_.FillSettings()), ::NYson::EYsonType::Node, false);
+
+ auto fillSettings = execCtx->Options_.FillSettings();
+ fillSettings.Format = IDataProvider::EResultFormat::Yson;
+
+ ::NYson::TYsonWriter writer(&out, NCommon::GetYsonFormat(fillSettings), ::NYson::EYsonType::Node, false);
writer.OnBeginMap();
+
if (type) {
writer.OnKeyedItem("Type");
writer.OnRaw(type);
@@ -2652,8 +2724,9 @@ private:
bool truncated = false;
if (!ref) {
- truncated = ExecPull(execCtx, writer, range, columns);
+ truncated = ExecPull(execCtx, writer, range, rowSpec, columns);
}
+
if (ref || (truncated && autoRef)) {
writer.OnKeyedItem("Ref");
writer.OnBeginList();
@@ -2692,6 +2765,7 @@ private:
static bool ExecPull(const TExecContext<TResOrPullOptions>::TPtr& execCtx,
::NYson::TYsonWriter& writer,
const TRecordsRange& range,
+ const NYT::TNode& rowSpec,
const TVector<TString>& columns)
{
TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),
@@ -2719,67 +2793,109 @@ private:
const auto nativeTypeCompat = execCtx->Options_.Config()->NativeYtTypeCompatibility.Get(execCtx->Cluster_).GetOrElse(NTCF_LEGACY);
specs.Init(codecCtx, execCtx->GetInputSpec(!useSkiff, nativeTypeCompat, false), tables, columns);
- TExecuteResOrPull pullData(execCtx->Options_.FillSettings().RowsLimitPerWrite,
- execCtx->Options_.FillSettings().AllResultsBytesLimit, MakeMaybe(columns));
- TMkqlIOCache specsCache(specs, holderFactory);
+ auto run = [&] (IExecuteResOrPull& pullData) {
+ TMkqlIOCache specsCache(specs, holderFactory);
- if (testRun) {
- YQL_ENSURE(execCtx->InputTables_.size() == 1U, "Support single input only.");
- const auto itI = TestTables.find(execCtx->InputTables_.front().Path.Path_);
- YQL_ENSURE(TestTables.cend() != itI);
+ if (testRun) {
+ YQL_ENSURE(execCtx->InputTables_.size() == 1U, "Support single input only.");
+ const auto itI = TestTables.find(execCtx->InputTables_.front().Path.Path_);
+ YQL_ENSURE(TestTables.cend() != itI);
- TMkqlInput input(MakeStringInput(std::move(itI->second.second), false));
- TMkqlReaderImpl reader(input, 0, 4 << 10, 0);
- reader.SetSpecs(specs, holderFactory);
- for (reader.Next(); reader.IsValid(); reader.Next()) {
- if (!pullData.WriteNext(specsCache, reader.GetRow(), 0)) {
- return true;
- }
- }
- } else if (auto limiter = TTableLimiter(range)) {
- auto entry = execCtx->GetEntry();
- bool stop = false;
- for (size_t i = 0; i < execCtx->InputTables_.size(); ++i) {
- TString srcTableName = execCtx->InputTables_[i].Name;
- NYT::TRichYPath srcTable = execCtx->InputTables_[i].Path;
- bool isDynamic = execCtx->InputTables_[i].Dynamic;
- ui64 recordsCount = execCtx->InputTables_[i].Records;
- if (!isDynamic) {
- if (!limiter.NextTable(recordsCount)) {
- continue;
+ TMkqlInput input(MakeStringInput(std::move(itI->second.second), false));
+ TMkqlReaderImpl reader(input, 0, 4 << 10, 0);
+ reader.SetSpecs(specs, holderFactory);
+ for (reader.Next(); reader.IsValid(); reader.Next()) {
+ if (!pullData.WriteNext(specsCache, reader.GetRow(), 0)) {
+ return true;
}
- } else {
- limiter.NextDynamicTable();
}
-
- if (isDynamic) {
- YQL_ENSURE(srcTable.GetRanges().Empty());
- stop = NYql::SelectRows(entry->Client, srcTableName, i, specsCache, pullData, limiter);
- } else {
- auto readTx = entry->Tx;
- if (srcTable.TransactionId_) {
- readTx = entry->GetSnapshotTx(*srcTable.TransactionId_);
- srcTable.TransactionId_.Clear();
+ } else if (auto limiter = TTableLimiter(range)) {
+ auto entry = execCtx->GetEntry();
+ bool stop = false;
+ for (size_t i = 0; i < execCtx->InputTables_.size(); ++i) {
+ TString srcTableName = execCtx->InputTables_[i].Name;
+ NYT::TRichYPath srcTable = execCtx->InputTables_[i].Path;
+ bool isDynamic = execCtx->InputTables_[i].Dynamic;
+ ui64 recordsCount = execCtx->InputTables_[i].Records;
+ if (!isDynamic) {
+ if (!limiter.NextTable(recordsCount)) {
+ continue;
+ }
+ } else {
+ limiter.NextDynamicTable();
}
- if (execCtx->YamrInput) {
- stop = NYql::IterateYamredRows(readTx, srcTable, i, specsCache, pullData, limiter, execCtx->Sampling);
+
+ if (isDynamic) {
+ YQL_ENSURE(srcTable.GetRanges().Empty());
+ stop = NYql::SelectRows(entry->Client, srcTableName, i, specsCache, pullData, limiter);
} else {
- stop = NYql::IterateYsonRows(readTx, srcTable, i, specsCache, pullData, limiter, execCtx->Sampling);
+ auto readTx = entry->Tx;
+ if (srcTable.TransactionId_) {
+ readTx = entry->GetSnapshotTx(*srcTable.TransactionId_);
+ srcTable.TransactionId_.Clear();
+ }
+ if (execCtx->YamrInput) {
+ stop = NYql::IterateYamredRows(readTx, srcTable, i, specsCache, pullData, limiter, execCtx->Sampling);
+ } else {
+ stop = NYql::IterateYsonRows(readTx, srcTable, i, specsCache, pullData, limiter, execCtx->Sampling);
+ }
+ }
+ if (stop || limiter.Exceed()) {
+ break;
}
}
- if (stop || limiter.Exceed()) {
- break;
+ }
+ return false;
+ };
+
+ switch (execCtx->Options_.FillSettings().Format) {
+ case IDataProvider::EResultFormat::Yson: {
+ TYsonExecuteResOrPull pullData(execCtx->Options_.FillSettings().RowsLimitPerWrite,
+ execCtx->Options_.FillSettings().AllResultsBytesLimit, MakeMaybe(columns));
+
+ if (run(pullData)) {
+ return true;
}
+ specs.Clear();
+
+ writer.OnKeyedItem("Data");
+ writer.OnBeginList();
+ writer.OnRaw(pullData.Finish(), ::NYson::EYsonType::ListFragment);
+ writer.OnEndList();
+ return pullData.IsTruncated();
}
- }
+ case IDataProvider::EResultFormat::Skiff: {
+ THashMap<TString, ui32> structColumns;
+ for (size_t index = 0; index < columns.size(); index++) {
+ structColumns.emplace(columns[index], index);
+ }
+
+ auto skiffNode = SingleTableSpecToInputSkiff(rowSpec[YqlIOSpecTables][0], structColumns, false, false, false);
- specs.Clear();
- writer.OnKeyedItem("Data");
- writer.OnBeginList(); // Pull returns list fragment
- writer.OnRaw(pullData.Finish(), ::NYson::EYsonType::ListFragment);
- writer.OnEndList();
+ writer.OnKeyedItem("SkiffType");
+ writer.OnRaw(NodeToYsonString(skiffNode), ::NYson::EYsonType::Node);
- return pullData.IsTruncated();
+ TSkiffExecuteResOrPull pullData(execCtx->Options_.FillSettings().RowsLimitPerWrite,
+ execCtx->Options_.FillSettings().AllResultsBytesLimit,
+ codecCtx,
+ holderFactory,
+ rowSpec,
+ execCtx->Options_.OptLLVM(),
+ columns);
+
+ if (run(pullData)) {
+ return true;
+ }
+ specs.Clear();
+
+ writer.OnKeyedItem("Data");
+ writer.OnStringScalar(pullData.Finish());
+
+ return pullData.IsTruncated();
+ }
+ default:
+ YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << execCtx->Options_.FillSettings().Format;
+ }
}
TFuture<TResOrPullResult> DoResult(const TSession::TPtr& session, NNodes::TResult result, TExprContext& ctx, TResOrPullOptions&& options) {
@@ -2799,15 +2915,8 @@ private:
hasListResult = rootNode.GetStaticType()->IsList();
lambda = SerializeRuntimeNode(rootNode, builder.GetTypeEnvironment());
}
- auto extraUsage = ScanExtraResourceUsage(result.Input().Ref(), *options.Config());
- TString type;
- if (NCommon::HasResOrPullOption(result.Ref(), "type")) {
- TStringStream typeYson;
- ::NYson::TYsonWriter typeWriter(&typeYson);
- NCommon::WriteResOrPullType(typeWriter, result.Input().Ref().GetTypeAnn(), columns);
- type = typeYson.Str();
- }
+ auto extraUsage = ScanExtraResourceUsage(result.Input().Ref(), *options.Config());
TString cluster = options.UsedCluster();
if (cluster.empty()) {
@@ -2823,30 +2932,92 @@ private:
auto execCtx = MakeExecCtx(std::move(options), session, cluster, result.Input().Raw(), &ctx);
auto pos = ctx.GetPosition(result.Pos());
- return session->Queue_->Async([lambda, hasListResult, extraUsage, tmpTablePath, execCtx, columns] () {
+ TString type, skiffType;
+ NYT::TNode rowSpec;
+ if (execCtx->Options_.FillSettings().Format == IDataProvider::EResultFormat::Skiff) {
+ auto ytType = ParseYTType(result.Input().Ref(), ctx, execCtx);
+
+ type = ytType.first;
+ rowSpec = ytType.second;
+ skiffType = NodeToYsonString(TablesSpecToOutputSkiff(rowSpec));
+ } else if (NCommon::HasResOrPullOption(result.Ref(), "type")) {
+ TStringStream typeYson;
+ ::NYson::TYsonWriter typeWriter(&typeYson);
+ NCommon::WriteResOrPullType(typeWriter, result.Input().Ref().GetTypeAnn(), columns);
+ type = typeYson.Str();
+ }
+
+ return session->Queue_->Async([lambda, hasListResult, extraUsage, tmpTablePath, execCtx, columns, rowSpec] () {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
execCtx->MakeUserFiles();
- return ExecCalc(lambda, extraUsage, tmpTablePath, execCtx, {},
- TExprResultFactory(execCtx->Options_.FillSettings().RowsLimitPerWrite,
- execCtx->Options_.FillSettings().AllResultsBytesLimit, columns, hasListResult),
- &columns);
+
+ switch (execCtx->Options_.FillSettings().Format) {
+ case IDataProvider::EResultFormat::Yson:
+ return ExecCalc(lambda, extraUsage, tmpTablePath, execCtx, {},
+ TYsonExprResultFactory(execCtx->Options_.FillSettings().RowsLimitPerWrite,
+ execCtx->Options_.FillSettings().AllResultsBytesLimit,
+ columns,
+ hasListResult),
+ &columns,
+ execCtx->Options_.FillSettings().Format);
+ case IDataProvider::EResultFormat::Skiff:
+ return ExecCalc(lambda, extraUsage, tmpTablePath, execCtx, {},
+ TSkiffExprResultFactory(execCtx->Options_.FillSettings().RowsLimitPerWrite,
+ execCtx->Options_.FillSettings().AllResultsBytesLimit,
+ hasListResult,
+ rowSpec,
+ execCtx->Options_.OptLLVM()),
+ &columns,
+ execCtx->Options_.FillSettings().Format);
+ break;
+ default:
+ YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << execCtx->Options_.FillSettings().Format;
+ }
})
- .Apply([type, execCtx, discard, pos] (const TFuture<std::pair<TString, bool>>& f) {
+ .Apply([skiffType, type, execCtx, discard, pos, columns] (const TFuture<std::pair<TString, bool>>& f) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
try {
const std::pair<TString, bool>& value = f.GetValue();
TResOrPullResult res;
TStringStream out;
- ::NYson::TYsonWriter writer(discard ? (IOutputStream*)&Cnull : (IOutputStream*)&out, NCommon::GetYsonFormat(execCtx->Options_.FillSettings()), ::NYson::EYsonType::Node, true);
+
+ auto fillSettings = execCtx->Options_.FillSettings();
+ fillSettings.Format = IDataProvider::EResultFormat::Yson;
+
+ ::NYson::TYsonWriter writer(discard ? (IOutputStream*)&Cnull : (IOutputStream*)&out, NCommon::GetYsonFormat(fillSettings), ::NYson::EYsonType::Node, true);
writer.OnBeginMap();
+
+ if (skiffType) {
+ writer.OnKeyedItem("SkiffType");
+ writer.OnRaw(skiffType, ::NYson::EYsonType::Node);
+
+
+ writer.OnKeyedItem("Columns");
+ writer.OnBeginList();
+ for (auto& column: columns) {
+ writer.OnListItem();
+ writer.OnStringScalar(column);
+ }
+ writer.OnEndList();
+ }
+
if (type) {
writer.OnKeyedItem("Type");
writer.OnRaw(type);
}
writer.OnKeyedItem("Data");
- writer.OnRaw(value.first);
+ switch (execCtx->Options_.FillSettings().Format) {
+ case IDataProvider::EResultFormat::Yson:
+ writer.OnRaw(value.first);
+ break;
+ case IDataProvider::EResultFormat::Skiff:
+ writer.OnStringScalar(value.first);
+ break;
+ default:
+ YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << execCtx->Options_.FillSettings().Format;
+ }
if (value.second) {
writer.OnKeyedItem("Truncated");
@@ -4504,7 +4675,7 @@ private:
LocalRawMapReduce(spec, mapper, &in, &out);
- const auto& builder = factory();
+ const auto& builder = factory.Create();
for (const auto& reader = NYT::CreateTableReader<NYT::TNode>(&out, NYT::TTableReaderOptions()); reader->IsValid(); reader->Next()) {
auto& row = reader->GetRow();
if (!builder->WriteNext(row["output"])) {
@@ -4534,7 +4705,8 @@ private:
const TExecParamsPtr& execCtx,
TTransactionCache::TEntry::TPtr entry,
TResultFactory&& factory,
- const TVector<TString>* columns = nullptr
+ const TVector<TString>* columns = nullptr,
+ IDataProvider::EResultFormat format = IDataProvider::EResultFormat::Yson
)
{
TRawMapOperationSpec mapOpSpec;
@@ -4574,9 +4746,25 @@ private:
const TBindTerminator bind(graph->GetTerminator());
graph->Prepare();
auto value = graph->GetValue();
- auto builder = factory();
- builder->WriteValue(value, root.GetStaticType());
- return MakeFuture(builder->Make());
+
+ switch (format) {
+ case IDataProvider::EResultFormat::Skiff: {
+ TMemoryUsageInfo memInfo("Calc");
+ THolderFactory holderFactory(alloc.Ref(), memInfo, execCtx->FunctionRegistry_);
+ TCodecContext codecCtx(builder.GetTypeEnvironment(), *execCtx->FunctionRegistry_, &holderFactory);
+
+ auto skiffBuilder = factory.Create(codecCtx, holderFactory);
+ skiffBuilder->WriteValue(value, root.GetStaticType());
+ return MakeFuture(skiffBuilder->Make());
+ }
+ case IDataProvider::EResultFormat::Yson: {
+ auto ysonBuilder = factory.Create();
+ ysonBuilder->WriteValue(value, root.GetStaticType());
+ return MakeFuture(ysonBuilder->Make());
+ }
+ default:
+ YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << format;
+ }
}
localRun = localRun && transform.CanExecuteLocally();
{
@@ -4645,17 +4833,44 @@ private:
return entry->Tx->RawMap(mapOpSpec, job, opOpts);
});
})
- .Apply([tmpTable, entry, factory = std::move(factory)](const auto& f) {
+ .Apply([execCtx, tmpTable, entry, factory = std::move(factory), format](const auto& f) {
f.GetValue();
- auto builder = factory();
+
auto reader = entry->Tx->CreateTableReader<NYT::TNode>(tmpTable);
- for (; reader->IsValid(); reader->Next()) {
- auto& row = reader->GetRow();
- if (!builder->WriteNext(row["output"])) {
- break;
+
+ TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),
+ execCtx->FunctionRegistry_->SupportsSizedAllocators());
+ alloc.SetLimit(execCtx->Options_.Config()->DefaultCalcMemoryLimit.Get().GetOrElse(0));
+ TMemoryUsageInfo memInfo("Calc");
+ TTypeEnvironment env(alloc);
+ THolderFactory holderFactory(alloc.Ref(), memInfo, execCtx->FunctionRegistry_);
+
+ switch (format) {
+ case IDataProvider::EResultFormat::Skiff: {
+ TCodecContext codecCtx(env, *execCtx->FunctionRegistry_, &holderFactory);
+
+ auto skiffBuilder = factory.Create(codecCtx, holderFactory);
+ for (; reader->IsValid(); reader->Next()) {
+ auto& row = reader->GetRow();
+ if (!skiffBuilder->WriteNext(row["output"])) {
+ break;
+ }
+ }
+ return skiffBuilder->Make();
+ }
+ case IDataProvider::EResultFormat::Yson: {
+ auto ysonBuilder = factory.Create();
+ for (; reader->IsValid(); reader->Next()) {
+ auto& row = reader->GetRow();
+ if (!ysonBuilder->WriteNext(row["output"])) {
+ break;
+ }
+ }
+ return ysonBuilder->Make();
}
+ default:
+ YQL_LOG_CTX_THROW yexception() << "Unexpected result type: " << format;
}
- return builder->Make();
})
.Apply([tmpTable, execCtx, entry](const TFuture<typename TResultFactory::TResult>& f) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
diff --git a/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.cpp b/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.cpp
index bab2518be2e..ada1131e5b7 100644
--- a/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.cpp
+++ b/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.cpp
@@ -13,24 +13,15 @@ namespace NYql {
using namespace NKikimr;
using namespace NKikimr::NMiniKQL;
-TExecuteResOrPull::TExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns)
- : Rows(rowLimit)
- , Bytes(byteLimit)
- , Columns(columns)
- , Out(new THoldingStream<TCountingOutput>(THolder(new TStringOutput(Result))))
+//////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+TYsonExecuteResOrPull::TYsonExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns)
+ : IExecuteResOrPull(rowLimit, byteLimit, columns)
, Writer(new NYson::TYsonWriter(Out.Get(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::Node, true))
- , IsList(false)
- , Truncated(false)
- , Row(0)
{
}
-ui64 TExecuteResOrPull::GetWrittenSize() const {
- YQL_ENSURE(Out, "GetWritten() must be callled before Finish()");
- return Out->Counter();
-}
-
-TString TExecuteResOrPull::Finish() {
+TString TYsonExecuteResOrPull::Finish() {
if (IsList) {
Writer->OnEndList();
}
@@ -39,7 +30,7 @@ TString TExecuteResOrPull::Finish() {
return Result;
}
-bool TExecuteResOrPull::WriteNext(TStringBuf val) {
+bool TYsonExecuteResOrPull::WriteNext(const NYT::TNode& item) {
if (IsList) {
if (!HasCapacity()) {
Truncated = true;
@@ -47,12 +38,14 @@ bool TExecuteResOrPull::WriteNext(TStringBuf val) {
}
Writer->OnListItem();
}
- Writer->OnRaw(val, ::NYson::EYsonType::Node);
+
+ Writer->OnRaw(NYT::NodeToYsonString(item, NYT::NYson::EYsonFormat::Binary), ::NYson::EYsonType::Node);
++Row;
+
return IsList;
}
-void TExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type) {
+void TYsonExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type) {
if (type->IsList()) {
auto inputType = AS_TYPE(TListType, type)->GetItemType();
TMaybe<TVector<ui32>> structPositions = NCommon::CreateStructPositions(inputType, Columns.Defined() ? Columns.Get() : nullptr);
@@ -63,6 +56,7 @@ void TExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type
Truncated = true;
break;
}
+
Writer->OnListItem();
NCommon::WriteYsonValue(*Writer, item, inputType, structPositions.Get());
}
@@ -71,4 +65,153 @@ void TExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type
}
}
+bool TYsonExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) {
+ if (!HasCapacity()) {
+ Truncated = true;
+ return false;
+ }
+
+ NYql::DecodeToYson(specsCache, tableIndex, rec, *Out);
+ ++Row;
+ return true;
}
+
+bool TYsonExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) {
+ if (!HasCapacity()) {
+ Truncated = true;
+ return false;
+ }
+
+ NYql::DecodeToYson(specsCache, tableIndex, rec, *Out);
+ ++Row;
+ return true;
+}
+
+bool TYsonExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) {
+ if (!HasCapacity()) {
+ Truncated = true;
+ return false;
+ }
+
+ NYql::DecodeToYson(specsCache, tableIndex, rec, *Out);
+ ++Row;
+ return true;
+}
+
+void TYsonExecuteResOrPull::SetListResult() {
+ if (!IsList) {
+ IsList = true;
+ Writer->OnBeginList();
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+TSkiffExecuteResOrPull::TSkiffExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, NCommon::TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory, const NYT::TNode& attrs, const TString& optLLVM, const TVector<TString>& columns)
+ : IExecuteResOrPull(rowLimit, byteLimit, columns)
+ , HolderFactory(holderFactory)
+ , SkiffWriter(*Out.Get(), 0, 4_MB)
+{
+ Specs.SetUseSkiff(optLLVM);
+ Specs.Init(codecCtx, attrs);
+
+ SkiffWriter.SetSpecs(Specs, columns);
+}
+
+TString TSkiffExecuteResOrPull::Finish() {
+ SkiffWriter.Finish();
+ Out.Destroy();
+ Specs.Clear();
+ return Result;
+}
+
+bool TSkiffExecuteResOrPull::WriteNext(const NYT::TNode& item) {
+ if (IsList) {
+ if (!HasCapacity()) {
+ Truncated = true;
+ return false;
+ }
+ }
+
+ TStringStream err;
+ auto value = NCommon::ParseYsonNode(HolderFactory, item, Specs.Outputs[0].RowType, &err);
+ if (!value) {
+ throw yexception() << "Could not parse yson node";
+ }
+ SkiffWriter.AddRow(*value);
+
+ return IsList;
+}
+
+void TSkiffExecuteResOrPull::WriteValue(const NUdf::TUnboxedValue& value, TType* type) {
+ if (type->IsList()) {
+ SetListResult();
+ const auto it = value.GetListIterator();
+ for (NUdf::TUnboxedValue item; it.Next(item); ++Row) {
+ if (!HasCapacity()) {
+ Truncated = true;
+ break;
+ }
+
+ SkiffWriter.AddRow(item);
+ }
+ } else {
+ SkiffWriter.AddRow(value);
+ }
+}
+
+bool TSkiffExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) {
+ if (!HasCapacity()) {
+ Truncated = true;
+ return false;
+ }
+
+ TStringStream err;
+ auto value = NCommon::ParseYsonNode(specsCache.GetHolderFactory(), rec, Specs.Outputs[tableIndex].RowType, &err);
+ if (!value) {
+ throw yexception() << "Could not parse yson node";
+ }
+ SkiffWriter.AddRow(*value);
+
+ ++Row;
+ return true;
+}
+
+bool TSkiffExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) {
+ if (!HasCapacity()) {
+ Truncated = true;
+ return false;
+ }
+
+ NUdf::TUnboxedValue node;
+ node = DecodeYamr(specsCache, tableIndex, rec);
+ SkiffWriter.AddRow(node);
+
+ ++Row;
+ return true;
+}
+
+bool TSkiffExecuteResOrPull::WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) {
+ Y_UNUSED(specsCache);
+ Y_UNUSED(tableIndex);
+
+ if (!HasCapacity()) {
+ Truncated = true;
+ return false;
+ }
+
+ SkiffWriter.AddRow(rec);
+
+ ++Row;
+ return true;
+}
+
+void TSkiffExecuteResOrPull::SetListResult() {
+ if (!IsList) {
+ IsList = true;
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+} // NYql
diff --git a/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.h b/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.h
index d5d7fbcd7bf..d913a96a29c 100644
--- a/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.h
+++ b/ydb/library/yql/providers/yt/lib/res_pull/res_or_pull.h
@@ -5,6 +5,8 @@
#include <ydb/library/yql/minikql/mkql_node.h>
+#include <library/cpp/yson/node/node_io.h>
+
#include <library/cpp/yson/writer.h>
#include <util/stream/length.h>
@@ -17,9 +19,22 @@
namespace NYql {
-class TExecuteResOrPull : public TNonCopyable {
+///////////////////////////////////////////////////////////////////////////////
+// IExecuteResOrPull
+///////////////////////////////////////////////////////////////////////////////
+class IExecuteResOrPull : public TNonCopyable {
public:
- TExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns);
+ IExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns)
+ : Rows(rowLimit)
+ , Bytes(byteLimit)
+ , Columns(columns)
+ , Out(new THoldingStream<TCountingOutput>(THolder(new TStringOutput(Result))))
+ , IsList(false)
+ , Truncated(false)
+ , Row(0)
+ {
+ }
+ virtual ~IExecuteResOrPull() = default;
bool HasCapacity() const {
return (!Rows || Row < *Rows) && (!Bytes || Out->Counter() < *Bytes);
@@ -29,42 +44,38 @@ public:
return Truncated;
}
- ui64 GetWrittenSize() const;
+ ui64 GetWrittenSize() const {
+ YQL_ENSURE(Out, "GetWritten() must be callled before Finish()");
+ return Out->Counter();
+ }
ui64 GetWrittenRows() const {
return Row;
}
- TString Finish();
TMaybe<ui64> GetRowsLimit() const {
return Rows;
}
- void SetListResult() {
- if (!IsList) {
- IsList = true;
- Writer->OnBeginList();
- }
- }
-
const TMaybe<TVector<TString>>& GetColumns() const {
return Columns;
}
- bool WriteNext(TStringBuf val);
-
- template <class TRec>
- bool WriteNext(TMkqlIOCache& specCache, const TRec& rec, ui32 tableIndex) {
- if (!HasCapacity()) {
- Truncated = true;
- return false;
- }
- NYql::DecodeToYson(specCache, tableIndex, rec, *Out);
- ++Row;
- return true;
+ std::pair<TString, bool> Make() {
+ return {Finish(), IsTruncated()};
}
- void WriteValue(const NKikimr::NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type);
+ virtual TString Finish() = 0;
+
+ virtual void SetListResult() = 0;
+
+ virtual bool WriteNext(const NYT::TNode& item) = 0;
+
+ virtual bool WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) = 0;
+ virtual bool WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) = 0;
+ virtual bool WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) = 0;
+
+ virtual void WriteValue(const NKikimr::NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type) = 0;
protected:
const TMaybe<ui64> Rows;
@@ -72,10 +83,58 @@ protected:
const TMaybe<TVector<TString>> Columns;
TString Result;
THolder<TCountingOutput> Out;
- THolder<NYson::TYsonWriter> Writer;
bool IsList;
bool Truncated;
ui64 Row;
};
+///////////////////////////////////////////////////////////////////////////////
+// TYsonExecuteResOrPull
+///////////////////////////////////////////////////////////////////////////////
+class TYsonExecuteResOrPull : public IExecuteResOrPull {
+public:
+ TYsonExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, const TMaybe<TVector<TString>>& columns);
+ ~TYsonExecuteResOrPull() = default;
+
+ TString Finish() override;
+
+ void SetListResult() override;
+
+ bool WriteNext(const NYT::TNode& item) override;
+
+ bool WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) override;
+ bool WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) override;
+ bool WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) override;
+
+ void WriteValue(const NKikimr::NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type) override;
+protected:
+ THolder<NYson::TYsonWriter> Writer;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+// TSkiffExecuteResOrPull
+///////////////////////////////////////////////////////////////////////////////
+class TSkiffExecuteResOrPull : public IExecuteResOrPull {
+public:
+ TSkiffExecuteResOrPull(TMaybe<ui64> rowLimit, TMaybe<ui64> byteLimit, NCommon::TCodecContext& codecCtx, const NKikimr::NMiniKQL::THolderFactory& holderFactory, const NYT::TNode& attrs, const TString& optLLVM, const TVector<TString>& columns = {});
+ ~TSkiffExecuteResOrPull() = default;
+
+ TString Finish() override;
+
+ void SetListResult() override;
+
+ bool WriteNext(const NYT::TNode& item) override;
+
+ bool WriteNext(TMkqlIOCache& specsCache, const NUdf::TUnboxedValue& rec, ui32 tableIndex) override;
+ bool WriteNext(TMkqlIOCache& specsCache, const NYT::TYaMRRow& rec, ui32 tableIndex) override;
+ bool WriteNext(TMkqlIOCache& specsCache, const NYT::TNode& rec, ui32 tableIndex) override;
+
+ void WriteValue(const NKikimr::NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type) override;
+protected:
+ const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
+
+ TMkqlIOSpecs Specs;
+ TMkqlWriterImpl SkiffWriter;
+};
+
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp
index 032c396598a..923b0954178 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp
@@ -78,7 +78,7 @@ protected:
TResOrPullBase resOrPull(input);
IDataProvider::TFillSettings fillSettings = NCommon::GetFillSettings(resOrPull.Ref());
- YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Yson);
+ YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Yson || fillSettings.Format == IDataProvider::EResultFormat::Skiff);
auto data = resOrPull.Input();
if (auto maybePull = resOrPull.Maybe<TPull>()) {