diff options
author | Hor911 <hor911@ydb.tech> | 2024-11-23 20:07:00 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-23 20:07:00 +0300 |
commit | 398fb410adba8fede893681a5e67a809f02d0750 (patch) | |
tree | 7528af3d5e8e23c3d2a4cc71c32a338b6ac1a261 | |
parent | b83d01cbfdc83e5af0fc9ff12fbb8da67f587fea (diff) | |
download | ydb-398fb410adba8fede893681a5e67a809f02d0750.tar.gz |
Correct Rows count in Block Output Channels (#11893)
-rw-r--r-- | ydb/library/yql/dq/common/dq_serialized_batch.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/dq/common/dq_serialized_batch.h | 4 | ||||
-rw-r--r-- | ydb/library/yql/dq/proto/dq_transport.proto | 3 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_channel.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_channel.cpp | 69 |
5 files changed, 63 insertions, 21 deletions
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.cpp b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
index 81ec0c2334..e2efc380a7 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.cpp
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
@@ -65,11 +65,13 @@ TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
 
     ui32 transportversion = batch.Proto.GetTransportVersion();
     ui32 rowCount = batch.Proto.GetRows();
+    ui32 chunkCount = batch.Proto.GetChunks();
 
     TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
 
     AppendNumber(result, transportversion);
     AppendNumber(result, rowCount);
+    AppendNumber(result, chunkCount);
     AppendNumber(result, protoPayload.Size());
     result.Append(std::move(protoPayload));
     AppendNumber(result, batch.Payload.Size());
@@ -85,6 +87,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
 
     TDqSerializedBatch result;
     result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
     result.Proto.SetRows(ReadNumber<ui32>(source));
+    result.Proto.SetChunks(ReadNumber<ui32>(source));
     size_t protoSize = ReadNumber<size_t>(source);
     YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h
index adfb9bb481..c3a1b80ec1 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.h
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.h
@@ -28,6 +28,10 @@ struct TDqSerializedBatch {
         return Proto.GetRows();
     }
 
+    ui32 ChunkCount() const {
+        return Proto.GetChunks();
+    }
+
     void Clear() {
         Payload.Clear();
         Proto.Clear();
diff --git a/ydb/library/yql/dq/proto/dq_transport.proto b/ydb/library/yql/dq/proto/dq_transport.proto
index 3da4786236..a04067c975 100644
--- a/ydb/library/yql/dq/proto/dq_transport.proto
+++ b/ydb/library/yql/dq/proto/dq_transport.proto
@@ -16,6 +16,7 @@ enum EDataTransportVersion {
 message TData {
     uint32 TransportVersion = 1;
     bytes Raw = 2;
-    uint32 Rows = 3;
+    uint32 Rows = 5;
+    uint32 Chunks = 3;
     optional uint32 PayloadId = 4;
 }
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
index f61abd6d5e..d604387c73 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
@@ -47,7 +47,7 @@ private:
     void PushImpl(TDqSerializedBatch&& data) {
         const i64 space = data.Size();
-        const size_t rowCount = data.RowCount();
+        const size_t chunkCount = data.ChunkCount();
         auto inputType = Impl.GetInputType();
         NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
         if (Y_UNLIKELY(PushStats.CollectProfile())) {
@@ -58,7 +58,8 @@ private:
             DataSerializer.Deserialize(std::move(data), inputType, batch);
         }
 
-        YQL_ENSURE(batch.RowCount() == rowCount);
+        // single batch row is chunk and may be Arrow block
+        YQL_ENSURE(batch.RowCount() == chunkCount);
         Impl.AddBatch(std::move(batch), space);
     }
 
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index bfc01284a4..7425eacc92 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -58,7 +58,7 @@ public:
     }
 
     ui64 GetValuesCount() const override {
-        return SpilledRowCount + PackedRowCount + ChunkRowCount;
+        return SpilledRowCount + PackedRowCount + PackerCurrentRowCount;
     }
 
     const TDqOutputStats& GetPushStats() const override {
@@ -95,8 +95,12 @@ public:
             return;
         }
 
+        ui32 rows = Packer.IsBlock() ?
+            NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value
+            : 1;
+
         if (PushStats.CollectBasic()) {
-            PushStats.Rows++;
+            PushStats.Rows += rows;
             PushStats.Chunks++;
             PushStats.Resume();
         }
@@ -110,7 +114,8 @@ public:
             values[i] = {};
         }
 
-        ChunkRowCount++;
+        PackerCurrentRowCount += rows;
+        PackerCurrentChunkCount++;
 
         size_t packerSize = Packer.PackedSizeEstimate();
         if (packerSize >= MaxChunkBytes) {
@@ -120,9 +125,12 @@ public:
                 PushStats.Bytes += Data.back().Buffer.Size();
             }
             PackedDataSize += Data.back().Buffer.Size();
-            PackedRowCount += ChunkRowCount;
-            Data.back().RowCount = ChunkRowCount;
-            ChunkRowCount = 0;
+            PackedRowCount += PackerCurrentRowCount;
+            PackedChunkCount += PackerCurrentChunkCount;
+            Data.back().RowCount = PackerCurrentRowCount;
+            Data.back().ChunkCount = PackerCurrentChunkCount;
+            PackerCurrentRowCount = 0;
+            PackerCurrentChunkCount = 0;
             packerSize = 0;
         }
 
@@ -134,11 +142,13 @@ public:
             TDqSerializedBatch data;
             data.Proto.SetTransportVersion(TransportVersion);
             data.Proto.SetRows(head.RowCount);
+            data.Proto.SetChunks(head.ChunkCount);
             data.SetPayload(std::move(head.Buffer));
             Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
 
             PackedDataSize -= bufSize;
             PackedRowCount -= head.RowCount;
+            PackedChunkCount -= head.ChunkCount;
 
             SpilledRowCount += head.RowCount;
 
@@ -199,22 +209,29 @@ public:
         } else if (!Data.empty()) {
             auto& packed = Data.front();
             PackedRowCount -= packed.RowCount;
+            PackedChunkCount -= packed.ChunkCount;
             PackedDataSize -= packed.Buffer.Size();
             data.Proto.SetRows(packed.RowCount);
+            data.Proto.SetChunks(packed.ChunkCount);
             data.SetPayload(std::move(packed.Buffer));
             Data.pop_front();
         } else {
-            data.Proto.SetRows(ChunkRowCount);
+            data.Proto.SetRows(PackerCurrentRowCount);
+            data.Proto.SetChunks(PackerCurrentChunkCount);
             data.SetPayload(FinishPackAndCheckSize());
-            ChunkRowCount = 0;
+            if (PushStats.CollectBasic()) {
+                PushStats.Bytes += data.Payload.Size();
+            }
+            PackerCurrentRowCount = 0;
+            PackerCurrentChunkCount = 0;
         }
 
         DLOG("Took " << data.RowCount() << " rows");
 
         if (PopStats.CollectBasic()) {
             PopStats.Bytes += data.Size();
-            PopStats.Rows += data.RowCount();
-            PopStats.Chunks++;
+            PopStats.Rows += data.RowCount();
+            PopStats.Chunks++; // pop chunks do not match push chunks
             if (!IsFull() || FirstStoredId == NextStoredId) {
                 PopStats.Resume();
             }
@@ -256,20 +273,31 @@ public:
         data.Clear();
         data.Proto.SetTransportVersion(TransportVersion);
         if (SpilledRowCount == 0 && PackedRowCount == 0) {
-            data.Proto.SetRows(ChunkRowCount);
+            data.Proto.SetRows(PackerCurrentRowCount);
+            data.Proto.SetChunks(PackerCurrentChunkCount);
             data.SetPayload(FinishPackAndCheckSize());
-            ChunkRowCount = 0;
+            if (PushStats.CollectBasic()) {
+                PushStats.Bytes += data.Payload.Size();
+            }
+            PackerCurrentRowCount = 0;
+            PackerCurrentChunkCount = 0;
             return true;
         }
 
         // Repack all - thats why PopAll should never be used
-        if (ChunkRowCount) {
+        if (PackerCurrentRowCount) {
             Data.emplace_back();
             Data.back().Buffer = FinishPackAndCheckSize();
+            if (PushStats.CollectBasic()) {
+                PushStats.Bytes += Data.back().Buffer.Size();
+            }
             PackedDataSize += Data.back().Buffer.Size();
-            PackedRowCount += ChunkRowCount;
-            Data.back().RowCount = ChunkRowCount;
-            ChunkRowCount = 0;
+            PackedRowCount += PackerCurrentRowCount;
+            PackedChunkCount += PackerCurrentChunkCount;
+            Data.back().RowCount = PackerCurrentRowCount;
+            Data.back().ChunkCount = PackerCurrentChunkCount;
+            PackerCurrentRowCount = 0;
+            PackerCurrentChunkCount = 0;
         }
 
         NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
@@ -332,7 +360,9 @@ public:
         ui64 rows = GetValuesCount();
         Data.clear();
         Packer.Clear();
-        SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
+        PackedDataSize = 0;
+        SpilledRowCount = PackedRowCount = PackerCurrentRowCount = 0;
+        PackedChunkCount = PackerCurrentChunkCount = 0;
         FirstStoredId = NextStoredId;
         return rows;
     }
@@ -359,6 +389,7 @@ private:
     struct TSerializedBatch {
         TChunkedBuffer Buffer;
         ui64 RowCount = 0;
+        ui64 ChunkCount = 0;
     };
 
     std::deque<TSerializedBatch> Data;
@@ -368,8 +399,10 @@ private:
 
     size_t PackedDataSize = 0;
     size_t PackedRowCount = 0;
+    size_t PackedChunkCount = 0;
 
-    size_t ChunkRowCount = 0;
+    size_t PackerCurrentRowCount = 0;
+    size_t PackerCurrentChunkCount = 0;
 
     bool Finished = false;