diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2024-10-16 11:38:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-16 11:38:32 +0300 |
commit | 0025e009a6b0188b20f8a7f221668f46b814d192 (patch) | |
tree | db962f765068fb1af01698124a0fb4140c11992b | |
parent | 6b4d585b8294f2c6556001aff159332e478c527b (diff) | |
download | ydb-0025e009a6b0188b20f8a7f221668f46b814d192.tar.gz |
Fix sink empty batch (#10463)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_table.cpp | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index ebe0fdc64d..a228163619 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -889,14 +889,17 @@ public: void MakeNextBatches(i64 maxDataSize, ui64 maxCount) { YQL_ENSURE(BatchesInFlight == 0); + YQL_ENSURE(!IsEmpty()); i64 dataSize = 0; + // For columnshard batch can be slightly larger than the limit. while (BatchesInFlight < maxCount && BatchesInFlight < Batches.size() - && dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize) { + && (dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize || BatchesInFlight == 0)) { dataSize += GetBatch(BatchesInFlight)->GetMemory(); ++BatchesInFlight; } - YQL_ENSURE(BatchesInFlight == Batches.size() || GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize); + YQL_ENSURE(BatchesInFlight != 0); + YQL_ENSURE(BatchesInFlight == maxCount || BatchesInFlight == Batches.size() || dataSize + GetBatch(BatchesInFlight)->GetMemory() >= maxDataSize); } const IPayloadSerializer::IBatchPtr& GetBatch(size_t index) const { @@ -1200,7 +1203,9 @@ private: if (force) { for (auto& [shardId, batches] : Serializer->FlushBatchesForce()) { for (auto& batch : batches) { - ShardsInfo.GetShard(shardId).PushBatch(std::move(batch)); + if (batch && !batch->IsEmpty()) { + ShardsInfo.GetShard(shardId).PushBatch(std::move(batch)); + } } } } else { |