author     Hor911 <hor911@ydb.tech>      2025-03-14 12:36:09 +0300
committer  GitHub <noreply@github.com>   2025-03-14 12:36:09 +0300
commit     1108e70f8c85b6d16c654f44f135ac23c3523452 (patch)
tree       38086deefd06c24463554233693c3e48c94f26d5
parent     4eb60f8c618ef6167c802a8a27153cef74cc57f2 (diff)
download   ydb-1108e70f8c85b6d16c654f44f135ac23c3523452.tar.gz
Add RowsCount/Rows and use it for stats purposes (#15629)
-rw-r--r--  ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp |  6
-rw-r--r--  ydb/library/yql/dq/common/dq_serialized_batch.cpp               |  3
-rw-r--r--  ydb/library/yql/dq/common/dq_serialized_batch.h                 |  2
-rw-r--r--  ydb/library/yql/dq/proto/dq_transport.proto                     |  1
-rw-r--r--  ydb/library/yql/dq/runtime/dq_output_channel.cpp                | 52
-rw-r--r--  ydb/library/yql/dq/runtime/dq_transport.cpp                     |  3
6 files changed, 59 insertions(+), 8 deletions(-)
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
index 8a66a9688e..7fa6127b67 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
@@ -112,6 +112,12 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
TInputChannelState& inputChannel = InCh(channelId);
+ if (Y_UNLIKELY(channelData.Proto.GetData().GetRows() == 0 && channelData.Proto.GetData().GetChunks() > 0)) {
+ // For backward compatibility, to support communication with old nodes during rollback/migration
+ // Should be deleted eventually ~ mid 2025
+ channelData.Proto.MutableData()->SetRows(channelData.Proto.GetData().GetChunks());
+ }
+
LOG_T("Received input for channelId: " << channelId
<< ", seqNo: " << record.GetSeqNo()
<< ", size: " << channelData.Proto.GetData().GetRaw().size()
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.cpp b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
index 2a9798c353..037ac73a19 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.cpp
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
@@ -65,11 +65,13 @@ TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
ui32 transportVersion = batch.Proto.GetTransportVersion();
ui32 chunkCount = batch.Proto.GetChunks();
+ ui32 rowCount = batch.Proto.GetRows();
TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
AppendNumber(result, transportVersion);
AppendNumber(result, chunkCount);
+ AppendNumber(result, rowCount);
AppendNumber(result, protoPayload.Size());
result.Append(std::move(protoPayload));
AppendNumber(result, batch.Payload.Size());
@@ -85,6 +87,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
TDqSerializedBatch result;
result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
result.Proto.SetChunks(ReadNumber<ui32>(source));
+ result.Proto.SetRows(ReadNumber<ui32>(source));
size_t protoSize = ReadNumber<size_t>(source);
YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");
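
With this change the spilled blob carries the row count right after the chunk count. The layout implied by SaveForSpilling/LoadSpilled above is, in order: ui32 transportVersion, ui32 chunkCount, ui32 rowCount, size_t protoSize, proto bytes, size_t payloadSize, payload bytes. Since the new field is written and read unconditionally, old and new spilled blobs are not interchangeable; presumably that is acceptable because spilled data is produced and consumed by the same query. A self-contained sketch of the fixed-size header part only (SaveHeaderSketch is an illustrative name; the real code streams values through AppendNumber/ReadNumber over a TChunkedBuffer):

    #include <cstdint>
    #include <vector>

    // Writes the fixed-size header fields in the order used by SaveForSpilling.
    std::vector<char> SaveHeaderSketch(uint32_t transportVersion, uint32_t chunkCount, uint32_t rowCount) {
        std::vector<char> out;
        auto append = [&out](const auto& v) {
            const char* p = reinterpret_cast<const char*>(&v);
            out.insert(out.end(), p, p + sizeof(v));
        };
        append(transportVersion);
        append(chunkCount);
        append(rowCount); // new field added by this commit
        return out;
    }
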
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h
index a9c4a73af2..4560c0214e 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.h
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.h
@@ -29,7 +29,7 @@ struct TDqSerializedBatch {
}
ui32 RowCount() const {
- return Proto.GetChunks(); // FIXME with Rows
+ return Proto.GetRows() ? Proto.GetRows() : Proto.GetChunks();
}
void Clear() {
diff --git a/ydb/library/yql/dq/proto/dq_transport.proto b/ydb/library/yql/dq/proto/dq_transport.proto
index 20402f60a3..ac4af04c05 100644
--- a/ydb/library/yql/dq/proto/dq_transport.proto
+++ b/ydb/library/yql/dq/proto/dq_transport.proto
@@ -18,4 +18,5 @@ message TData {
bytes Raw = 2;
uint32 Chunks = 3;
optional uint32 PayloadId = 4;
+ uint32 Rows = 5;
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index 0968d366ec..ddf5e63fca 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -95,8 +95,12 @@ public:
return;
}
+ ui32 rows = Packer.IsBlock() ?
+ NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value
+ : 1;
+
if (PushStats.CollectBasic()) {
- PushStats.Rows++;
+ PushStats.Rows += rows;
PushStats.Chunks++;
PushStats.Resume();
}
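
The rows calculation above distinguishes the two transport layouts: with scalar values one Push() is exactly one row, while with the wide block layout one Push() carries a whole block whose last element (values[width - 1]) is a scalar holding the block length. A toy version of that decision, with a plain integer standing in for the arrow::UInt64Scalar read through NKikimr::NMiniKQL::TArrowBlock:

    #include <cstdint>

    // blockLengthScalar mimics the value stored in the last column of a block row;
    // it is ignored for scalar (non-block) transport.
    uint64_t RowsInPushSketch(bool isBlockTransport, uint64_t blockLengthScalar) {
        return isBlockTransport ? blockLengthScalar : 1;
    }
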
@@ -111,6 +115,7 @@ public:
}
PackerCurrentChunkCount++;
+ PackerCurrentRowCount += rows;
size_t packerSize = Packer.PackedSizeEstimate();
if (packerSize >= MaxChunkBytes) {
@@ -121,8 +126,11 @@ public:
}
PackedDataSize += Data.back().Buffer.Size();
PackedChunkCount += PackerCurrentChunkCount;
+ PackedRowCount += PackerCurrentRowCount;
Data.back().ChunkCount = PackerCurrentChunkCount;
+ Data.back().RowCount = PackerCurrentRowCount;
PackerCurrentChunkCount = 0;
+ PackerCurrentRowCount = 0;
packerSize = 0;
}
@@ -134,11 +142,13 @@ public:
TDqSerializedBatch data;
data.Proto.SetTransportVersion(TransportVersion);
data.Proto.SetChunks(head.ChunkCount);
+ data.Proto.SetRows(head.RowCount);
data.SetPayload(std::move(head.Buffer));
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
PackedDataSize -= bufSize;
PackedChunkCount -= head.ChunkCount;
+ PackedRowCount -= head.RowCount;
SpilledChunkCount += head.ChunkCount;
@@ -199,14 +209,18 @@ public:
} else if (!Data.empty()) {
auto& packed = Data.front();
PackedChunkCount -= packed.ChunkCount;
+ PackedRowCount -= packed.RowCount;
PackedDataSize -= packed.Buffer.Size();
data.Proto.SetChunks(packed.ChunkCount);
+ data.Proto.SetRows(packed.RowCount);
data.SetPayload(std::move(packed.Buffer));
Data.pop_front();
} else {
data.Proto.SetChunks(PackerCurrentChunkCount);
+ data.Proto.SetRows(PackerCurrentRowCount);
data.SetPayload(FinishPackAndCheckSize());
PackerCurrentChunkCount = 0;
+ PackerCurrentRowCount = 0;
}
DLOG("Took " << data.RowCount() << " rows");
@@ -214,7 +228,7 @@ public:
if (PopStats.CollectBasic()) {
PopStats.Bytes += data.Size();
PopStats.Rows += data.RowCount();
- PopStats.Chunks++;
+ PopStats.Chunks++; // pop chunks do not match push chunks
if (!IsFull() || FirstStoredId == NextStoredId) {
PopStats.Resume();
}
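
The comment on PopStats.Chunks reflects that packing merges many pushed chunks into fewer serialized batches, so chunk counters on the push and pop sides are not comparable, while row counters now are. A toy model of that bookkeeping (the counter names only mirror the stats fields, they are not the real classes):

    #include <cstdint>
    #include <iostream>

    int main() {
        const uint32_t pushCalls = 100;          // 100 Push() calls, one row each
        uint32_t pushRows = 0, pushChunks = 0;
        for (uint32_t i = 0; i < pushCalls; ++i) {
            pushRows += 1;                       // PushStats.Rows
            pushChunks += 1;                     // PushStats.Chunks
        }
        // Assume packing merged everything into a single serialized batch,
        // so one Pop() drains the channel.
        uint32_t popRows = pushRows;             // PopStats.Rows via data.RowCount()
        uint32_t popChunks = 1;                  // PopStats.Chunks counts Pop() calls
        std::cout << pushChunks << " push chunks vs " << popChunks
                  << " pop chunks; rows match: " << (pushRows == popRows) << "\n";
        return 0;
    }
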
@@ -257,8 +271,13 @@ public:
data.Proto.SetTransportVersion(TransportVersion);
if (SpilledChunkCount == 0 && PackedChunkCount == 0) {
data.Proto.SetChunks(PackerCurrentChunkCount);
+ data.Proto.SetRows(PackerCurrentRowCount);
data.SetPayload(FinishPackAndCheckSize());
+ if (PushStats.CollectBasic()) {
+ PushStats.Bytes += data.Payload.Size();
+ }
PackerCurrentChunkCount = 0;
+ PackerCurrentRowCount = 0;
return true;
}
@@ -266,19 +285,29 @@ public:
if (PackerCurrentChunkCount) {
Data.emplace_back();
Data.back().Buffer = FinishPackAndCheckSize();
+ if (PushStats.CollectBasic()) {
+ PushStats.Bytes += Data.back().Buffer.Size();
+ }
PackedDataSize += Data.back().Buffer.Size();
PackedChunkCount += PackerCurrentChunkCount;
+ PackedRowCount += PackerCurrentRowCount;
Data.back().ChunkCount = PackerCurrentChunkCount;
+ Data.back().RowCount = PackerCurrentRowCount;
PackerCurrentChunkCount = 0;
+ PackerCurrentRowCount = 0;
}
NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
+ size_t repackedChunkCount = 0;
+ size_t repackedRowCount = 0;
for (;;) {
- TDqSerializedBatch chunk;
- if (!this->Pop(chunk)) {
+ TDqSerializedBatch batch;
+ if (!this->Pop(batch)) {
break;
}
- Packer.UnpackBatch(chunk.PullPayload(), HolderFactory, rows);
+ repackedChunkCount += batch.ChunkCount();
+ repackedRowCount += batch.RowCount();
+ Packer.UnpackBatch(batch.PullPayload(), HolderFactory, rows);
}
if (OutputType->IsMulti()) {
@@ -291,7 +320,8 @@ public:
});
}
- data.Proto.SetChunks(rows.RowCount()); // 1 UVB "row" is Chunk
+ data.Proto.SetChunks(repackedChunkCount);
+ data.Proto.SetRows(repackedRowCount);
data.SetPayload(FinishPackAndCheckSize());
if (PopStats.CollectBasic()) {
PopStats.Bytes += data.Size();
@@ -332,7 +362,12 @@ public:
ui64 rows = GetValuesCount();
Data.clear();
Packer.Clear();
- SpilledChunkCount = PackedDataSize = PackedChunkCount = PackerCurrentChunkCount = 0;
+ PackedDataSize = 0;
+ PackedChunkCount = 0;
+ PackedRowCount = 0;
+ SpilledChunkCount = 0;
+ PackerCurrentChunkCount = 0;
+ PackerCurrentRowCount = 0;
FirstStoredId = NextStoredId;
return rows;
}
@@ -359,6 +394,7 @@ private:
struct TSerializedBatch {
TChunkedBuffer Buffer;
ui64 ChunkCount = 0;
+ ui64 RowCount = 0;
};
std::deque<TSerializedBatch> Data;
@@ -368,8 +404,10 @@ private:
size_t PackedDataSize = 0;
size_t PackedChunkCount = 0;
+ size_t PackedRowCount = 0;
size_t PackerCurrentChunkCount = 0;
+ size_t PackerCurrentRowCount = 0;
bool Finished = false;
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index 338f66bb1f..0ca93ce3bf 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -44,6 +44,7 @@ TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const
TDqSerializedBatch result;
result.Proto.SetTransportVersion(version);
result.Proto.SetChunks(1);
+ result.Proto.SetRows(1);
result.SetPayload(std::move(packResult));
return result;
}
@@ -88,6 +89,7 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons
TDqSerializedBatch result;
result.Proto.SetTransportVersion(version);
result.Proto.SetChunks(buffer.RowCount());
+ result.Proto.SetRows(buffer.RowCount()); // maybe incorrect for Arrow Blocks
result.SetPayload(std::move(packResult));
return result;
}
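
The "maybe incorrect for Arrow Blocks" remark refers to buffer.RowCount() counting entries of the TUnboxedValueBatch; for block transport each entry is presumably a whole block, so the Rows value written here can undercount the logical rows. Toy numbers to illustrate the gap (the figures are made up):

    #include <cstdint>
    #include <iostream>

    int main() {
        const uint64_t batchEntries = 3;     // what buffer.RowCount() reports
        const uint64_t rowsPerBlock = 1024;  // logical rows carried by each block
        std::cout << "Rows field: " << batchEntries
                  << ", logical rows: " << batchEntries * rowsPerBlock << "\n";
        return 0;
    }
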
@@ -177,6 +179,7 @@ NDqProto::TData TDqDataSerializer::SerializeParamValue(const TType* type, const
data.SetTransportVersion(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
data.SetRaw(packResult.data(), packResult.size());
data.SetChunks(1);
+ data.SetRows(1);
return data;
}