aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormaxkovalev <maxkovalev@yandex-team.com>2024-12-16 15:06:42 +0300
committermaxkovalev <maxkovalev@yandex-team.com>2024-12-16 15:27:43 +0300
commitc5ab007dbc3c15c63e5fa7ed62b3fc0324225830 (patch)
treeeff90b1e64c43784acd60f5887c13531de3ff520
parentf7349410895855a89832fff42aa1935bd6d27a4a (diff)
downloadydb-c5ab007dbc3c15c63e5fa7ed62b3fc0324225830.tar.gz
YQL-19230: Revert DqYtWriter changes
Revert "YQL-19230: Move unboxed values to WriterState" This reverts commit 4df0abb5155d3251412c0213788290fb37d74d79, reversing changes made to eee144a5106a4ff7262c166852da02f01c9fd4b6. commit_hash:529d8b7ff43e007bb17865924049d6739bc6b5c7
-rw-r--r--yt/yql/providers/yt/comp_nodes/dq/dq_yt_writer.cpp30
1 files changed, 11 insertions, 19 deletions
diff --git a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_writer.cpp b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_writer.cpp
index d0268e9f27..65773c8365 100644
--- a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_writer.cpp
+++ b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_writer.cpp
@@ -28,14 +28,11 @@ class TYtDqWideWriteWrapper final : public TStatefulFlowCodegeneratorNode<TYtDqW
ITransactionPtr&& transaction,
THolder<TMkqlIOSpecs>&& specs,
TRawTableWriterPtr&& outStream,
- THolder<TMkqlWriterImpl>&& writer,
- size_t representationsSize
+ THolder<TMkqlWriterImpl>&& writer
)
: TComputationValue<TWriterState>(memInfo)
, Client(std::move(client)), Transaction(std::move(transaction))
, Specs(std::move(specs)), OutStream(std::move(outStream)), Writer(std::move(writer))
- , Values(representationsSize)
- , Fields(GetPointers(Values))
{}
~TWriterState() override {
@@ -45,19 +42,12 @@ class TYtDqWideWriteWrapper final : public TStatefulFlowCodegeneratorNode<TYtDqW
}
}
- void AddRow() const {
- Writer->AddFlatRow(Values.data());
- }
-
- const std::vector<NUdf::TUnboxedValue*>& GetFields() const {
- return Fields;
+ void AddRow(const NUdf::TUnboxedValuePod* row) const {
+ Writer->AddFlatRow(row);
}
void Finish() {
if (!Finished) {
- Values.clear();
- Values.shrink_to_fit();
- std::fill(Fields.begin(), Fields.end(), nullptr);
Writer->Finish();
OutStream->Finish();
}
@@ -70,8 +60,6 @@ class TYtDqWideWriteWrapper final : public TStatefulFlowCodegeneratorNode<TYtDqW
const THolder<TMkqlIOSpecs> Specs;
const TRawTableWriterPtr OutStream;
const THolder<TMkqlWriterImpl> Writer;
- NKikimr::NMiniKQL::TUnboxedValueVector Values;
- std::vector<NUdf::TUnboxedValue*> Fields;
};
public:
@@ -94,6 +82,7 @@ public:
, OutSpec(outSpec)
, WriterOptions(writerOptions)
, CodecCtx(std::move(codecCtx))
+ , Values(Representations.size()), Fields(GetPointers(Values))
{}
NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
@@ -102,9 +91,9 @@ public:
} else if (state.IsInvalid())
MakeState(ctx, state);
- switch (const auto ptr = static_cast<TWriterState*>(state.AsBoxed().Get()); Flow->FetchValues(ctx, ptr->GetFields().data())) {
+ switch (const auto ptr = static_cast<TWriterState*>(state.AsBoxed().Get()); Flow->FetchValues(ctx, Fields.data())) {
case EFetchResult::One:
- ptr->AddRow();
+ ptr->AddRow(Values.data());
return NUdf::TUnboxedValuePod::Void();
case EFetchResult::Yield:
return NUdf::TUnboxedValuePod::MakeYield();
@@ -236,14 +225,14 @@ private:
auto writer = MakeHolder<TMkqlWriterImpl>(outStream, 4_MB);
writer->SetSpecs(*specs);
- state = ctx.HolderFactory.Create<TWriterState>(std::move(client), std::move(transaction), std::move(specs), std::move(outStream), std::move(writer), Representations.size());
+ state = ctx.HolderFactory.Create<TWriterState>(std::move(client), std::move(transaction), std::move(specs), std::move(outStream), std::move(writer));
}
void RegisterDependencies() const final {
FlowDependsOn(Flow);
}
- static std::vector<NUdf::TUnboxedValue*> GetPointers(NKikimr::NMiniKQL::TUnboxedValueVector& array) {
+ static std::vector<NUdf::TUnboxedValue*> GetPointers(std::vector<NUdf::TUnboxedValue>& array) {
std::vector<NUdf::TUnboxedValue*> pointers;
pointers.reserve(array.size());
std::transform(array.begin(), array.end(), std::back_inserter(pointers), [](NUdf::TUnboxedValue& v) { return std::addressof(v); });
@@ -259,6 +248,9 @@ private:
const NYT::TNode OutSpec;
const NYT::TNode WriterOptions;
const THolder<NCommon::TCodecContext> CodecCtx;
+
+ std::vector<NUdf::TUnboxedValue> Values;
+ const std::vector<NUdf::TUnboxedValue*> Fields;
};
}