aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHor911 <hor911@ydb.tech>2024-11-23 20:07:00 +0300
committerGitHub <noreply@github.com>2024-11-23 20:07:00 +0300
commit398fb410adba8fede893681a5e67a809f02d0750 (patch)
tree7528af3d5e8e23c3d2a4cc71c32a338b6ac1a261
parentb83d01cbfdc83e5af0fc9ff12fbb8da67f587fea (diff)
downloadydb-398fb410adba8fede893681a5e67a809f02d0750.tar.gz
Correct Rows count in Block Output Channels (#11893)
-rw-r--r--ydb/library/yql/dq/common/dq_serialized_batch.cpp3
-rw-r--r--ydb/library/yql/dq/common/dq_serialized_batch.h4
-rw-r--r--ydb/library/yql/dq/proto/dq_transport.proto3
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_channel.cpp5
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.cpp69
5 files changed, 63 insertions, 21 deletions
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.cpp b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
index 81ec0c2334..e2efc380a7 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.cpp
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
@@ -65,11 +65,13 @@ TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
ui32 transportversion = batch.Proto.GetTransportVersion();
ui32 rowCount = batch.Proto.GetRows();
+ ui32 chunkCount = batch.Proto.GetChunks();
TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
AppendNumber(result, transportversion);
AppendNumber(result, rowCount);
+ AppendNumber(result, chunkCount);
AppendNumber(result, protoPayload.Size());
result.Append(std::move(protoPayload));
AppendNumber(result, batch.Payload.Size());
@@ -85,6 +87,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
TDqSerializedBatch result;
result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
result.Proto.SetRows(ReadNumber<ui32>(source));
+ result.Proto.SetChunks(ReadNumber<ui32>(source));
size_t protoSize = ReadNumber<size_t>(source);
YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h
index adfb9bb481..c3a1b80ec1 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.h
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.h
@@ -28,6 +28,10 @@ struct TDqSerializedBatch {
return Proto.GetRows();
}
+ ui32 ChunkCount() const {
+ return Proto.GetChunks();
+ }
+
void Clear() {
Payload.Clear();
Proto.Clear();
diff --git a/ydb/library/yql/dq/proto/dq_transport.proto b/ydb/library/yql/dq/proto/dq_transport.proto
index 3da4786236..a04067c975 100644
--- a/ydb/library/yql/dq/proto/dq_transport.proto
+++ b/ydb/library/yql/dq/proto/dq_transport.proto
@@ -16,6 +16,7 @@ enum EDataTransportVersion {
message TData {
uint32 TransportVersion = 1;
bytes Raw = 2;
- uint32 Rows = 3;
+ uint32 Rows = 5;
+ uint32 Chunks = 3;
optional uint32 PayloadId = 4;
}
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
index f61abd6d5e..d604387c73 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
@@ -47,7 +47,7 @@ private:
void PushImpl(TDqSerializedBatch&& data) {
const i64 space = data.Size();
- const size_t rowCount = data.RowCount();
+ const size_t chunkCount = data.ChunkCount();
auto inputType = Impl.GetInputType();
NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
if (Y_UNLIKELY(PushStats.CollectProfile())) {
@@ -58,7 +58,8 @@ private:
DataSerializer.Deserialize(std::move(data), inputType, batch);
}
- YQL_ENSURE(batch.RowCount() == rowCount);
+ // each batch row is a chunk and may be an Arrow block
+ YQL_ENSURE(batch.RowCount() == chunkCount);
Impl.AddBatch(std::move(batch), space);
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index bfc01284a4..7425eacc92 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -58,7 +58,7 @@ public:
}
ui64 GetValuesCount() const override {
- return SpilledRowCount + PackedRowCount + ChunkRowCount;
+ return SpilledRowCount + PackedRowCount + PackerCurrentRowCount;
}
const TDqOutputStats& GetPushStats() const override {
@@ -95,8 +95,12 @@ public:
return;
}
+ ui32 rows = Packer.IsBlock() ?
+ NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value
+ : 1;
+
if (PushStats.CollectBasic()) {
- PushStats.Rows++;
+ PushStats.Rows += rows;
PushStats.Chunks++;
PushStats.Resume();
}
@@ -110,7 +114,8 @@ public:
values[i] = {};
}
- ChunkRowCount++;
+ PackerCurrentRowCount += rows;
+ PackerCurrentChunkCount++;
size_t packerSize = Packer.PackedSizeEstimate();
if (packerSize >= MaxChunkBytes) {
@@ -120,9 +125,12 @@ public:
PushStats.Bytes += Data.back().Buffer.Size();
}
PackedDataSize += Data.back().Buffer.Size();
- PackedRowCount += ChunkRowCount;
- Data.back().RowCount = ChunkRowCount;
- ChunkRowCount = 0;
+ PackedRowCount += PackerCurrentRowCount;
+ PackedChunkCount += PackerCurrentChunkCount;
+ Data.back().RowCount = PackerCurrentRowCount;
+ Data.back().ChunkCount = PackerCurrentChunkCount;
+ PackerCurrentRowCount = 0;
+ PackerCurrentChunkCount = 0;
packerSize = 0;
}
@@ -134,11 +142,13 @@ public:
TDqSerializedBatch data;
data.Proto.SetTransportVersion(TransportVersion);
data.Proto.SetRows(head.RowCount);
+ data.Proto.SetChunks(head.ChunkCount);
data.SetPayload(std::move(head.Buffer));
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
PackedDataSize -= bufSize;
PackedRowCount -= head.RowCount;
+ PackedChunkCount -= head.ChunkCount;
SpilledRowCount += head.RowCount;
@@ -199,22 +209,29 @@ public:
} else if (!Data.empty()) {
auto& packed = Data.front();
PackedRowCount -= packed.RowCount;
+ PackedChunkCount -= packed.ChunkCount;
PackedDataSize -= packed.Buffer.Size();
data.Proto.SetRows(packed.RowCount);
+ data.Proto.SetChunks(packed.ChunkCount);
data.SetPayload(std::move(packed.Buffer));
Data.pop_front();
} else {
- data.Proto.SetRows(ChunkRowCount);
+ data.Proto.SetRows(PackerCurrentRowCount);
+ data.Proto.SetChunks(PackerCurrentChunkCount);
data.SetPayload(FinishPackAndCheckSize());
- ChunkRowCount = 0;
+ if (PushStats.CollectBasic()) {
+ PushStats.Bytes += data.Payload.Size();
+ }
+ PackerCurrentRowCount = 0;
+ PackerCurrentChunkCount = 0;
}
DLOG("Took " << data.RowCount() << " rows");
if (PopStats.CollectBasic()) {
PopStats.Bytes += data.Size();
- PopStats.Rows += data.RowCount();
- PopStats.Chunks++;
+ PopStats.Rows += data.RowCount();
+ PopStats.Chunks++; // popped chunk count need not match pushed chunk count
if (!IsFull() || FirstStoredId == NextStoredId) {
PopStats.Resume();
}
@@ -256,20 +273,31 @@ public:
data.Clear();
data.Proto.SetTransportVersion(TransportVersion);
if (SpilledRowCount == 0 && PackedRowCount == 0) {
- data.Proto.SetRows(ChunkRowCount);
+ data.Proto.SetRows(PackerCurrentRowCount);
+ data.Proto.SetChunks(PackerCurrentChunkCount);
data.SetPayload(FinishPackAndCheckSize());
- ChunkRowCount = 0;
+ if (PushStats.CollectBasic()) {
+ PushStats.Bytes += data.Payload.Size();
+ }
+ PackerCurrentRowCount = 0;
+ PackerCurrentChunkCount = 0;
return true;
}
// Repack all - that's why PopAll should never be used
- if (ChunkRowCount) {
+ if (PackerCurrentRowCount) {
Data.emplace_back();
Data.back().Buffer = FinishPackAndCheckSize();
+ if (PushStats.CollectBasic()) {
+ PushStats.Bytes += Data.back().Buffer.Size();
+ }
PackedDataSize += Data.back().Buffer.Size();
- PackedRowCount += ChunkRowCount;
- Data.back().RowCount = ChunkRowCount;
- ChunkRowCount = 0;
+ PackedRowCount += PackerCurrentRowCount;
+ PackedChunkCount += PackerCurrentChunkCount;
+ Data.back().RowCount = PackerCurrentRowCount;
+ Data.back().ChunkCount = PackerCurrentChunkCount;
+ PackerCurrentRowCount = 0;
+ PackerCurrentChunkCount = 0;
}
NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
@@ -332,7 +360,9 @@ public:
ui64 rows = GetValuesCount();
Data.clear();
Packer.Clear();
- SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
+ PackedDataSize = 0;
+ SpilledRowCount = PackedRowCount = PackerCurrentRowCount = 0;
+ PackedChunkCount = PackerCurrentChunkCount = 0;
FirstStoredId = NextStoredId;
return rows;
}
@@ -359,6 +389,7 @@ private:
struct TSerializedBatch {
TChunkedBuffer Buffer;
ui64 RowCount = 0;
+ ui64 ChunkCount = 0;
};
std::deque<TSerializedBatch> Data;
@@ -368,8 +399,10 @@ private:
size_t PackedDataSize = 0;
size_t PackedRowCount = 0;
+ size_t PackedChunkCount = 0;
- size_t ChunkRowCount = 0;
+ size_t PackerCurrentRowCount = 0;
+ size_t PackerCurrentChunkCount = 0;
bool Finished = false;