aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-01-11 18:26:21 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-01-11 18:26:21 +0300
commitaac4f8462008395659ca657daf715ba68b41cedf (patch)
treefe650d35dc91cce74e64d337e36f73f371ab0de5
parentf9813f35812edd39d54b76a695b36963254178b2 (diff)
downloadydb-aac4f8462008395659ca657daf715ba68b41cedf.tar.gz
parse objects synchronization with calc graph (parse immediately before using is useful for memory consumption and not affect performance)
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_channel.cpp55
1 files changed, 38 insertions, 17 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
index bd0a541e37..13de1dbb6d 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
@@ -6,6 +6,26 @@ namespace NYql::NDq {
class TDqInputChannel : public TDqInputImpl<TDqInputChannel, IDqInputChannel> {
using TBaseImpl = TDqInputImpl<TDqInputChannel, IDqInputChannel>;
friend TBaseImpl;
+private:
+ std::deque<NDqProto::TData> DataForDeserialize;
+
+ void PushImpl(NDqProto::TData&& data) {
+ const i64 space = data.GetRaw().size();
+
+ NKikimr::NMiniKQL::TUnboxedValueVector buffer;
+ buffer.reserve(data.GetRows());
+
+ if (Y_UNLIKELY(ProfileStats)) {
+ auto startTime = TInstant::Now();
+ DataSerializer.Deserialize(data, InputType, buffer);
+ ProfileStats->DeserializationTime += (TInstant::Now() - startTime);
+ } else {
+ DataSerializer.Deserialize(data, InputType, buffer);
+ }
+
+ AddBatch(std::move(buffer), space);
+ }
+
public:
TDqInputChannel(ui64 channelId, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, bool collectProfileStats,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
@@ -14,33 +34,34 @@ public:
, ChannelId(channelId)
, BasicStats(ChannelId)
, ProfileStats(collectProfileStats ? &BasicStats : nullptr)
- , DataSerializer(typeEnv, holderFactory, transportVersion) {}
+ , DataSerializer(typeEnv, holderFactory, transportVersion)
+ {}
ui64 GetChannelId() const override {
return ChannelId;
}
+ [[nodiscard]]
+ bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override {
+ if (Batches.empty()) {
+ if (DataForDeserialize.size()) {
+ PushImpl(std::move(DataForDeserialize.front()));
+ DataForDeserialize.pop_front();
+ return TBaseImpl::Pop(batch);
+ } else {
+ return false;
+ }
+ } else {
+ return TBaseImpl::Pop(batch);
+ }
+ }
+
void Push(NDqProto::TData&& data) override {
YQL_ENSURE(!Finished, "input channel " << ChannelId << " already finished");
-
if (Y_UNLIKELY(data.GetRows() == 0)) {
return;
}
-
- const i64 space = data.GetRaw().size();
-
- NKikimr::NMiniKQL::TUnboxedValueVector buffer;
- buffer.reserve(data.GetRows());
-
- if (Y_UNLIKELY(ProfileStats)) {
- auto startTime = TInstant::Now();
- DataSerializer.Deserialize(data, InputType, buffer);
- ProfileStats->DeserializationTime += (TInstant::Now() - startTime);
- } else {
- DataSerializer.Deserialize(data, InputType, buffer);
- }
-
- AddBatch(std::move(buffer), space);
+ DataForDeserialize.emplace_back(std::move(data));
}
const TDqInputChannelStats* GetStats() const override {