author     Hor911 <hor911@ydb.tech>      2025-03-14 12:36:09 +0300
committer  GitHub <noreply@github.com>   2025-03-14 12:36:09 +0300
commit     1108e70f8c85b6d16c654f44f135ac23c3523452
tree       38086deefd06c24463554233693c3e48c94f26d5
parent     4eb60f8c618ef6167c802a8a27153cef74cc57f2
Add RowsCount/Rows and use it for stats purposes (#15629)
6 files changed, 59 insertions, 8 deletions
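Note on compatibility: batches serialized by nodes that predate this commit carry `Rows == 0`, so readers fall back to the chunk count. `TDqSerializedBatch::RowCount()` and the shim in `dq_compute_actor_channels.cpp` in the diff below both apply that rule. A minimal standalone sketch of the fallback follows; the `TDataSketch` struct and free `RowCount()` function are illustrative stand-ins, not the generated `NDqProto::TData` API:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the generated NDqProto::TData message.
struct TDataSketch {
    uint32_t Chunks = 0;
    uint32_t Rows = 0;  // field 5, added by this commit
};

// Mirrors the fallback in TDqSerializedBatch::RowCount(): old senders leave
// Rows at 0, so the chunk count is used as the row estimate instead.
uint32_t RowCount(const TDataSketch& data) {
    return data.Rows ? data.Rows : data.Chunks;
}

int main() {
    TDataSketch fromOldNode{8, 0};    // serialized before this commit: Rows not set
    TDataSketch fromNewNode{8, 640};  // block batch: many rows per chunk
    assert(RowCount(fromOldNode) == 8);    // degrades to chunk count, as before
    assert(RowCount(fromNewNode) == 640);  // exact row count for stats
    return 0;
}
```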
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
index 8a66a9688e..7fa6127b67 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
@@ -112,6 +112,12 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
 
     TInputChannelState& inputChannel = InCh(channelId);
 
+    if (Y_UNLIKELY(channelData.Proto.GetData().GetRows() == 0 && channelData.Proto.GetData().GetChunks() > 0)) {
+        // For backward compatibility, to support communication with old nodes during rollback/migration
+        // Should be deleted eventually ~ mid 2025
+        channelData.Proto.MutableData()->SetRows(channelData.Proto.GetData().GetChunks());
+    }
+
     LOG_T("Received input for channelId: " << channelId
         << ", seqNo: " << record.GetSeqNo()
         << ", size: " << channelData.Proto.GetData().GetRaw().size()
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.cpp b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
index 2a9798c353..037ac73a19 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.cpp
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
@@ -65,11 +65,13 @@ TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
 
     ui32 transportVersion = batch.Proto.GetTransportVersion();
     ui32 chunkCount = batch.Proto.GetChunks();
+    ui32 rowCount = batch.Proto.GetRows();
     TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
 
     AppendNumber(result, transportVersion);
     AppendNumber(result, chunkCount);
+    AppendNumber(result, rowCount);
     AppendNumber(result, protoPayload.Size());
     result.Append(std::move(protoPayload));
     AppendNumber(result, batch.Payload.Size());
@@ -85,6 +87,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
     TDqSerializedBatch result;
     result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
     result.Proto.SetChunks(ReadNumber<ui32>(source));
+    result.Proto.SetRows(ReadNumber<ui32>(source));
 
     size_t protoSize = ReadNumber<size_t>(source);
     YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h
index a9c4a73af2..4560c0214e 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.h
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.h
@@ -29,7 +29,7 @@ struct TDqSerializedBatch {
     }
 
     ui32 RowCount() const {
-        return Proto.GetChunks(); // FIXME with Rows
+        return Proto.GetRows() ? Proto.GetRows() : Proto.GetChunks();
     }
 
     void Clear() {
diff --git a/ydb/library/yql/dq/proto/dq_transport.proto b/ydb/library/yql/dq/proto/dq_transport.proto
index 20402f60a3..ac4af04c05 100644
--- a/ydb/library/yql/dq/proto/dq_transport.proto
+++ b/ydb/library/yql/dq/proto/dq_transport.proto
@@ -18,4 +18,5 @@ message TData {
     bytes Raw = 2;
     uint32 Chunks = 3;
     optional uint32 PayloadId = 4;
+    uint32 Rows = 5;
 }
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index 0968d366ec..ddf5e63fca 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -95,8 +95,12 @@ public:
             return;
         }
 
+        ui32 rows = Packer.IsBlock() ?
+            NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value
+            : 1;
+
         if (PushStats.CollectBasic()) {
-            PushStats.Rows++;
+            PushStats.Rows += rows;
             PushStats.Chunks++;
             PushStats.Resume();
         }
@@ -111,6 +115,7 @@ public:
         }
 
         PackerCurrentChunkCount++;
+        PackerCurrentRowCount += rows;
 
         size_t packerSize = Packer.PackedSizeEstimate();
         if (packerSize >= MaxChunkBytes) {
@@ -121,8 +126,11 @@ public:
             }
             PackedDataSize += Data.back().Buffer.Size();
             PackedChunkCount += PackerCurrentChunkCount;
+            PackedRowCount += PackerCurrentRowCount;
             Data.back().ChunkCount = PackerCurrentChunkCount;
+            Data.back().RowCount = PackerCurrentRowCount;
             PackerCurrentChunkCount = 0;
+            PackerCurrentRowCount = 0;
             packerSize = 0;
         }
@@ -134,11 +142,13 @@ public:
             TDqSerializedBatch data;
             data.Proto.SetTransportVersion(TransportVersion);
             data.Proto.SetChunks(head.ChunkCount);
+            data.Proto.SetRows(head.RowCount);
             data.SetPayload(std::move(head.Buffer));
             Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
 
             PackedDataSize -= bufSize;
             PackedChunkCount -= head.ChunkCount;
+            PackedRowCount -= head.RowCount;
 
             SpilledChunkCount += head.ChunkCount;
@@ -199,14 +209,18 @@ public:
         } else if (!Data.empty()) {
             auto& packed = Data.front();
             PackedChunkCount -= packed.ChunkCount;
+            PackedRowCount -= packed.RowCount;
             PackedDataSize -= packed.Buffer.Size();
             data.Proto.SetChunks(packed.ChunkCount);
+            data.Proto.SetRows(packed.RowCount);
             data.SetPayload(std::move(packed.Buffer));
             Data.pop_front();
         } else {
             data.Proto.SetChunks(PackerCurrentChunkCount);
+            data.Proto.SetRows(PackerCurrentRowCount);
             data.SetPayload(FinishPackAndCheckSize());
             PackerCurrentChunkCount = 0;
+            PackerCurrentRowCount = 0;
         }
 
         DLOG("Took " << data.RowCount() << " rows");
@@ -214,7 +228,7 @@ public:
         if (PopStats.CollectBasic()) {
             PopStats.Bytes += data.Size();
             PopStats.Rows += data.RowCount();
-            PopStats.Chunks++;
+            PopStats.Chunks++; // pop chunks do not match push chunks
             if (!IsFull() || FirstStoredId == NextStoredId) {
                 PopStats.Resume();
             }
@@ -257,8 +271,13 @@ public:
 
         data.Proto.SetTransportVersion(TransportVersion);
         if (SpilledChunkCount == 0 && PackedChunkCount == 0) {
             data.Proto.SetChunks(PackerCurrentChunkCount);
+            data.Proto.SetRows(PackerCurrentRowCount);
             data.SetPayload(FinishPackAndCheckSize());
+            if (PushStats.CollectBasic()) {
+                PushStats.Bytes += data.Payload.Size();
+            }
             PackerCurrentChunkCount = 0;
+            PackerCurrentRowCount = 0;
             return true;
         }
@@ -266,19 +285,29 @@ public:
         if (PackerCurrentChunkCount) {
             Data.emplace_back();
             Data.back().Buffer = FinishPackAndCheckSize();
+            if (PushStats.CollectBasic()) {
+                PushStats.Bytes += Data.back().Buffer.Size();
+            }
             PackedDataSize += Data.back().Buffer.Size();
             PackedChunkCount += PackerCurrentChunkCount;
+            PackedRowCount += PackerCurrentRowCount;
             Data.back().ChunkCount = PackerCurrentChunkCount;
+            Data.back().RowCount = PackerCurrentRowCount;
             PackerCurrentChunkCount = 0;
+            PackerCurrentRowCount = 0;
         }
 
         NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
+        size_t repackedChunkCount = 0;
+        size_t repackedRowCount = 0;
         for (;;) {
-            TDqSerializedBatch chunk;
-            if (!this->Pop(chunk)) {
+            TDqSerializedBatch batch;
+            if (!this->Pop(batch)) {
                 break;
             }
-            Packer.UnpackBatch(chunk.PullPayload(), HolderFactory, rows);
+            repackedChunkCount += batch.ChunkCount();
+            repackedRowCount += batch.RowCount();
+            Packer.UnpackBatch(batch.PullPayload(), HolderFactory, rows);
         }
 
         if (OutputType->IsMulti()) {
@@ -291,7 +320,8 @@ public:
             });
         }
 
-        data.Proto.SetChunks(rows.RowCount()); // 1 UVB "row" is Chunk
+        data.Proto.SetChunks(repackedChunkCount);
+        data.Proto.SetRows(repackedRowCount);
         data.SetPayload(FinishPackAndCheckSize());
         if (PopStats.CollectBasic()) {
             PopStats.Bytes += data.Size();
@@ -332,7 +362,12 @@ public:
         ui64 rows = GetValuesCount();
         Data.clear();
         Packer.Clear();
-        SpilledChunkCount = PackedDataSize = PackedChunkCount = PackerCurrentChunkCount = 0;
+        PackedDataSize = 0;
+        PackedChunkCount = 0;
+        PackedRowCount = 0;
+        SpilledChunkCount = 0;
+        PackerCurrentChunkCount = 0;
+        PackerCurrentRowCount = 0;
         FirstStoredId = NextStoredId;
         return rows;
     }
@@ -359,6 +394,7 @@ private:
     struct TSerializedBatch {
        TChunkedBuffer Buffer;
        ui64 ChunkCount = 0;
+       ui64 RowCount = 0;
     };
 
     std::deque<TSerializedBatch> Data;
@@ -368,8 +404,10 @@ private:
 
     size_t PackedDataSize = 0;
     size_t PackedChunkCount = 0;
+    size_t PackedRowCount = 0;
 
     size_t PackerCurrentChunkCount = 0;
+    size_t PackerCurrentRowCount = 0;
 
     bool Finished = false;
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index 338f66bb1f..0ca93ce3bf 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -44,6 +44,7 @@ TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const
     TDqSerializedBatch result;
     result.Proto.SetTransportVersion(version);
     result.Proto.SetChunks(1);
+    result.Proto.SetRows(1);
     result.SetPayload(std::move(packResult));
     return result;
 }
@@ -88,6 +89,7 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons
     TDqSerializedBatch result;
     result.Proto.SetTransportVersion(version);
     result.Proto.SetChunks(buffer.RowCount());
+    result.Proto.SetRows(buffer.RowCount()); // maybe incorrect for Arrow Blocks
     result.SetPayload(std::move(packResult));
     return result;
 }
@@ -177,6 +179,7 @@ NDqProto::TData TDqDataSerializer::SerializeParamValue(const TType* type, const
     data.SetTransportVersion(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
     data.SetRaw(packResult.data(), packResult.size());
     data.SetChunks(1);
+    data.SetRows(1);
     return data;
 }
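For reference, `SaveForSpilling` now writes the row count immediately after the chunk count, and `LoadSpilled` reads it back from the same position, so both sides of the spill path must change in lockstep. The standalone sketch below illustrates that field order only; the byte-level `AppendNumber`/`ReadNumber` helpers here are simplified assumptions standing in for the real `TChunkedBuffer`-based helpers in `dq_serialized_batch.cpp`, and the concrete numbers are arbitrary examples:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Simplified stand-ins for the real helpers, which operate on TChunkedBuffer.
template <typename T>
void AppendNumber(std::vector<char>& out, T value) {
    const char* p = reinterpret_cast<const char*>(&value);
    out.insert(out.end(), p, p + sizeof(T));
}

template <typename T>
T ReadNumber(const char*& src) {
    T value;
    std::memcpy(&value, src, sizeof(T));
    src += sizeof(T);
    return value;
}

int main() {
    const std::string proto = "proto-raw-bytes";    // stands in for batch.Proto.GetRaw()
    const std::string payload = "external-payload"; // stands in for batch.Payload

    // Field order written by SaveForSpilling after this commit.
    std::vector<char> blob;
    AppendNumber<uint32_t>(blob, 30);             // transport version (example value)
    AppendNumber<uint32_t>(blob, 4);              // chunk count
    AppendNumber<uint32_t>(blob, 1024);           // row count (new field)
    AppendNumber<uint64_t>(blob, proto.size());   // proto payload size
    blob.insert(blob.end(), proto.begin(), proto.end());
    AppendNumber<uint64_t>(blob, payload.size()); // out-of-band payload size
    blob.insert(blob.end(), payload.begin(), payload.end());

    // LoadSpilled reads the fields back in exactly the same order.
    const char* src = blob.data();
    assert(ReadNumber<uint32_t>(src) == 30);    // transport version
    assert(ReadNumber<uint32_t>(src) == 4);     // chunks
    assert(ReadNumber<uint32_t>(src) == 1024);  // rows
    assert(ReadNumber<uint64_t>(src) == proto.size());
    return 0;
}
```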