about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Hor911 <hor911@ydb.tech> 2025-02-28 15:01:23 +0300
committer GitHub <noreply@github.com> 2025-02-28 15:01:23 +0300
commit 37290b172ed610df2157bf048f7f1115148b2a4d (patch)
tree d7fc74c55201bb2227f58401ce64b3babb2c8e43
parent 6964fca4eb4581d6a3055934614c20a54d094657 (diff)
download ydb-37290b172ed610df2157bf048f7f1115148b2a4d.tar.gz
Rename RowCount => ChunkCount to align with semantics and make it less confusing (#15080)
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_impl.h 4
-rw-r--r-- ydb/core/kqp/runtime/kqp_transport.cpp 2
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp.cpp 2
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp 16
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h 6
-rw-r--r-- ydb/library/yql/dq/actors/dq.h 4
-rw-r--r-- ydb/library/yql/dq/common/dq_serialized_batch.cpp 10
-rw-r--r-- ydb/library/yql/dq/common/dq_serialized_batch.h 6
-rw-r--r-- ydb/library/yql/dq/proto/dq_transport.proto 2
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input_channel.cpp 7
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_channel.cpp 62
-rw-r--r-- ydb/library/yql/dq/runtime/dq_transport.cpp 6
-rw-r--r-- ydb/library/yql/dq/runtime/dq_transport.h 2
13 files changed, 65 insertions, 64 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 2da8a6e05c..b768bc2695 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -291,10 +291,6 @@ protected:
size_t Size() const {
return Proto.GetChannelData().GetData().GetRaw().size() + Payload.size();
}
-
- ui32 RowCount() const {
- return Proto.GetChannelData().GetData().GetRows();
- }
};
void HandleChannelData(NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {
diff --git a/ydb/core/kqp/runtime/kqp_transport.cpp b/ydb/core/kqp/runtime/kqp_transport.cpp
index f10770dddf..bb7e6d3499 100644
--- a/ydb/core/kqp/runtime/kqp_transport.cpp
+++ b/ydb/core/kqp/runtime/kqp_transport.cpp
@@ -75,7 +75,7 @@ void TKqpProtoBuilder::BuildYdbResultSet(
}
NDq::TDqDataSerializer dataSerializer(*TypeEnv, *HolderFactory, transportVersion);
for (auto& part : data) {
- if (part.RowCount()) {
+ if (part.ChunkCount()) {
TUnboxedValueBatch rows(mkqlSrcRowType);
dataSerializer.Deserialize(std::move(part), mkqlSrcRowType, rows);
rows.ForEachRow([&](const NUdf::TUnboxedValue& value) {
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 2d1941cb8b..887fbf5174 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -523,7 +523,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
NDq::TDqSerializedBatch outputData;
auto fetchStatus = FetchOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData);
MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish);
- MKQL_ENSURE_S(outputData.Proto.GetRows() == 0);
+ MKQL_ENSURE_S(outputData.Proto.GetChunks() == 0);
}
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
index 0bb6f37382..8a66a9688e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
@@ -115,7 +115,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
LOG_T("Received input for channelId: " << channelId
<< ", seqNo: " << record.GetSeqNo()
<< ", size: " << channelData.Proto.GetData().GetRaw().size()
- << ", rows: " << channelData.Proto.GetData().GetRows()
+ << ", chunks: " << channelData.Proto.GetData().GetChunks()
<< ", watermark: " << channelData.Proto.HasWatermark()
<< ", checkpoint: " << channelData.Proto.HasCheckpoint()
<< ", finished: " << channelData.Proto.GetFinished()
@@ -177,7 +177,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
if (record.GetFinish()) {
auto it = outputChannel.InFlight.begin();
while (it != outputChannel.InFlight.end()) {
- outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
+ outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.ChunkCount());
it = outputChannel.InFlight.erase(it);
}
outputChannel.RetryState.reset();
@@ -190,7 +190,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
// remove all messages with seqNo <= ackSeqNo
auto it = outputChannel.InFlight.begin();
while (it != outputChannel.InFlight.end() && it->first <= record.GetSeqNo()) {
- outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
+ outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.ChunkCount());
it = outputChannel.InFlight.erase(it);
}
@@ -549,14 +549,14 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
YQL_ENSURE(!outputChannel.RetryState);
const ui64 seqNo = ++outputChannel.LastSentSeqNo;
- const ui32 chunkBytes = channelData.PayloadSize();
- const ui32 chunkRows = channelData.RowCount();
+ const ui32 dataBytes = channelData.PayloadSize();
+ const ui32 dataChunks = channelData.ChunkCount();
const bool finished = channelData.Proto.GetFinished();
LOG_T("SendChannelData, channelId: " << channelData.Proto.GetChannelId()
<< ", peer: " << *outputChannel.Peer
- << ", rows: " << chunkRows
- << ", bytes: " << chunkBytes
+ << ", chunks: " << dataChunks
+ << ", bytes: " << dataBytes
<< ", watermark: " << channelData.Proto.HasWatermark()
<< ", checkpoint: " << channelData.Proto.HasCheckpoint()
<< ", seqNo: " << seqNo
@@ -588,7 +588,7 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
dataEv->Record.SetNoAck(!needAck);
Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId);
- outputChannel.PeerState.AddInFlight(chunkBytes, chunkRows);
+ outputChannel.PeerState.AddInFlight(dataBytes, dataChunks);
}
bool TDqComputeActorChannels::PollChannel(ui64 channelId, i64 freeSpace) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
index 23a08ea6dc..465467db93 100644
--- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
+++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
@@ -57,7 +57,7 @@ protected:
}
}
- bool DoHandleChannelsAfterFinishImpl() override final{
+ bool DoHandleChannelsAfterFinishImpl() override final{
Y_ABORT_UNLESS(this->Checkpoints);
if (this->Checkpoints->HasPendingCheckpoint() && !this->Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
@@ -83,7 +83,7 @@ protected: //TDqComputeActorChannels::ICalbacks
auto channel = inputChannel->Channel;
- if (channelData.RowCount()) {
+ if (channelData.ChunkCount()) {
TDqSerializedBatch batch;
batch.Proto = std::move(*channelData.Proto.MutableData());
batch.Payload = std::move(channelData.Payload);
@@ -211,7 +211,7 @@ protected:
if (!limits.OutputChunkMaxSize) {
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
}
-
+
if (this->Task.GetEnableSpilling()) {
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
}
diff --git a/ydb/library/yql/dq/actors/dq.h b/ydb/library/yql/dq/actors/dq.h
index a5f45cfa72..9d68e9fed1 100644
--- a/ydb/library/yql/dq/actors/dq.h
+++ b/ydb/library/yql/dq/actors/dq.h
@@ -88,8 +88,8 @@ struct TChannelDataOOB {
return Proto.GetData().GetRaw().size() + Payload.Size();
}
- ui32 RowCount() const {
- return Proto.GetData().GetRows();
+ ui32 ChunkCount() const {
+ return Proto.GetData().GetChunks();
}
};
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.cpp b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
index 81ec0c2334..2a9798c353 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.cpp
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
@@ -63,13 +63,13 @@ void TDqSerializedBatch::ConvertToNoOOB() {
TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
TChunkedBuffer result;
- ui32 transportversion = batch.Proto.GetTransportVersion();
- ui32 rowCount = batch.Proto.GetRows();
+ ui32 transportVersion = batch.Proto.GetTransportVersion();
+ ui32 chunkCount = batch.Proto.GetChunks();
TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
- AppendNumber(result, transportversion);
- AppendNumber(result, rowCount);
+ AppendNumber(result, transportVersion);
+ AppendNumber(result, chunkCount);
AppendNumber(result, protoPayload.Size());
result.Append(std::move(protoPayload));
AppendNumber(result, batch.Payload.Size());
@@ -84,7 +84,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
TStringBuf source(sharedBuf->Data(), sharedBuf->Size());
TDqSerializedBatch result;
result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
- result.Proto.SetRows(ReadNumber<ui32>(source));
+ result.Proto.SetChunks(ReadNumber<ui32>(source));
size_t protoSize = ReadNumber<size_t>(source);
YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h
index adfb9bb481..a9c4a73af2 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.h
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.h
@@ -24,8 +24,12 @@ struct TDqSerializedBatch {
return Proto.GetRaw().size() + Payload.Size();
}
+ ui32 ChunkCount() const {
+ return Proto.GetChunks();
+ }
+
ui32 RowCount() const {
- return Proto.GetRows();
+ return Proto.GetChunks(); // FIXME with Rows
}
void Clear() {
diff --git a/ydb/library/yql/dq/proto/dq_transport.proto b/ydb/library/yql/dq/proto/dq_transport.proto
index 3da4786236..20402f60a3 100644
--- a/ydb/library/yql/dq/proto/dq_transport.proto
+++ b/ydb/library/yql/dq/proto/dq_transport.proto
@@ -16,6 +16,6 @@ enum EDataTransportVersion {
message TData {
uint32 TransportVersion = 1;
bytes Raw = 2;
- uint32 Rows = 3;
+ uint32 Chunks = 3;
optional uint32 PayloadId = 4;
}
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
index f61abd6d5e..3a750897e1 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
@@ -47,7 +47,7 @@ private:
void PushImpl(TDqSerializedBatch&& data) {
const i64 space = data.Size();
- const size_t rowCount = data.RowCount();
+ const size_t chunkCount = data.ChunkCount();
auto inputType = Impl.GetInputType();
NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
if (Y_UNLIKELY(PushStats.CollectProfile())) {
@@ -58,7 +58,8 @@ private:
DataSerializer.Deserialize(std::move(data), inputType, batch);
}
- YQL_ENSURE(batch.RowCount() == rowCount);
+ // single batch row is chunk and may be Arrow block
+ YQL_ENSURE(batch.RowCount() == chunkCount);
Impl.AddBatch(std::move(batch), space);
}
@@ -123,7 +124,7 @@ public:
void Push(TDqSerializedBatch&& data) override {
YQL_ENSURE(!Impl.IsFinished(), "input channel " << PushStats.ChannelId << " already finished");
- if (Y_UNLIKELY(data.Proto.GetRows() == 0)) {
+ if (Y_UNLIKELY(data.Proto.GetChunks() == 0)) {
return;
}
StoredSerializedBytes += data.Size();
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index bfc01284a4..0968d366ec 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -58,7 +58,7 @@ public:
}
ui64 GetValuesCount() const override {
- return SpilledRowCount + PackedRowCount + ChunkRowCount;
+ return SpilledChunkCount + PackedChunkCount + PackerCurrentChunkCount;
}
const TDqOutputStats& GetPushStats() const override {
@@ -110,7 +110,7 @@ public:
values[i] = {};
}
- ChunkRowCount++;
+ PackerCurrentChunkCount++;
size_t packerSize = Packer.PackedSizeEstimate();
if (packerSize >= MaxChunkBytes) {
@@ -120,9 +120,9 @@ public:
PushStats.Bytes += Data.back().Buffer.Size();
}
PackedDataSize += Data.back().Buffer.Size();
- PackedRowCount += ChunkRowCount;
- Data.back().RowCount = ChunkRowCount;
- ChunkRowCount = 0;
+ PackedChunkCount += PackerCurrentChunkCount;
+ Data.back().ChunkCount = PackerCurrentChunkCount;
+ PackerCurrentChunkCount = 0;
packerSize = 0;
}
@@ -133,23 +133,23 @@ public:
TDqSerializedBatch data;
data.Proto.SetTransportVersion(TransportVersion);
- data.Proto.SetRows(head.RowCount);
+ data.Proto.SetChunks(head.ChunkCount);
data.SetPayload(std::move(head.Buffer));
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
PackedDataSize -= bufSize;
- PackedRowCount -= head.RowCount;
+ PackedChunkCount -= head.ChunkCount;
- SpilledRowCount += head.RowCount;
+ SpilledChunkCount += head.ChunkCount;
if (PopStats.CollectFull()) {
- PopStats.SpilledRows += head.RowCount;
- PopStats.SpilledBytes += bufSize + sizeof(head.RowCount);
+ PopStats.SpilledRows += head.ChunkCount; // FIXME with RowCount
+ PopStats.SpilledBytes += bufSize + sizeof(head.ChunkCount);
PopStats.SpilledBlobs++;
}
Data.pop_front();
- LOG("Data spilled. Total rows spilled: " << SpilledRowCount << ", bytesInMemory: " << (PackedDataSize + packerSize));
+ LOG("Data spilled. Total rows spilled: " << SpilledChunkCount << ", bytesInMemory: " << (PackedDataSize + packerSize)); // FIXME with RowCount
}
if (IsFull() || FirstStoredId < NextStoredId) {
@@ -158,7 +158,7 @@ public:
if (PopStats.CollectFull()) {
PopStats.MaxMemoryUsage = std::max(PopStats.MaxMemoryUsage, PackedDataSize + packerSize);
- PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, PackedRowCount);
+ PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, PackedChunkCount);
}
}
@@ -195,18 +195,18 @@ public:
}
++FirstStoredId;
data = LoadSpilled(std::move(blob));
- SpilledRowCount -= data.RowCount();
+ SpilledChunkCount -= data.ChunkCount();
} else if (!Data.empty()) {
auto& packed = Data.front();
- PackedRowCount -= packed.RowCount;
+ PackedChunkCount -= packed.ChunkCount;
PackedDataSize -= packed.Buffer.Size();
- data.Proto.SetRows(packed.RowCount);
+ data.Proto.SetChunks(packed.ChunkCount);
data.SetPayload(std::move(packed.Buffer));
Data.pop_front();
} else {
- data.Proto.SetRows(ChunkRowCount);
+ data.Proto.SetChunks(PackerCurrentChunkCount);
data.SetPayload(FinishPackAndCheckSize());
- ChunkRowCount = 0;
+ PackerCurrentChunkCount = 0;
}
DLOG("Took " << data.RowCount() << " rows");
@@ -255,21 +255,21 @@ public:
data.Clear();
data.Proto.SetTransportVersion(TransportVersion);
- if (SpilledRowCount == 0 && PackedRowCount == 0) {
- data.Proto.SetRows(ChunkRowCount);
+ if (SpilledChunkCount == 0 && PackedChunkCount == 0) {
+ data.Proto.SetChunks(PackerCurrentChunkCount);
data.SetPayload(FinishPackAndCheckSize());
- ChunkRowCount = 0;
+ PackerCurrentChunkCount = 0;
return true;
}
// Repack all - thats why PopAll should never be used
- if (ChunkRowCount) {
+ if (PackerCurrentChunkCount) {
Data.emplace_back();
Data.back().Buffer = FinishPackAndCheckSize();
PackedDataSize += Data.back().Buffer.Size();
- PackedRowCount += ChunkRowCount;
- Data.back().RowCount = ChunkRowCount;
- ChunkRowCount = 0;
+ PackedChunkCount += PackerCurrentChunkCount;
+ Data.back().ChunkCount = PackerCurrentChunkCount;
+ PackerCurrentChunkCount = 0;
}
NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
@@ -291,7 +291,7 @@ public:
});
}
- data.Proto.SetRows(rows.RowCount());
+ data.Proto.SetChunks(rows.RowCount()); // 1 UVB "row" is Chunk
data.SetPayload(FinishPackAndCheckSize());
if (PopStats.CollectBasic()) {
PopStats.Bytes += data.Size();
@@ -332,7 +332,7 @@ public:
ui64 rows = GetValuesCount();
Data.clear();
Packer.Clear();
- SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
+ SpilledChunkCount = PackedDataSize = PackedChunkCount = PackerCurrentChunkCount = 0;
FirstStoredId = NextStoredId;
return rows;
}
@@ -358,18 +358,18 @@ private:
struct TSerializedBatch {
TChunkedBuffer Buffer;
- ui64 RowCount = 0;
+ ui64 ChunkCount = 0;
};
std::deque<TSerializedBatch> Data;
- size_t SpilledRowCount = 0;
+ size_t SpilledChunkCount = 0;
ui64 FirstStoredId = 0;
ui64 NextStoredId = 0;
size_t PackedDataSize = 0;
- size_t PackedRowCount = 0;
-
- size_t ChunkRowCount = 0;
+ size_t PackedChunkCount = 0;
+
+ size_t PackerCurrentChunkCount = 0;
bool Finished = false;
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index 647a977e67..338f66bb1f 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -43,7 +43,7 @@ TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const
TDqSerializedBatch result;
result.Proto.SetTransportVersion(version);
- result.Proto.SetRows(1);
+ result.Proto.SetChunks(1);
result.SetPayload(std::move(packResult));
return result;
}
@@ -87,7 +87,7 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons
TDqSerializedBatch result;
result.Proto.SetTransportVersion(version);
- result.Proto.SetRows(buffer.RowCount());
+ result.Proto.SetChunks(buffer.RowCount());
result.SetPayload(std::move(packResult));
return result;
}
@@ -176,7 +176,7 @@ NDqProto::TData TDqDataSerializer::SerializeParamValue(const TType* type, const
NDqProto::TData data;
data.SetTransportVersion(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
data.SetRaw(packResult.data(), packResult.size());
- data.SetRows(1);
+ data.SetChunks(1);
return data;
}
diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h
index 898bf28aa9..e7f91e140d 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.h
+++ b/ydb/library/yql/dq/runtime/dq_transport.h
@@ -82,7 +82,7 @@ private:
}
TDqSerializedBatch result;
result.Proto.SetTransportVersion(TransportVersion);
- result.Proto.SetRows(count);
+ result.Proto.SetChunks(count);
result.SetPayload(packer.Finish());
return result;
}