diff options
author | kardymon-d <kardymon-d@yandex-team.com> | 2023-10-27 11:20:05 +0300 |
---|---|---|
committer | kardymon-d <kardymon-d@yandex-team.com> | 2023-10-27 11:43:16 +0300 |
commit | 9f0d269f99492ad31a0882ded8db0547a5d92fad (patch) | |
tree | f61ecceb241e2ce53f5de2b4eeb62a5ce8debdca | |
parent | 508f523be35942ffcbcdac5175b94cff16fc030d (diff) | |
download | ydb-9f0d269f99492ad31a0882ded8db0547a5d92fad.tar.gz |
Стриминговый запрос прерывается с ошибкой Head(): requirement RowCount_ failed. Head() on empty batch
Fix Empty()
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_channel.cpp | 101 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_impl.h | 2 |
2 files changed, 75 insertions, 28 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp index 322d9c28c5..f61abd6d5e 100644 --- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp @@ -3,9 +3,44 @@ namespace NYql::NDq { -class TDqInputChannel : public TDqInputImpl<TDqInputChannel, IDqInputChannel> { - using TBaseImpl = TDqInputImpl<TDqInputChannel, IDqInputChannel>; - friend TBaseImpl; +class TDqInputChannelImpl : public TDqInputImpl<TDqInputChannelImpl, IDqInputChannel> { + using TBaseImpl = TDqInputImpl<TDqInputChannelImpl, IDqInputChannel>; + +public: + TDqInputChannelStats PushStats; + TDqInputStats PopStats; + + TDqInputChannelImpl(ui64 channelId, ui32 srcStageId, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, TCollectStatsLevel level, + const NKikimr::NMiniKQL::TTypeEnvironment&, const NKikimr::NMiniKQL::THolderFactory&, + NDqProto::EDataTransportVersion) + : TBaseImpl(inputType, maxBufferBytes) + { + PopStats.Level = level; + PushStats.Level = level; + PushStats.ChannelId = channelId; + PushStats.SrcStageId = srcStageId; + } + + ui64 GetChannelId() const override { + return PushStats.ChannelId; + } + + const TDqInputChannelStats& GetPushStats() const override { + return PushStats; + } + + const TDqInputStats& GetPopStats() const override { + return PopStats; + } + +private: + void Push(TDqSerializedBatch&&) override { + Y_ABORT("Not implemented"); + } +}; + +class TDqInputChannel : public IDqInputChannel { + private: std::deque<TDqSerializedBatch> DataForDeserialize; ui64 StoredSerializedBytes = 0; @@ -13,18 +48,18 @@ private: void PushImpl(TDqSerializedBatch&& data) { const i64 space = data.Size(); const size_t rowCount = data.RowCount(); - - NKikimr::NMiniKQL::TUnboxedValueBatch batch(InputType); + auto inputType = Impl.GetInputType(); + NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType); if (Y_UNLIKELY(PushStats.CollectProfile())) { auto startTime = TInstant::Now(); - DataSerializer.Deserialize(std::move(data), InputType, batch); + DataSerializer.Deserialize(std::move(data), inputType, batch); PushStats.DeserializationTime += (TInstant::Now() - startTime); } else { - DataSerializer.Deserialize(std::move(data), InputType, batch); + DataSerializer.Deserialize(std::move(data), inputType, batch); } YQL_ENSURE(batch.RowCount() == rowCount); - AddBatch(std::move(batch), space); + Impl.AddBatch(std::move(batch), space); } void DeserializeAllData() { @@ -42,59 +77,52 @@ public: TDqInputChannel(ui64 channelId, ui32 srcStageId, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, TCollectStatsLevel level, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, NDqProto::EDataTransportVersion transportVersion) - : TBaseImpl(inputType, maxBufferBytes) - , DataSerializer(typeEnv, holderFactory, transportVersion) - { - PopStats.Level = level; - PushStats.Level = level; - PushStats.ChannelId = channelId; - PushStats.SrcStageId = srcStageId; + : Impl(channelId, srcStageId, inputType, maxBufferBytes, level, typeEnv, holderFactory, transportVersion) + , DataSerializer(typeEnv, holderFactory, transportVersion) { } ui64 GetChannelId() const override { - return PushStats.ChannelId; + return Impl.GetChannelId(); } const TDqInputChannelStats& GetPushStats() const override { - return PushStats; + return Impl.GetPushStats(); } const TDqInputStats& GetPopStats() const override { - return PopStats; + return Impl.GetPopStats(); } i64 GetFreeSpace() const override { - return TBaseImpl::GetFreeSpace() - i64(StoredSerializedBytes); + return Impl.GetFreeSpace() - i64(StoredSerializedBytes); } ui64 GetStoredBytes() const override { - return StoredBytes + StoredSerializedBytes; + return Impl.GetStoredBytes() + StoredSerializedBytes; } bool IsFinished() const override { - return DataForDeserialize.empty() && TBaseImpl::IsFinished(); + return DataForDeserialize.empty() && Impl.IsFinished(); } - [[nodiscard]] bool Empty() const override { - return DataForDeserialize.empty() && TBaseImpl::Empty(); + return (DataForDeserialize.empty() || Impl.IsPaused()) && Impl.Empty(); } void Pause() override { DeserializeAllData(); - TBaseImpl::Pause(); + Impl.Pause(); } - [[nodiscard]] bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override { - if (Batches.empty()) { + if (Impl.Empty() && !Impl.IsPaused()) { DeserializeAllData(); } - return TBaseImpl::Pop(batch); + return Impl.Pop(batch); } void Push(TDqSerializedBatch&& data) override { - YQL_ENSURE(!Finished, "input channel " << PushStats.ChannelId << " already finished"); + YQL_ENSURE(!Impl.IsFinished(), "input channel " << PushStats.ChannelId << " already finished"); if (Y_UNLIKELY(data.Proto.GetRows() == 0)) { return; } @@ -102,7 +130,24 @@ public: DataForDeserialize.emplace_back(std::move(data)); } + NKikimr::NMiniKQL::TType* GetInputType() const override { + return Impl.GetInputType(); + } + + void Resume() override { + Impl.Resume(); + } + + bool IsPaused() const override { + return Impl.IsPaused(); + } + + void Finish() override { + Impl.Finish(); + } + private: + TDqInputChannelImpl Impl; TDqDataSerializer DataSerializer; }; diff --git a/ydb/library/yql/dq/runtime/dq_input_impl.h b/ydb/library/yql/dq/runtime/dq_input_impl.h index d9665f5457..d0d96cdfda 100644 --- a/ydb/library/yql/dq/runtime/dq_input_impl.h +++ b/ydb/library/yql/dq/runtime/dq_input_impl.h @@ -66,6 +66,7 @@ public: if (IsPaused()) { ui64 batchesCount = GetBatchesBeforePause(); + Y_ABORT_UNLESS(batchesCount > 0); Y_ABORT_UNLESS(batchesCount <= Batches.size()); if (batch.IsWide()) { @@ -122,6 +123,7 @@ public: static_cast<TDerived*>(this)->PopStats.Chunks++; } + Y_ABORT_UNLESS(!batch.empty()); return true; } |