| field | value | date |
|---|---|---|
| author | Hor911 <hor911@ydb.tech> | 2025-02-28 15:01:23 +0300 |
| committer | GitHub <noreply@github.com> | 2025-02-28 15:01:23 +0300 |
| commit | 37290b172ed610df2157bf048f7f1115148b2a4d (patch) | |
| tree | d7fc74c55201bb2227f58401ce64b3babb2c8e43 | |
| parent | 6964fca4eb4581d6a3055934614c20a54d094657 (diff) | |
| download | ydb-37290b172ed610df2157bf048f7f1115148b2a4d.tar.gz | |
Rename RowCount => ChunkCount to align with semantics and make it less confusing (#15080)
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 4 |
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_transport.cpp | 2 |
| -rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 2 |
| -rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp | 16 |
| -rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h | 6 |
| -rw-r--r-- | ydb/library/yql/dq/actors/dq.h | 4 |
| -rw-r--r-- | ydb/library/yql/dq/common/dq_serialized_batch.cpp | 10 |
| -rw-r--r-- | ydb/library/yql/dq/common/dq_serialized_batch.h | 6 |
| -rw-r--r-- | ydb/library/yql/dq/proto/dq_transport.proto | 2 |
| -rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_channel.cpp | 7 |
| -rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_channel.cpp | 62 |
| -rw-r--r-- | ydb/library/yql/dq/runtime/dq_transport.cpp | 6 |
| -rw-r--r-- | ydb/library/yql/dq/runtime/dq_transport.h | 2 |
13 files changed, 65 insertions, 64 deletions
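
Before the patch body: the proto field `TData.Rows` becomes `TData.Chunks`, and the accessors on the batch wrappers follow, because the counter tracks serialized chunks rather than data rows — with the block transports a single chunk can be an Arrow block holding many rows, which is what made the old name confusing. The sketch below is a minimal, self-contained illustration of the renamed accessors; the simplified `TData`/`TDqSerializedBatch` shapes are stand-ins invented for this example, not the YDB sources.

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// Simplified stand-ins for the protobuf-generated TData and the batch wrapper;
// the field and method names mirror the patch, everything else is illustrative.
struct TData {
    uint32_t TransportVersion = 0;
    std::string Raw;      // packed payload: N chunks, each chunk may hold many rows
    uint32_t Chunks = 0;  // was `Rows` before this commit
};

struct TDqSerializedBatch {
    TData Proto;

    uint32_t ChunkCount() const {  // new accessor, matches the renamed proto field
        return Proto.Chunks;
    }
    uint32_t RowCount() const {    // kept by the patch as an alias, "FIXME with Rows"
        return Proto.Chunks;
    }
};

int main() {
    TDqSerializedBatch batch;
    batch.Proto.Chunks = 3;        // e.g. 3 packed chunks / Arrow blocks, not 3 rows
    std::cout << "chunks: " << batch.ChunkCount()
              << ", RowCount() alias: " << batch.RowCount() << '\n';
}
```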
```diff
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 2da8a6e05c..b768bc2695 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -291,10 +291,6 @@ protected:
         size_t Size() const {
             return Proto.GetChannelData().GetData().GetRaw().size() + Payload.size();
         }
-
-        ui32 RowCount() const {
-            return Proto.GetChannelData().GetData().GetRows();
-        }
     };
 
     void HandleChannelData(NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {
diff --git a/ydb/core/kqp/runtime/kqp_transport.cpp b/ydb/core/kqp/runtime/kqp_transport.cpp
index f10770dddf..bb7e6d3499 100644
--- a/ydb/core/kqp/runtime/kqp_transport.cpp
+++ b/ydb/core/kqp/runtime/kqp_transport.cpp
@@ -75,7 +75,7 @@ void TKqpProtoBuilder::BuildYdbResultSet(
     }
     NDq::TDqDataSerializer dataSerializer(*TypeEnv, *HolderFactory, transportVersion);
     for (auto& part : data) {
-        if (part.RowCount()) {
+        if (part.ChunkCount()) {
             TUnboxedValueBatch rows(mkqlSrcRowType);
             dataSerializer.Deserialize(std::move(part), mkqlSrcRowType, rows);
             rows.ForEachRow([&](const NUdf::TUnboxedValue& value) {
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 2d1941cb8b..887fbf5174 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -523,7 +523,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
             NDq::TDqSerializedBatch outputData;
             auto fetchStatus = FetchOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData);
             MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish);
-            MKQL_ENSURE_S(outputData.Proto.GetRows() == 0);
+            MKQL_ENSURE_S(outputData.Proto.GetChunks() == 0);
         }
     }
 }
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
index 0bb6f37382..8a66a9688e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
@@ -115,7 +115,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
     LOG_T("Received input for channelId: " << channelId
         << ", seqNo: " << record.GetSeqNo()
         << ", size: " << channelData.Proto.GetData().GetRaw().size()
-        << ", rows: " << channelData.Proto.GetData().GetRows()
+        << ", chunks: " << channelData.Proto.GetData().GetChunks()
         << ", watermark: " << channelData.Proto.HasWatermark()
         << ", checkpoint: " << channelData.Proto.HasCheckpoint()
         << ", finished: " << channelData.Proto.GetFinished()
@@ -177,7 +177,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
     if (record.GetFinish()) {
         auto it = outputChannel.InFlight.begin();
         while (it != outputChannel.InFlight.end()) {
-            outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
+            outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.ChunkCount());
             it = outputChannel.InFlight.erase(it);
         }
         outputChannel.RetryState.reset();
@@ -190,7 +190,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelDataAck::TPtr&
     // remove all messages with seqNo <= ackSeqNo
     auto it = outputChannel.InFlight.begin();
     while (it != outputChannel.InFlight.end() && it->first <= record.GetSeqNo()) {
-        outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.RowCount());
+        outputChannel.PeerState.RemoveInFlight(it->second.Data.PayloadSize(), it->second.Data.ChunkCount());
         it = outputChannel.InFlight.erase(it);
     }
 
@@ -549,14 +549,14 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
     YQL_ENSURE(!outputChannel.RetryState);
 
     const ui64 seqNo = ++outputChannel.LastSentSeqNo;
-    const ui32 chunkBytes = channelData.PayloadSize();
-    const ui32 chunkRows = channelData.RowCount();
+    const ui32 dataBytes = channelData.PayloadSize();
+    const ui32 dataChunks = channelData.ChunkCount();
     const bool finished = channelData.Proto.GetFinished();
 
     LOG_T("SendChannelData, channelId: " << channelData.Proto.GetChannelId()
         << ", peer: " << *outputChannel.Peer
-        << ", rows: " << chunkRows
-        << ", bytes: " << chunkBytes
+        << ", chunks: " << dataChunks
+        << ", bytes: " << dataBytes
         << ", watermark: " << channelData.Proto.HasWatermark()
         << ", checkpoint: " << channelData.Proto.HasCheckpoint()
         << ", seqNo: " << seqNo
@@ -588,7 +588,7 @@
     dataEv->Record.SetNoAck(!needAck);
     Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId);
 
-    outputChannel.PeerState.AddInFlight(chunkBytes, chunkRows);
+    outputChannel.PeerState.AddInFlight(dataBytes, dataChunks);
 }
 
 bool TDqComputeActorChannels::PollChannel(ui64 channelId, i64 freeSpace) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
index 23a08ea6dc..465467db93 100644
--- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
+++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
@@ -57,7 +57,7 @@ protected:
         }
     }
 
-    bool DoHandleChannelsAfterFinishImpl() override final{
+    bool DoHandleChannelsAfterFinishImpl() override final{
         Y_ABORT_UNLESS(this->Checkpoints);
 
         if (this->Checkpoints->HasPendingCheckpoint() && !this->Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
@@ -83,7 +83,7 @@ protected: //TDqComputeActorChannels::ICalbacks
         auto channel = inputChannel->Channel;
 
-        if (channelData.RowCount()) {
+        if (channelData.ChunkCount()) {
             TDqSerializedBatch batch;
             batch.Proto = std::move(*channelData.Proto.MutableData());
             batch.Payload = std::move(channelData.Payload);
@@ -211,7 +211,7 @@
         if (!limits.OutputChunkMaxSize) {
            limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
         }
-        
+
         if (this->Task.GetEnableSpilling()) {
             TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
         }
diff --git a/ydb/library/yql/dq/actors/dq.h b/ydb/library/yql/dq/actors/dq.h
index a5f45cfa72..9d68e9fed1 100644
--- a/ydb/library/yql/dq/actors/dq.h
+++ b/ydb/library/yql/dq/actors/dq.h
@@ -88,8 +88,8 @@ struct TChannelDataOOB {
         return Proto.GetData().GetRaw().size() + Payload.Size();
     }
 
-    ui32 RowCount() const {
-        return Proto.GetData().GetRows();
+    ui32 ChunkCount() const {
+        return Proto.GetData().GetChunks();
     }
 };
 
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.cpp b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
index 81ec0c2334..2a9798c353 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.cpp
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
@@ -63,13 +63,13 @@ void TDqSerializedBatch::ConvertToNoOOB() {
 TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {
     TChunkedBuffer result;
 
-    ui32 transportversion = batch.Proto.GetTransportVersion();
-    ui32 rowCount = batch.Proto.GetRows();
+    ui32 transportVersion = batch.Proto.GetTransportVersion();
+    ui32 chunkCount = batch.Proto.GetChunks();
 
     TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));
 
-    AppendNumber(result, transportversion);
-    AppendNumber(result, rowCount);
+    AppendNumber(result, transportVersion);
+    AppendNumber(result, chunkCount);
     AppendNumber(result, protoPayload.Size());
     result.Append(std::move(protoPayload));
     AppendNumber(result, batch.Payload.Size());
@@ -84,7 +84,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
     TStringBuf source(sharedBuf->Data(), sharedBuf->Size());
     TDqSerializedBatch result;
     result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
-    result.Proto.SetRows(ReadNumber<ui32>(source));
+    result.Proto.SetChunks(ReadNumber<ui32>(source));
     size_t protoSize = ReadNumber<size_t>(source);
     YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");
 
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h
index adfb9bb481..a9c4a73af2 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.h
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.h
@@ -24,8 +24,12 @@ struct TDqSerializedBatch {
         return Proto.GetRaw().size() + Payload.Size();
     }
 
+    ui32 ChunkCount() const {
+        return Proto.GetChunks();
+    }
+
     ui32 RowCount() const {
-        return Proto.GetRows();
+        return Proto.GetChunks(); // FIXME with Rows
     }
 
     void Clear() {
diff --git a/ydb/library/yql/dq/proto/dq_transport.proto b/ydb/library/yql/dq/proto/dq_transport.proto
index 3da4786236..20402f60a3 100644
--- a/ydb/library/yql/dq/proto/dq_transport.proto
+++ b/ydb/library/yql/dq/proto/dq_transport.proto
@@ -16,6 +16,6 @@ enum EDataTransportVersion {
 message TData {
     uint32 TransportVersion = 1;
     bytes Raw = 2;
-    uint32 Rows = 3;
+    uint32 Chunks = 3;
     optional uint32 PayloadId = 4;
 }
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
index f61abd6d5e..3a750897e1 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
@@ -47,7 +47,7 @@ private:
     void PushImpl(TDqSerializedBatch&& data) {
         const i64 space = data.Size();
-        const size_t rowCount = data.RowCount();
+        const size_t chunkCount = data.ChunkCount();
         auto inputType = Impl.GetInputType();
         NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
         if (Y_UNLIKELY(PushStats.CollectProfile())) {
@@ -58,7 +58,8 @@
             DataSerializer.Deserialize(std::move(data), inputType, batch);
         }
 
-        YQL_ENSURE(batch.RowCount() == rowCount);
+        // single batch row is chunk and may be Arrow block
+        YQL_ENSURE(batch.RowCount() == chunkCount);
         Impl.AddBatch(std::move(batch), space);
     }
 
@@ -123,7 +124,7 @@
     void Push(TDqSerializedBatch&& data) override {
         YQL_ENSURE(!Impl.IsFinished(), "input channel " << PushStats.ChannelId << " already finished");
-        if (Y_UNLIKELY(data.Proto.GetRows() == 0)) {
+        if (Y_UNLIKELY(data.Proto.GetChunks() == 0)) {
            return;
        }
        StoredSerializedBytes += data.Size();
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index bfc01284a4..0968d366ec 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -58,7 +58,7 @@ public:
     }
 
     ui64 GetValuesCount() const override {
-        return SpilledRowCount + PackedRowCount + ChunkRowCount;
+        return SpilledChunkCount + PackedChunkCount + PackerCurrentChunkCount;
     }
 
     const TDqOutputStats& GetPushStats() const override {
@@ -110,7 +110,7 @@
             values[i] = {};
         }
 
-        ChunkRowCount++;
+        PackerCurrentChunkCount++;
 
         size_t packerSize = Packer.PackedSizeEstimate();
         if (packerSize >= MaxChunkBytes) {
@@ -120,9 +120,9 @@
                 PushStats.Bytes += Data.back().Buffer.Size();
             }
             PackedDataSize += Data.back().Buffer.Size();
-            PackedRowCount += ChunkRowCount;
-            Data.back().RowCount = ChunkRowCount;
-            ChunkRowCount = 0;
+            PackedChunkCount += PackerCurrentChunkCount;
+            Data.back().ChunkCount = PackerCurrentChunkCount;
+            PackerCurrentChunkCount = 0;
             packerSize = 0;
         }
@@ -133,23 +133,23 @@
             TDqSerializedBatch data;
             data.Proto.SetTransportVersion(TransportVersion);
-            data.Proto.SetRows(head.RowCount);
+            data.Proto.SetChunks(head.ChunkCount);
             data.SetPayload(std::move(head.Buffer));
             Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
 
             PackedDataSize -= bufSize;
-            PackedRowCount -= head.RowCount;
+            PackedChunkCount -= head.ChunkCount;
 
-            SpilledRowCount += head.RowCount;
+            SpilledChunkCount += head.ChunkCount;
 
             if (PopStats.CollectFull()) {
-                PopStats.SpilledRows += head.RowCount;
-                PopStats.SpilledBytes += bufSize + sizeof(head.RowCount);
+                PopStats.SpilledRows += head.ChunkCount; // FIXME with RowCount
+                PopStats.SpilledBytes += bufSize + sizeof(head.ChunkCount);
                 PopStats.SpilledBlobs++;
             }
 
             Data.pop_front();
-            LOG("Data spilled. Total rows spilled: " << SpilledRowCount << ", bytesInMemory: " << (PackedDataSize + packerSize));
+            LOG("Data spilled. Total rows spilled: " << SpilledChunkCount << ", bytesInMemory: " << (PackedDataSize + packerSize)); // FIXME with RowCount
         }
 
         if (IsFull() || FirstStoredId < NextStoredId) {
@@ -158,7 +158,7 @@
 
         if (PopStats.CollectFull()) {
             PopStats.MaxMemoryUsage = std::max(PopStats.MaxMemoryUsage, PackedDataSize + packerSize);
-            PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, PackedRowCount);
+            PopStats.MaxRowsInMemory = std::max(PopStats.MaxRowsInMemory, PackedChunkCount);
         }
     }
@@ -195,18 +195,18 @@
             }
             ++FirstStoredId;
             data = LoadSpilled(std::move(blob));
-            SpilledRowCount -= data.RowCount();
+            SpilledChunkCount -= data.ChunkCount();
         } else if (!Data.empty()) {
             auto& packed = Data.front();
-            PackedRowCount -= packed.RowCount;
+            PackedChunkCount -= packed.ChunkCount;
             PackedDataSize -= packed.Buffer.Size();
-            data.Proto.SetRows(packed.RowCount);
+            data.Proto.SetChunks(packed.ChunkCount);
             data.SetPayload(std::move(packed.Buffer));
             Data.pop_front();
         } else {
-            data.Proto.SetRows(ChunkRowCount);
+            data.Proto.SetChunks(PackerCurrentChunkCount);
             data.SetPayload(FinishPackAndCheckSize());
-            ChunkRowCount = 0;
+            PackerCurrentChunkCount = 0;
         }
 
         DLOG("Took " << data.RowCount() << " rows");
@@ -255,21 +255,21 @@
         data.Clear();
         data.Proto.SetTransportVersion(TransportVersion);
 
-        if (SpilledRowCount == 0 && PackedRowCount == 0) {
-            data.Proto.SetRows(ChunkRowCount);
+        if (SpilledChunkCount == 0 && PackedChunkCount == 0) {
+            data.Proto.SetChunks(PackerCurrentChunkCount);
             data.SetPayload(FinishPackAndCheckSize());
-            ChunkRowCount = 0;
+            PackerCurrentChunkCount = 0;
             return true;
         }
 
         // Repack all - thats why PopAll should never be used
-        if (ChunkRowCount) {
+        if (PackerCurrentChunkCount) {
             Data.emplace_back();
             Data.back().Buffer = FinishPackAndCheckSize();
             PackedDataSize += Data.back().Buffer.Size();
-            PackedRowCount += ChunkRowCount;
-            Data.back().RowCount = ChunkRowCount;
-            ChunkRowCount = 0;
+            PackedChunkCount += PackerCurrentChunkCount;
+            Data.back().ChunkCount = PackerCurrentChunkCount;
+            PackerCurrentChunkCount = 0;
         }
 
         NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
@@ -291,7 +291,7 @@
             });
         }
 
-        data.Proto.SetRows(rows.RowCount());
+        data.Proto.SetChunks(rows.RowCount()); // 1 UVB "row" is Chunk
         data.SetPayload(FinishPackAndCheckSize());
         if (PopStats.CollectBasic()) {
             PopStats.Bytes += data.Size();
@@ -332,7 +332,7 @@
         ui64 rows = GetValuesCount();
         Data.clear();
         Packer.Clear();
-        SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
+        SpilledChunkCount = PackedDataSize = PackedChunkCount = PackerCurrentChunkCount = 0;
         FirstStoredId = NextStoredId;
         return rows;
     }
@@ -358,18 +358,18 @@ private:
     struct TSerializedBatch {
         TChunkedBuffer Buffer;
-        ui64 RowCount = 0;
+        ui64 ChunkCount = 0;
     };
 
     std::deque<TSerializedBatch> Data;
 
-    size_t SpilledRowCount = 0;
+    size_t SpilledChunkCount = 0;
     ui64 FirstStoredId = 0;
     ui64 NextStoredId = 0;
 
     size_t PackedDataSize = 0;
-    size_t PackedRowCount = 0;
-
-    size_t ChunkRowCount = 0;
+    size_t PackedChunkCount = 0;
+
+    size_t PackerCurrentChunkCount = 0;
 
     bool Finished = false;
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index 647a977e67..338f66bb1f 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -43,7 +43,7 @@ TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const
 
     TDqSerializedBatch result;
     result.Proto.SetTransportVersion(version);
-    result.Proto.SetRows(1);
+    result.Proto.SetChunks(1);
     result.SetPayload(std::move(packResult));
     return result;
 }
@@ -87,7 +87,7 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons
 
     TDqSerializedBatch result;
     result.Proto.SetTransportVersion(version);
-    result.Proto.SetRows(buffer.RowCount());
+    result.Proto.SetChunks(buffer.RowCount());
     result.SetPayload(std::move(packResult));
     return result;
 }
@@ -176,7 +176,7 @@ NDqProto::TData TDqDataSerializer::SerializeParamValue(const TType* type, const
     NDqProto::TData data;
     data.SetTransportVersion(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
     data.SetRaw(packResult.data(), packResult.size());
-    data.SetRows(1);
+    data.SetChunks(1);
     return data;
 }
diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h
index 898bf28aa9..e7f91e140d 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.h
+++ b/ydb/library/yql/dq/runtime/dq_transport.h
@@ -82,7 +82,7 @@ private:
         }
         TDqSerializedBatch result;
         result.Proto.SetTransportVersion(TransportVersion);
-        result.Proto.SetRows(count);
+        result.Proto.SetChunks(count);
         result.SetPayload(packer.Finish());
         return result;
     }
```
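
One note on the spilling path touched above: `SaveForSpilling` writes the transport version, the chunk count, then the size-prefixed proto raw bytes and out-of-band payload, and `LoadSpilled` reads them back in the same order, so the rename does not change the serialized layout. The sketch below is a hypothetical, simplified re-implementation of that framing over a plain byte string (the real code uses `TChunkedBuffer` with `AppendNumber`/`ReadNumber` helpers and `YQL_ENSURE` checks); the helper names are reused here only to mirror the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>

// Hypothetical stand-ins for the AppendNumber/ReadNumber helpers in dq_serialized_batch.cpp.
template <typename T>
void AppendNumber(std::string& out, T value) {
    out.append(reinterpret_cast<const char*>(&value), sizeof(T));
}

template <typename T>
T ReadNumber(std::string_view& in) {
    assert(in.size() >= sizeof(T));
    T value;
    std::memcpy(&value, in.data(), sizeof(T));
    in.remove_prefix(sizeof(T));
    return value;
}

int main() {
    // Save: version, chunk count, proto size + bytes, payload size + bytes.
    std::string blob;
    AppendNumber<uint32_t>(blob, 42);   // transport version (arbitrary illustrative value)
    AppendNumber<uint32_t>(blob, 3);    // Chunks (was Rows before the rename)
    std::string proto = "proto-raw";
    AppendNumber<size_t>(blob, proto.size());
    blob += proto;
    std::string payload = "oob-payload";
    AppendNumber<size_t>(blob, payload.size());
    blob += payload;

    // Load: read the fields back in the same order.
    std::string_view src(blob);
    uint32_t version = ReadNumber<uint32_t>(src);
    uint32_t chunks = ReadNumber<uint32_t>(src);
    size_t protoSize = ReadNumber<size_t>(src);
    std::string_view protoBack = src.substr(0, protoSize);
    src.remove_prefix(protoSize);
    size_t payloadSize = ReadNumber<size_t>(src);
    std::string_view payloadBack = src.substr(0, payloadSize);

    assert(version == 42 && chunks == 3);
    assert(protoBack == "proto-raw" && payloadBack == "oob-payload");
    return 0;
}
```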