diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-11 18:26:21 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-11 18:26:21 +0300 |
commit | aac4f8462008395659ca657daf715ba68b41cedf (patch) | |
tree | fe650d35dc91cce74e64d337e36f73f371ab0de5 | |
parent | f9813f35812edd39d54b76a695b36963254178b2 (diff) | |
download | ydb-aac4f8462008395659ca657daf715ba68b41cedf.tar.gz |
parse objects synchronization with calc graph (parse immediately before using is useful for memory consumption and not affect performance)
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_channel.cpp | 55 |
1 files changed, 38 insertions, 17 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp index bd0a541e37..13de1dbb6d 100644 --- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp @@ -6,6 +6,26 @@ namespace NYql::NDq { class TDqInputChannel : public TDqInputImpl<TDqInputChannel, IDqInputChannel> { using TBaseImpl = TDqInputImpl<TDqInputChannel, IDqInputChannel>; friend TBaseImpl; +private: + std::deque<NDqProto::TData> DataForDeserialize; + + void PushImpl(NDqProto::TData&& data) { + const i64 space = data.GetRaw().size(); + + NKikimr::NMiniKQL::TUnboxedValueVector buffer; + buffer.reserve(data.GetRows()); + + if (Y_UNLIKELY(ProfileStats)) { + auto startTime = TInstant::Now(); + DataSerializer.Deserialize(data, InputType, buffer); + ProfileStats->DeserializationTime += (TInstant::Now() - startTime); + } else { + DataSerializer.Deserialize(data, InputType, buffer); + } + + AddBatch(std::move(buffer), space); + } + public: TDqInputChannel(ui64 channelId, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, bool collectProfileStats, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, @@ -14,33 +34,34 @@ public: , ChannelId(channelId) , BasicStats(ChannelId) , ProfileStats(collectProfileStats ? &BasicStats : nullptr) - , DataSerializer(typeEnv, holderFactory, transportVersion) {} + , DataSerializer(typeEnv, holderFactory, transportVersion) + {} ui64 GetChannelId() const override { return ChannelId; } + [[nodiscard]] + bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override { + if (Batches.empty()) { + if (DataForDeserialize.size()) { + PushImpl(std::move(DataForDeserialize.front())); + DataForDeserialize.pop_front(); + return TBaseImpl::Pop(batch); + } else { + return false; + } + } else { + return TBaseImpl::Pop(batch); + } + } + void Push(NDqProto::TData&& data) override { YQL_ENSURE(!Finished, "input channel " << ChannelId << " already finished"); - if (Y_UNLIKELY(data.GetRows() == 0)) { return; } - - const i64 space = data.GetRaw().size(); - - NKikimr::NMiniKQL::TUnboxedValueVector buffer; - buffer.reserve(data.GetRows()); - - if (Y_UNLIKELY(ProfileStats)) { - auto startTime = TInstant::Now(); - DataSerializer.Deserialize(data, InputType, buffer); - ProfileStats->DeserializationTime += (TInstant::Now() - startTime); - } else { - DataSerializer.Deserialize(data, InputType, buffer); - } - - AddBatch(std::move(buffer), space); + DataForDeserialize.emplace_back(std::move(data)); } const TDqInputChannelStats* GetStats() const override { |