aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkardymon-d <kardymon-d@yandex-team.com>2023-10-27 11:20:05 +0300
committerkardymon-d <kardymon-d@yandex-team.com>2023-10-27 11:43:16 +0300
commit9f0d269f99492ad31a0882ded8db0547a5d92fad (patch)
treef61ecceb241e2ce53f5de2b4eeb62a5ce8debdca
parent508f523be35942ffcbcdac5175b94cff16fc030d (diff)
downloadydb-9f0d269f99492ad31a0882ded8db0547a5d92fad.tar.gz
Стриминговый запрос прерывается с ошибкой Head(): requirement RowCount_ failed. Head() on empty batch
Fix Empty()
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_channel.cpp101
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_impl.h2
2 files changed, 75 insertions, 28 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
index 322d9c28c5..f61abd6d5e 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
@@ -3,9 +3,44 @@
namespace NYql::NDq {
-class TDqInputChannel : public TDqInputImpl<TDqInputChannel, IDqInputChannel> {
- using TBaseImpl = TDqInputImpl<TDqInputChannel, IDqInputChannel>;
- friend TBaseImpl;
+class TDqInputChannelImpl : public TDqInputImpl<TDqInputChannelImpl, IDqInputChannel> {
+ using TBaseImpl = TDqInputImpl<TDqInputChannelImpl, IDqInputChannel>;
+
+public:
+ TDqInputChannelStats PushStats;
+ TDqInputStats PopStats;
+
+ TDqInputChannelImpl(ui64 channelId, ui32 srcStageId, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, TCollectStatsLevel level,
+ const NKikimr::NMiniKQL::TTypeEnvironment&, const NKikimr::NMiniKQL::THolderFactory&,
+ NDqProto::EDataTransportVersion)
+ : TBaseImpl(inputType, maxBufferBytes)
+ {
+ PopStats.Level = level;
+ PushStats.Level = level;
+ PushStats.ChannelId = channelId;
+ PushStats.SrcStageId = srcStageId;
+ }
+
+ ui64 GetChannelId() const override {
+ return PushStats.ChannelId;
+ }
+
+ const TDqInputChannelStats& GetPushStats() const override {
+ return PushStats;
+ }
+
+ const TDqInputStats& GetPopStats() const override {
+ return PopStats;
+ }
+
+private:
+ void Push(TDqSerializedBatch&&) override {
+ Y_ABORT("Not implemented");
+ }
+};
+
+class TDqInputChannel : public IDqInputChannel {
+
private:
std::deque<TDqSerializedBatch> DataForDeserialize;
ui64 StoredSerializedBytes = 0;
@@ -13,18 +48,18 @@ private:
void PushImpl(TDqSerializedBatch&& data) {
const i64 space = data.Size();
const size_t rowCount = data.RowCount();
-
- NKikimr::NMiniKQL::TUnboxedValueBatch batch(InputType);
+ auto inputType = Impl.GetInputType();
+ NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
if (Y_UNLIKELY(PushStats.CollectProfile())) {
auto startTime = TInstant::Now();
- DataSerializer.Deserialize(std::move(data), InputType, batch);
+ DataSerializer.Deserialize(std::move(data), inputType, batch);
PushStats.DeserializationTime += (TInstant::Now() - startTime);
} else {
- DataSerializer.Deserialize(std::move(data), InputType, batch);
+ DataSerializer.Deserialize(std::move(data), inputType, batch);
}
YQL_ENSURE(batch.RowCount() == rowCount);
- AddBatch(std::move(batch), space);
+ Impl.AddBatch(std::move(batch), space);
}
void DeserializeAllData() {
@@ -42,59 +77,52 @@ public:
TDqInputChannel(ui64 channelId, ui32 srcStageId, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, TCollectStatsLevel level,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
NDqProto::EDataTransportVersion transportVersion)
- : TBaseImpl(inputType, maxBufferBytes)
- , DataSerializer(typeEnv, holderFactory, transportVersion)
- {
- PopStats.Level = level;
- PushStats.Level = level;
- PushStats.ChannelId = channelId;
- PushStats.SrcStageId = srcStageId;
+ : Impl(channelId, srcStageId, inputType, maxBufferBytes, level, typeEnv, holderFactory, transportVersion)
+ , DataSerializer(typeEnv, holderFactory, transportVersion) {
}
ui64 GetChannelId() const override {
- return PushStats.ChannelId;
+ return Impl.GetChannelId();
}
const TDqInputChannelStats& GetPushStats() const override {
- return PushStats;
+ return Impl.GetPushStats();
}
const TDqInputStats& GetPopStats() const override {
- return PopStats;
+ return Impl.GetPopStats();
}
i64 GetFreeSpace() const override {
- return TBaseImpl::GetFreeSpace() - i64(StoredSerializedBytes);
+ return Impl.GetFreeSpace() - i64(StoredSerializedBytes);
}
ui64 GetStoredBytes() const override {
- return StoredBytes + StoredSerializedBytes;
+ return Impl.GetStoredBytes() + StoredSerializedBytes;
}
bool IsFinished() const override {
- return DataForDeserialize.empty() && TBaseImpl::IsFinished();
+ return DataForDeserialize.empty() && Impl.IsFinished();
}
- [[nodiscard]]
bool Empty() const override {
- return DataForDeserialize.empty() && TBaseImpl::Empty();
+ return (DataForDeserialize.empty() || Impl.IsPaused()) && Impl.Empty();
}
void Pause() override {
DeserializeAllData();
- TBaseImpl::Pause();
+ Impl.Pause();
}
- [[nodiscard]]
bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override {
- if (Batches.empty()) {
+ if (Impl.Empty() && !Impl.IsPaused()) {
DeserializeAllData();
}
- return TBaseImpl::Pop(batch);
+ return Impl.Pop(batch);
}
void Push(TDqSerializedBatch&& data) override {
- YQL_ENSURE(!Finished, "input channel " << PushStats.ChannelId << " already finished");
+ YQL_ENSURE(!Impl.IsFinished(), "input channel " << PushStats.ChannelId << " already finished");
if (Y_UNLIKELY(data.Proto.GetRows() == 0)) {
return;
}
@@ -102,7 +130,24 @@ public:
DataForDeserialize.emplace_back(std::move(data));
}
+ NKikimr::NMiniKQL::TType* GetInputType() const override {
+ return Impl.GetInputType();
+ }
+
+ void Resume() override {
+ Impl.Resume();
+ }
+
+ bool IsPaused() const override {
+ return Impl.IsPaused();
+ }
+
+ void Finish() override {
+ Impl.Finish();
+ }
+
private:
+ TDqInputChannelImpl Impl;
TDqDataSerializer DataSerializer;
};
diff --git a/ydb/library/yql/dq/runtime/dq_input_impl.h b/ydb/library/yql/dq/runtime/dq_input_impl.h
index d9665f5457..d0d96cdfda 100644
--- a/ydb/library/yql/dq/runtime/dq_input_impl.h
+++ b/ydb/library/yql/dq/runtime/dq_input_impl.h
@@ -66,6 +66,7 @@ public:
if (IsPaused()) {
ui64 batchesCount = GetBatchesBeforePause();
+ Y_ABORT_UNLESS(batchesCount > 0);
Y_ABORT_UNLESS(batchesCount <= Batches.size());
if (batch.IsWide()) {
@@ -122,6 +123,7 @@ public:
static_cast<TDerived*>(this)->PopStats.Chunks++;
}
+ Y_ABORT_UNLESS(!batch.empty());
return true;
}