aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2024-10-16 11:38:32 +0300
committerGitHub <noreply@github.com>2024-10-16 11:38:32 +0300
commit0025e009a6b0188b20f8a7f221668f46b814d192 (patch)
treedb962f765068fb1af01698124a0fb4140c11992b
parent6b4d585b8294f2c6556001aff159332e478c527b (diff)
downloadydb-0025e009a6b0188b20f8a7f221668f46b814d192.tar.gz
Fix sink empty batch (#10463)
-rw-r--r--ydb/core/kqp/runtime/kqp_write_table.cpp11
1 files changed, 8 insertions, 3 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp
index ebe0fdc64d..a228163619 100644
--- a/ydb/core/kqp/runtime/kqp_write_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_table.cpp
@@ -889,14 +889,17 @@ public:
void MakeNextBatches(i64 maxDataSize, ui64 maxCount) {
YQL_ENSURE(BatchesInFlight == 0);
+ YQL_ENSURE(!IsEmpty());
i64 dataSize = 0;
+ // For columnshard batch can be slightly larger than the limit.
while (BatchesInFlight < maxCount
&& BatchesInFlight < Batches.size()
- && dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize) {
+ && (dataSize + GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize || BatchesInFlight == 0)) {
dataSize += GetBatch(BatchesInFlight)->GetMemory();
++BatchesInFlight;
}
- YQL_ENSURE(BatchesInFlight == Batches.size() || GetBatch(BatchesInFlight)->GetMemory() <= maxDataSize);
+ YQL_ENSURE(BatchesInFlight != 0);
+ YQL_ENSURE(BatchesInFlight == maxCount || BatchesInFlight == Batches.size() || dataSize + GetBatch(BatchesInFlight)->GetMemory() >= maxDataSize);
}
const IPayloadSerializer::IBatchPtr& GetBatch(size_t index) const {
@@ -1200,7 +1203,9 @@ private:
if (force) {
for (auto& [shardId, batches] : Serializer->FlushBatchesForce()) {
for (auto& batch : batches) {
- ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
+ if (batch && !batch->IsEmpty()) {
+ ShardsInfo.GetShard(shardId).PushBatch(std::move(batch));
+ }
}
}
} else {