aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-04-28 15:11:37 +0300
committeraneporada <aneporada@ydb.tech>2023-04-28 15:11:37 +0300
commit277bf2b433110a4e3a5a127d7cc801d050d0394f (patch)
treeecb79e093f45663d2f5ebbdfd36ea759327c183e
parentf3b2c09e16a9c2faaf25e3fc928ae01b1e7fe566 (diff)
downloadydb-277bf2b433110a4e3a5a127d7cc801d050d0394f.tar.gz
Perform serialization in IDqOutput::Push() method. Simplify code
initial
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp8
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp52
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h6
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp2
-rw-r--r--ydb/library/yql/dq/runtime/dq_channel_storage.h8
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.cpp799
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.h6
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp194
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp1
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h1
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.cpp7
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.h17
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp20
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.h9
-rw-r--r--ydb/library/yql/minikql/computation/mkql_optional_usage_mask.h10
-rw-r--r--ydb/library/yql/minikql/mkql_buffer.cpp1
-rw-r--r--ydb/library/yql/minikql/mkql_buffer.h28
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp10
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp3
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp5
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp14
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h2
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp2
27 files changed, 307 insertions, 908 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 71b8c6a5e25..768440da92a 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -78,7 +78,6 @@ public:
settings.CollectProfileStats = RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE;
settings.OptLLVM = GetUseLLVM() ? "--compile-options=disable-opt" : "OFF";
settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache();
- settings.AllowGeneratorsInUnboxedValues = false;
for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) {
settings.TaskParams[paramsName] = paramsValue;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 44fbf2508e0..9faa89e6d18 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -196,7 +196,6 @@ void TKqpScanComputeActor::DoBootstrap() {
settings.CollectProfileStats = GetStatsMode() >= NYql::NDqProto::DQ_STATS_MODE_PROFILE;
settings.OptLLVM = TBase::GetUseLLVM() ? "--compile-options=disable-opt" : "OFF";
settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache();
- settings.AllowGeneratorsInUnboxedValues = false;
for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) {
settings.TaskParams[paramsName] = paramsValue;
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 4c6059ce26f..577a3a7dbe4 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -41,7 +41,6 @@ TDqTaskRunnerSettings CreateTaskRunnerSettings(Ydb::Table::QueryStatsCollection:
settings.CollectProfileStats = CollectProfileStats(statsMode);
settings.OptLLVM = "OFF";
settings.TerminateOnError = false;
- settings.AllowGeneratorsInUnboxedValues = false;
return settings;
}
@@ -241,9 +240,10 @@ public:
for (ui64 outputChannelId : taskOutput.Channels) {
auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);
auto& channelDesc = TasksGraph.GetChannel(outputChannelId);
- NKikimr::NMiniKQL::TUnboxedValueVector outputRows;
- outputChannel->PopAll(outputRows);
- ResponseEv->TakeResult(channelDesc.DstInputIndex, outputRows);
+ NDqProto::TData outputData;
+ while (outputChannel->Pop(outputData)) {
+ ResponseEv->TakeResult(channelDesc.DstInputIndex, outputData);
+ }
YQL_ENSURE(outputChannel->IsFinished());
}
}
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
index 677c1a59f2f..651d609db8b 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
@@ -194,8 +194,11 @@ std::pair<bool, bool> TKqpTasksRunner::TransferData(ui64 fromTask, ui64 fromChan
bool finished = false;
// todo: transfer data as-is from input- to output- channel (KIKIMR-10658)
- NDqProto::TData data;
- if (src->Pop(data, std::numeric_limits<ui64>::max())) {
+ for (;;) {
+ NDqProto::TData data;
+ if (!src->Pop(data)) {
+ break;
+ }
transferred = true;
dst->Push(std::move(data));
}
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index c6f78e015f8..24111f604af 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -1269,7 +1269,6 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(const NKikimrTxDataShard::T
settings.OptLLVM = "OFF";
settings.TerminateOnError = false;
- settings.AllowGeneratorsInUnboxedValues = false;
KqpAlloc->SetLimit(10_MB);
KqpTasksRunner = NKqp::CreateKqpTasksRunner(tx.GetTasks(), KqpExecCtx, settings, KqpLogFunc);
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index a41e12da5e6..fa3a4842d47 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -86,6 +86,17 @@ NUdf::EFetchStatus FetchAllOutput(NDq::IDqOutputChannel* channel, NDqProto::TDat
return NUdf::EFetchStatus::Yield;
}
+NUdf::EFetchStatus FetchOutput(NDq::IDqOutputChannel* channel, NDqProto::TData& buffer) {
+ auto result = channel->Pop(buffer);
+ Y_UNUSED(result);
+
+ if (channel->IsFinished()) {
+ return NUdf::EFetchStatus::Finish;
+ }
+
+ return NUdf::EFetchStatus::Yield;
+}
+
NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
const TInputOpData::TInReadSets* inReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx,
NKqp::TKqpTasksRunner& tasksRunner, bool applyEffects)
@@ -526,29 +537,30 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
for (auto& channel : task.GetOutputs(i).GetChannels()) {
auto computeActor = computeCtx.GetTaskOutputChannel(task.GetId(), channel.GetId());
if (computeActor) {
- auto dataEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelData>();
- dataEv->Record.SetSeqNo(1);
- dataEv->Record.MutableChannelData()->SetChannelId(channel.GetId());
- dataEv->Record.MutableChannelData()->SetFinished(true);
- dataEv->Record.SetNoAck(true);
- auto outputData = dataEv->Record.MutableChannelData()->MutableData();
-
- auto fetchStatus = FetchAllOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), *outputData);
- MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish);
-
- if (outputData->GetRaw().size() > MaxDatashardReplySize) {
- auto message = TStringBuilder() << "Datashard " << origin
- << ": reply size limit exceeded (" << outputData->GetRaw().size() << " > "
- << MaxDatashardReplySize << ")";
-
- LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, message);
- result->SetExecutionError(NKikimrTxDataShard::TError::REPLY_SIZE_EXCEEDED, message);
- } else {
- ctx.Send(computeActor, dataEv.Release());
+ size_t seqNo = 1;
+ auto fetchStatus = NUdf::EFetchStatus::Yield;
+ while (fetchStatus != NUdf::EFetchStatus::Finish) {
+ auto dataEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelData>();
+ dataEv->Record.SetSeqNo(seqNo++);
+ dataEv->Record.MutableChannelData()->SetChannelId(channel.GetId());
+ dataEv->Record.SetNoAck(true);
+ auto outputData = dataEv->Record.MutableChannelData()->MutableData();
+ fetchStatus = FetchOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), *outputData);
+ dataEv->Record.MutableChannelData()->SetFinished(fetchStatus == NUdf::EFetchStatus::Finish);
+ if (outputData->GetRaw().size() > MaxDatashardReplySize) {
+ auto message = TStringBuilder() << "Datashard " << origin
+ << ": reply size limit exceeded (" << outputData->GetRaw().size() << " > "
+ << MaxDatashardReplySize << ")";
+ LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, message);
+ result->SetExecutionError(NKikimrTxDataShard::TError::REPLY_SIZE_EXCEEDED, message);
+ break;
+ } else {
+ ctx.Send(computeActor, dataEv.Release());
+ }
}
} else {
NDqProto::TData outputData;
- auto fetchStatus = FetchAllOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData);
+ auto fetchStatus = FetchOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData);
MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish);
MKQL_ENSURE_S(outputData.GetRows() == 0);
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 1f77357ea7f..8975fa85b8d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1309,7 +1309,7 @@ private:
i64 remains = toSend;
while (remains > 0 && (!outputChannel.Finished || Checkpoints)) {
- ui32 sent = this->SendChannelDataChunk(outputChannel, remains);
+ ui32 sent = this->SendChannelDataChunk(outputChannel);
if (sent == 0) {
break;
}
@@ -1321,14 +1321,14 @@ private:
ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || remains != toSend;
}
- ui32 SendChannelDataChunk(TOutputChannelInfo& outputChannel, ui64 bytes) {
+ ui32 SendChannelDataChunk(TOutputChannelInfo& outputChannel) {
auto channel = outputChannel.Channel;
NDqProto::TData data;
NDqProto::TWatermark watermark;
NDqProto::TCheckpoint checkpoint;
- bool hasData = channel->Pop(data, bytes);
+ bool hasData = channel->Pop(data);
bool hasWatermark = channel->Pop(watermark);
bool hasCheckpoint = channel->Pop(checkpoint);
if (!hasData && !hasWatermark && !hasCheckpoint) {
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index ce7fb226449..41f26a78580 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -330,7 +330,7 @@ private:
TMaybe<NDqProto::TCheckpoint> checkpoint = Nothing();
for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) {
NDqProto::TData data;
- hasData = channel->Pop(data, remain);
+ hasData = channel->Pop(data);
NDqProto::TWatermark poppedWatermark;
bool hasWatermark = channel->Pop(poppedWatermark);
diff --git a/ydb/library/yql/dq/runtime/dq_channel_storage.h b/ydb/library/yql/dq/runtime/dq_channel_storage.h
index d42238f6835..f132d39dd00 100644
--- a/ydb/library/yql/dq/runtime/dq_channel_storage.h
+++ b/ydb/library/yql/dq/runtime/dq_channel_storage.h
@@ -21,8 +21,14 @@ public:
virtual bool IsEmpty() const = 0;
virtual bool IsFull() const = 0;
- // these methods can throw `TDqChannelStorageException`
+ // methods Put/Get can throw `TDqChannelStorageException`
+
+ // TODO: support IZeroCopyInput
virtual void Put(ui64 blobId, TBuffer&& blob) = 0;
+
+ // TODO: there is no way for client to delete blob.
+ // It is better to replace Get() with Pull() which will delete blob after read
+ // (current clients read each blob exactly once)
virtual bool Get(ui64 blobId, TBuffer& data) = 0;
};
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index c947fb28552..0a8cf3a62af 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -2,6 +2,7 @@
#include "dq_transport.h"
#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
#include <util/generic/buffer.h>
#include <util/generic/size_literals.h>
@@ -20,339 +21,75 @@ namespace NYql::NDq {
namespace {
-using namespace NKikimr;
-
-bool IsSafeToEstimateValueSize(const NMiniKQL::TType* type) {
- if (type->GetKind() == NMiniKQL::TType::EKind::Data) {
- return true;
- }
- if (type->GetKind() == NMiniKQL::TType::EKind::Optional) {
- const auto* optionalType = static_cast<const NMiniKQL::TOptionalType*>(type);
- return IsSafeToEstimateValueSize(optionalType->GetItemType());
- }
- if (type->GetKind() == NMiniKQL::TType::EKind::Struct) {
- const auto* structType = static_cast<const NMiniKQL::TStructType*>(type);
- for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- if (!IsSafeToEstimateValueSize(structType->GetMemberType(i))) {
- return false;
- }
- }
- return true;
- }
- if (type->GetKind() == NMiniKQL::TType::EKind::Tuple) {
- const auto* tupleType = static_cast<const NMiniKQL::TTupleType*>(type);
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- if (!IsSafeToEstimateValueSize(tupleType->GetElementType(i))) {
- return false;
- }
- }
- return true;
- }
- if (type->GetKind() == NMiniKQL::TType::EKind::Variant) {
- const auto* variantType = static_cast<const NMiniKQL::TVariantType*>(type);
- for (ui32 i = 0; i < variantType->GetAlternativesCount(); ++i) {
- if (!IsSafeToEstimateValueSize(variantType->GetAlternativeType(i))) {
- return false;
- }
- }
- return true;
- }
- return false;
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-class TDqOutputChannelOld : public IDqOutputChannel {
+class TProfileGuard {
public:
- TDqOutputChannelOld(ui64 channelId, NMiniKQL::TType* outputType, bool collectProfileStats,
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, ui64 maxStoredBytes,
- ui64 maxChunkBytes, ui64 chunkSizeLimit, NDqProto::EDataTransportVersion transportVersion, const TLogFunc& logFunc)
- : ChannelId(channelId)
- , OutputType(outputType)
- , BasicStats(ChannelId)
- , ProfileStats(collectProfileStats ? &BasicStats : nullptr)
- , DataSerializer(typeEnv, holderFactory, transportVersion)
- , MaxStoredBytes(maxStoredBytes)
- , MaxChunkBytes(maxChunkBytes)
- , ChunkSizeLimit(chunkSizeLimit)
- , LogFunc(logFunc) {}
-
- ui64 GetChannelId() const override {
- return ChannelId;
- }
-
- ui64 GetValuesCount(bool /* inMemoryOnly */) const override {
- return Data.size();
- }
-
- [[nodiscard]]
- bool IsFull() const override {
- return Data.size() * EstimatedRowBytes >= MaxStoredBytes;
- }
-
- void Push(NUdf::TUnboxedValue&& value) override {
- if (!BasicStats.FirstRowIn) {
- BasicStats.FirstRowIn = TInstant::Now();
- }
-
- ui64 rowsInMemory = Data.size();
-
- LOG("Push request, rows in memory: " << rowsInMemory << ", bytesInMemory: " << (rowsInMemory * EstimatedRowBytes)
- << ", finished: " << Finished);
- YQL_ENSURE(!IsFull());
-
- if (Finished) {
- return;
- }
-
- if (!EstimatedRowBytes) {
- EstimatedRowBytes = IsSafeToEstimateValueSize(OutputType)
- ? TDqDataSerializer::EstimateSize(value, OutputType)
- : DataSerializer.CalcSerializedSize(value, OutputType);
- LOG("EstimatedRowSize: " << EstimatedRowBytes << ", type: " << *OutputType
- << ", safe: " << IsSafeToEstimateValueSize(OutputType));
- }
-
- Data.emplace_back(std::move(value));
- rowsInMemory++;
-
- BasicStats.Bytes += EstimatedRowBytes;
- BasicStats.RowsIn++;
-
- if (Y_UNLIKELY(ProfileStats)) {
- ProfileStats->MaxMemoryUsage = std::max(ProfileStats->MaxMemoryUsage, rowsInMemory * EstimatedRowBytes);
- ProfileStats->MaxRowsInMemory = std::max(ProfileStats->MaxRowsInMemory, rowsInMemory);
- }
- }
-
- void Push(NDqProto::TWatermark&& watermark) override {
- YQL_ENSURE(!Watermark);
- Watermark.ConstructInPlace(std::move(watermark));
- }
-
- void Push(NDqProto::TCheckpoint&& checkpoint) override {
- YQL_ENSURE(!Checkpoint);
- Checkpoint.ConstructInPlace(std::move(checkpoint));
- }
-
- [[nodiscard]]
- bool Pop(NDqProto::TData& data, ui64 bytes) override {
- LOG("Pop request, rows in memory: " << Data.size() << ", finished: " << Finished << ", requested: " << bytes
- << ", estimatedRowSize: " << EstimatedRowBytes);
-
- if (!HasData()) {
- if (Finished) {
- data.SetTransportVersion(DataSerializer.GetTransportVersion());
- data.SetRows(0);
- data.ClearRaw();
- }
- return false;
- }
-
- bytes = std::min(bytes, MaxChunkBytes);
- auto last = Data.begin();
- if (Y_UNLIKELY(ProfileStats)) {
- TInstant startTime = TInstant::Now();
- data = DataSerializer.Serialize(last, Data.end(), OutputType, bytes);
- ProfileStats->SerializationTime += (TInstant::Now() - startTime);
- } else {
- data = DataSerializer.Serialize(last, Data.end(), OutputType, bytes);
- }
-
-
- ui64 takeRows = data.GetRows();
- DLOG("Took " << takeRows << " rows");
-
- if (EstimatedRowBytes) {
- EstimatedRowBytes = EstimatedRowBytes * 0.6 + std::max<ui64>(data.GetRaw().Size() / takeRows, 1) * 0.4;
- DLOG("Recalc estimated row size: " << EstimatedRowBytes);
- }
-
- YQL_ENSURE(data.GetRaw().size() < ChunkSizeLimit);
-
- BasicStats.Chunks++;
- BasicStats.RowsOut += takeRows;
-
- Data.erase(Data.begin(), last);
- return true;
- }
-
- [[nodiscard]]
- bool Pop(NDqProto::TWatermark& watermark) override {
- if (!HasData() && Watermark) {
- watermark = std::move(*Watermark);
- Watermark = Nothing();
- return true;
- }
- return false;
- }
-
- [[nodiscard]]
- bool Pop(NDqProto::TCheckpoint& checkpoint) override {
- if (!HasData() && Checkpoint) {
- checkpoint = std::move(*Checkpoint);
- Checkpoint = Nothing();
- return true;
- }
- return false;
- }
-
- [[nodiscard]]
- bool PopAll(NDqProto::TData& data) override {
- if (Data.empty()) {
- if (Finished) {
- data.SetTransportVersion(DataSerializer.GetTransportVersion());
- data.SetRows(0);
- data.ClearRaw();
- }
- return false;
- }
-
- if (Y_UNLIKELY(ProfileStats)) {
- TInstant startTime = TInstant::Now();
- data = DataSerializer.Serialize(Data.begin(), Data.end(), OutputType);
- ProfileStats->SerializationTime += (TInstant::Now() - startTime);
- } else {
- data = DataSerializer.Serialize(Data.begin(), Data.end(), OutputType);
- }
-
- BasicStats.Chunks++;
- BasicStats.RowsOut += data.GetRows();
-
- Data.clear();
- return true;
- }
-
- bool PopAll(NKikimr::NMiniKQL::TUnboxedValueVector& data) override {
- if (Data.empty()) {
- return false;
- }
-
- data.reserve(data.size() + Data.size());
- for (auto&& v : Data) {
- data.emplace_back(std::move(v));
+ TProfileGuard(TDuration* duration)
+ : Duration(duration)
+ {
+ if (Y_UNLIKELY(duration)) {
+ Start = TInstant::Now();
}
-
- BasicStats.Chunks++;
- BasicStats.RowsOut += data.size();
-
- Data.clear();
- return true;
}
- void Finish() override {
- LOG("Finish request");
- Finished = true;
-
- if (!BasicStats.FirstRowIn) {
- BasicStats.FirstRowIn = TInstant::Now();
+ ~TProfileGuard() {
+ if (Y_UNLIKELY(Duration)) {
+ *Duration += TInstant::Now() - Start;
}
}
-
- bool HasData() const override {
- return !Data.empty();
- }
-
- bool IsFinished() const override {
- return Finished && !HasData();
- }
-
- ui64 Drop() override { // Drop channel data because channel was finished. Leave checkpoint because checkpoints keep going through channel after finishing channel data transfer.
- ui64 rows = Data.size();
- Data.clear();
- TDataType().swap(Data);
- return rows;
- }
-
- NKikimr::NMiniKQL::TType* GetOutputType() const override {
- return OutputType;
- }
-
- const TDqOutputChannelStats* GetStats() const override {
- return &BasicStats;
- }
-
- void Terminate() override {
- }
-
private:
- const ui64 ChannelId;
- NKikimr::NMiniKQL::TType* OutputType;
- TDqOutputChannelStats BasicStats;
- TDqOutputChannelStats* ProfileStats = nullptr;
- TDqDataSerializer DataSerializer;
- const ui64 MaxStoredBytes;
- ui64 MaxChunkBytes;
- ui64 ChunkSizeLimit;
- TLogFunc LogFunc;
-
- using TDataType = TDeque<NUdf::TUnboxedValue, NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>>;
- TDataType Data;
-
- ui32 EstimatedRowBytes = 0;
- bool Finished = false;
-
- TMaybe<NDqProto::TWatermark> Watermark;
- TMaybe<NDqProto::TCheckpoint> Checkpoint;
+ TInstant Start;
+ TDuration* Duration;
};
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+using namespace NKikimr;
-class TDqOutputChannelNew : public IDqOutputChannel {
- struct TSpilledBlob;
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+template<bool FastPack>
+class TDqOutputChannel : public IDqOutputChannel {
public:
- TDqOutputChannelNew(ui64 channelId, NMiniKQL::TType* outputType, const NMiniKQL::TTypeEnvironment& typeEnv,
- const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& log)
+ TDqOutputChannel(ui64 channelId, NMiniKQL::TType* outputType, const NMiniKQL::TTypeEnvironment& typeEnv,
+ const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc)
: ChannelId(channelId)
, OutputType(outputType)
, BasicStats(ChannelId)
, ProfileStats(settings.CollectProfileStats ? &BasicStats : nullptr)
- , DataSerializer(typeEnv, holderFactory, settings.TransportVersion)
+ , Packer(false, NMiniKQL::TListType::Create(OutputType, typeEnv))
, Storage(settings.ChannelStorage)
+ , HolderFactory(holderFactory)
+ , TransportVersion(settings.TransportVersion)
, MaxStoredBytes(settings.MaxStoredBytes)
, MaxChunkBytes(settings.MaxChunkBytes)
, ChunkSizeLimit(settings.ChunkSizeLimit)
- , LogFunc(log)
+ , LogFunc(logFunc)
{
- if (Storage && 3 * MaxChunkBytes > MaxStoredBytes) {
- MaxChunkBytes = Max(MaxStoredBytes / 3, 1_MB);
- }
}
ui64 GetChannelId() const override {
return ChannelId;
}
- ui64 GetValuesCount(bool inMemoryOnly) const override {
- if (inMemoryOnly) {
- return DataHead.size() + DataTail.size();
- }
- return DataHead.size() + DataTail.size() + SpilledRows;
+ ui64 GetValuesCount() const override {
+ return SpilledRowCount + PackedRowCount + ChunkRowCount;
}
[[nodiscard]]
bool IsFull() const override {
if (!Storage) {
- if (MemoryUsed >= MaxStoredBytes) {
- YQL_ENSURE(HasData());
- return true;
- }
- return false;
+ return PackedDataSize + Packer.PackedSizeEstimate() >= MaxStoredBytes;
}
-
return Storage->IsFull();
}
void Push(NUdf::TUnboxedValue&& value) override {
+ TProfileGuard guard(ProfileStats ? &ProfileStats->SerializationTime : nullptr);
if (!BasicStats.FirstRowIn) {
BasicStats.FirstRowIn = TInstant::Now();
}
- ui64 rowsInMemory = DataHead.size() + DataTail.size();
+ ui64 rowsInMemory = PackedRowCount + ChunkRowCount;
- LOG("Push request, rows in memory: " << rowsInMemory << ", bytesInMemory: " << MemoryUsed
- << ", spilled rows: " << SpilledRows << ", spilled blobs: " << SpilledBlobs.size()
+ LOG("Push request, rows in memory: " << rowsInMemory << ", bytesInMemory: " << (PackedDataSize + Packer.PackedSizeEstimate())
<< ", finished: " << Finished);
YQL_ENSURE(!IsFull());
@@ -360,173 +97,56 @@ public:
return;
}
- ui64 estimatedRowSize;
+ Packer.AddItem(value);
+ value = {};
+ ChunkRowCount++;
+ BasicStats.RowsIn++;
- if (RowFixedSize.has_value()) {
- estimatedRowSize = *RowFixedSize;
- } else {
- bool fixed;
- estimatedRowSize = TDqDataSerializer::EstimateSize(value, OutputType, &fixed);
- if (fixed) {
- RowFixedSize.emplace(estimatedRowSize);
- LOG("Raw has fixed size: " << estimatedRowSize);
- }
+ size_t packerSize = Packer.PackedSizeEstimate();
+ if (packerSize >= MaxChunkBytes) {
+ Data.emplace_back();
+ Data.back().Buffer = Packer.FinishAndPull();
+ BasicStats.Bytes += Data.back().Buffer.Size();
+ PackedDataSize += Data.back().Buffer.Size();
+ PackedRowCount += ChunkRowCount;
+ Data.back().RowCount = ChunkRowCount;
+ ChunkRowCount = 0;
+ packerSize = 0;
}
- DLOG("Row size: " << estimatedRowSize);
+ while (Storage && PackedDataSize && PackedDataSize + packerSize > MaxStoredBytes) {
+ auto& head = Data.front();
+ size_t bufSize = head.Buffer.Size();
+ YQL_ENSURE(PackedDataSize >= bufSize);
- if (SpilledRows == 0) {
- YQL_ENSURE(DataTail.empty() && SizeTail.empty());
- DataHead.emplace_back(std::move(value));
- SizeHead.emplace_back(estimatedRowSize);
- } else {
- DataTail.emplace_back(std::move(value));
- SizeTail.emplace_back(estimatedRowSize);
- }
- rowsInMemory++;
- MemoryUsed += estimatedRowSize;
-
- ui64 spilledRows = 0;
- ui64 spilledBytes = 0;
- ui64 spilledBlobs = 0;
-
- // TODO: add PushAndFinish call
- // if processing is finished we don't have to spill data
-
- if (MemoryUsed > MaxStoredBytes && Storage) {
- LOG("Not enough memory to store rows in memory only, start spilling, rowsInMemory: " << rowsInMemory
- << ", memoryUsed: " << MemoryUsed << ", MaxStoredBytes: " << MaxStoredBytes);
-
- TDataType* data = nullptr;
- TDeque<ui64>* size = nullptr;
-
- ui32 startIndex = std::numeric_limits<ui32>::max();
-
- if (SpilledRows) {
- // we have spilled rows already, so we can spill all DataTail
- data = &DataTail;
- size = &SizeTail;
- startIndex = 0;
- } else {
- // spill from head
- // but only if we have rows for at least 2 full chunks
- ui32 chunks = 1;
- ui64 chunkSize = 0;
- startIndex = 0;
- for (ui64 i = 0; i < SizeHead.size(); ++i) {
- chunkSize += SizeHead.at(i);
- if (chunkSize >= MaxChunkBytes && (i - startIndex) > 0) {
- // LOG("one more chunk, size: " << chunkSize << ", start next chunk from " << i);
- chunks++; // this row goes to the next chunk
- chunkSize = 0;
-
- if (chunks >= 3) {
- data = &DataHead;
- size = &SizeHead;
- break;
- }
-
- startIndex = i;
- }
- }
- }
+ TBuffer blob;
+ blob.Reserve(bufSize + sizeof(head.RowCount));
+ blob.Append((const char*)&head.RowCount, sizeof(head.RowCount));
+ head.Buffer.ForEachPage([&blob](const char *data, size_t len) {
+ blob.Append(data, len);
+ });
- // LOG("about to spill from index " << startIndex);
-
- if (data) {
- while (true) {
- ui64 chunkSize = 0;
- ui32 rowsInChunk = 0;
- ui32 idx = startIndex + spilledRows;
-
- while (idx < size->size()) {
- ui64 rowSize = size->at(idx);
- if (chunkSize == 0 || chunkSize + rowSize <= MaxChunkBytes) {
- chunkSize += rowSize;
- ++rowsInChunk;
- ++idx;
- } else {
- break;
- }
- }
-
- if (rowsInChunk > 0) {
- // LOG("about to spill from " << (startIndex + spilledRows) << " to " << (startIndex + spilledRows + rowsInChunk));
-
- TDataType::iterator first = std::next(data->begin(), startIndex + spilledRows);
- TDataType::iterator last = std::next(data->begin(), startIndex + spilledRows + rowsInChunk);
-
- NDqProto::TData protoBlob = DataSerializer.Serialize(first, last, OutputType);
- TBuffer blob;
- TBufferOutput out{blob};
- protoBlob.SerializeToArcadiaStream(&out);
-
- ui64 blobSize = blob.size();
- ui64 blobId = NextBlobId++;
-
- LOG("Spill blobId: " << blobId << ", size: " << blobSize << ", rows: " << rowsInChunk);
- Storage->Put(blobId, std::move(blob));
-
- SpilledBlobs.emplace_back(TSpilledBlob(blobId, chunkSize, blobSize, rowsInChunk));
- SpilledRows += rowsInChunk;
- MemoryUsed -= chunkSize;
-
- spilledRows += rowsInChunk;
- spilledBytes += blobSize;
- spilledBlobs++;
- } else {
- break;
- }
- }
-
- if (data == &DataHead) {
- // move rows after spilled ones to the DataTail
- YQL_ENSURE(DataTail.empty());
- for (ui32 i = startIndex + spilledRows; i < data->size(); ++i) {
- DataTail.emplace_back(std::move(DataHead[i]));
- SizeTail.emplace_back(SizeHead[i]);
- }
- data->erase(std::next(data->begin(), startIndex), data->end());
- size->erase(std::next(size->begin(), startIndex), size->end());
- } else {
- YQL_ENSURE(startIndex == 0);
- data->erase(data->begin(), std::next(data->begin(), spilledRows));
- size->erase(size->begin(), std::next(size->begin(), spilledRows));
- }
- } else {
- // LOG("nothing to spill");
- }
- }
+ YQL_ENSURE(blob.Size() == bufSize + sizeof(head.RowCount));
+ Storage->Put(NextStoredId++, std::move(blob));
- BasicStats.Bytes += estimatedRowSize;
- BasicStats.RowsIn++;
+ PackedDataSize -= bufSize;
+ PackedRowCount -= head.RowCount;
- if (Y_UNLIKELY(ProfileStats)) {
- ProfileStats->MaxMemoryUsage = std::max(ProfileStats->MaxMemoryUsage, MemoryUsed);
- ProfileStats->MaxRowsInMemory = std::max(ProfileStats->MaxRowsInMemory, DataHead.size() + DataTail.size());
- if (spilledRows) {
- ProfileStats->SpilledRows += spilledRows;
- ProfileStats->SpilledBytes += spilledBytes;
- ProfileStats->SpilledBlobs += spilledBlobs;
- }
- }
+ SpilledRowCount += head.RowCount;
- ValidateUsedMemory();
+ if (Y_UNLIKELY(ProfileStats)) {
+ ProfileStats->SpilledRows += head.RowCount;
+ ProfileStats->SpilledBytes += bufSize + sizeof(head.RowCount);
+ ProfileStats->SpilledBlobs++;
+ }
-#if 0
- TStringStream ss;
- ss << "-- DUMP --" << Endl;
- ss << " Head:" << Endl;
- for (auto& v : DataHead) {
- ss << " "; v.Dump(ss); ss << Endl;
+ Data.pop_front();
}
- ss << " Spilled: " << SpilledRows << Endl;
- ss << " Tail:" << Endl;
- for (auto& v : DataTail) {
- ss << " "; v.Dump(ss); ss << Endl;
+
+ if (Y_UNLIKELY(ProfileStats)) {
+ ProfileStats->MaxMemoryUsage = std::max(ProfileStats->MaxMemoryUsage, PackedDataSize + packerSize);
+ ProfileStats->MaxRowsInMemory = std::max(ProfileStats->MaxRowsInMemory, PackedRowCount);
}
- LOG(ss.Str());
-#endif
}
void Push(NDqProto::TWatermark&& watermark) override {
@@ -540,120 +160,50 @@ public:
}
[[nodiscard]]
- bool Pop(NDqProto::TData& data, ui64 bytes) override {
- LOG("Pop request, rows in memory: " << DataHead.size() << "/" << DataTail.size()
- << ", spilled rows: " << SpilledRows << ", spilled blobs: " << SpilledBlobs.size()
- << ", finished: " << Finished << ", requested: " << bytes << ", maxChunkBytes: " << MaxChunkBytes);
+ bool Pop(NDqProto::TData& data) override {
+ LOG("Pop request, rows in memory: " << GetValuesCount() << ", finished: " << Finished);
if (!HasData()) {
if (Finished) {
- data.SetTransportVersion(DataSerializer.GetTransportVersion());
+ data.SetTransportVersion(TransportVersion);
data.SetRows(0);
data.ClearRaw();
}
return false;
}
- bytes = std::min(bytes, MaxChunkBytes);
-
- if (DataHead.empty()) {
- YQL_ENSURE(SpilledRows);
-
- TSpilledBlob spilledBlob = SpilledBlobs.front();
-
- LOG("Loading spilled blob. BlobId: " << spilledBlob.BlobId << ", rows: " << spilledBlob.Rows
- << ", bytes: " << spilledBlob.SerializedSize);
+ data.SetTransportVersion(TransportVersion);
+ data.ClearRaw();
+ if (FirstStoredId < NextStoredId) {
+ YQL_ENSURE(Storage);
TBuffer blob;
- if (!Storage->Get(spilledBlob.BlobId, blob)) {
- LOG("BlobId " << spilledBlob.BlobId << " not ready yet");
- // not loaded yet
- return false;
- }
-
- YQL_ENSURE(blob.size() == spilledBlob.SerializedSize, "" << blob.size() << " != " << spilledBlob.SerializedSize);
-
- NDqProto::TData protoBlob;
- YQL_ENSURE(protoBlob.ParseFromArray(blob.Data(), blob.size()));
- blob.Reset();
-
- YQL_ENSURE(protoBlob.GetRows() == spilledBlob.Rows);
- bool hasResult = false;
-
- // LOG("bytes: " << bytes << ", loaded blob: " << spilledBlob.SerializedSize);
-
- if (spilledBlob.SerializedSize <= bytes * 2) {
- data.Swap(&protoBlob);
- YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeLimit);
- hasResult = true;
- // LOG("return loaded blob as-is");
- } else {
- NKikimr::NMiniKQL::TUnboxedValueVector buffer;
- DataSerializer.Deserialize(protoBlob, OutputType, buffer);
-
- for (ui32 i = 0; i < buffer.size(); ++i) {
- SizeHead.emplace_back(RowFixedSize
- ? *RowFixedSize
- : TDqDataSerializer::EstimateSize(buffer[i], OutputType));
- DataHead.emplace_back(std::move(buffer[i]));
-
- MemoryUsed += SizeHead.back();
- }
- }
-
- SpilledRows -= spilledBlob.Rows;
-
- SpilledBlobs.pop_front();
- if (SpilledBlobs.empty()) {
- // LOG("no more spilled blobs, move " << DataTail.size() << " tail rows to the head");
- DataHead.insert(DataHead.end(), std::make_move_iterator(DataTail.begin()), std::make_move_iterator(DataTail.end()));
- SizeHead.insert(SizeHead.end(), std::make_move_iterator(SizeTail.begin()), std::make_move_iterator(SizeTail.end()));
- DataTail.clear();
- SizeTail.clear();
- }
-
- if (hasResult) {
- BasicStats.Chunks++;
- BasicStats.RowsOut += data.GetRows();
- ValidateUsedMemory();
- return true;
- }
- }
-
- ui32 takeRows = 0;
- ui64 chunkSize = 0;
-
- while (takeRows == 0 || (takeRows < SizeHead.size() && chunkSize + SizeHead[takeRows] <= bytes)) {
- chunkSize += SizeHead[takeRows];
- ++takeRows;
- }
-
- DLOG("return rows: " << takeRows << ", size: " << chunkSize);
-
- auto firstDataIt = DataHead.begin();
- auto lastDataIt = std::next(firstDataIt, takeRows);
-
- auto firstSizeIt = SizeHead.begin();
- auto lastSizeIt = std::next(firstSizeIt, takeRows);
-
- if (Y_UNLIKELY(ProfileStats)) {
- TInstant startTime = TInstant::Now();
- data = DataSerializer.Serialize(firstDataIt, lastDataIt, OutputType);
- ProfileStats->SerializationTime += (TInstant::Now() - startTime);
+ YQL_ENSURE(Storage->Get(FirstStoredId++, blob), "Lost block in storage");
+ ui64 rows;
+ YQL_ENSURE(blob.size() >= sizeof(rows));
+ std::memcpy((char*)&rows, blob.data(), sizeof(rows));
+ data.SetRows(rows);
+ data.MutableRaw()->insert(data.MutableRaw()->end(), blob.data() + sizeof(rows), blob.data() + blob.size());
+ SpilledRowCount -= rows;
+ } else if (!Data.empty()) {
+ auto& packed = Data.front();
+ data.SetRows(packed.RowCount);
+ data.MutableRaw()->reserve(packed.Buffer.Size());
+ packed.Buffer.CopyTo(*data.MutableRaw());
+ PackedRowCount -= packed.RowCount;
+ PackedDataSize -= packed.Buffer.Size();
+ Data.pop_front();
} else {
- data = DataSerializer.Serialize(firstDataIt, lastDataIt, OutputType);
+ data.SetRows(ChunkRowCount);
+ auto buffer = Packer.FinishAndPull();
+ data.MutableRaw()->reserve(buffer.Size());
+ buffer.CopyTo(*data.MutableRaw());
+ ChunkRowCount = 0;
}
- YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeLimit);
+ DLOG("Took " << data.GetRows() << " rows");
BasicStats.Chunks++;
- BasicStats.RowsOut += takeRows;
-
- DataHead.erase(firstDataIt, lastDataIt);
- SizeHead.erase(firstSizeIt, lastSizeIt);
- MemoryUsed -= chunkSize;
-
- ValidateUsedMemory();
-
+ BasicStats.RowsOut += data.GetRows();
return true;
}
@@ -679,61 +229,61 @@ public:
[[nodiscard]]
bool PopAll(NDqProto::TData& data) override {
- YQL_ENSURE(!SpilledRows);
- YQL_ENSURE(DataTail.empty());
-
- if (DataHead.empty()) {
+ if (!HasData()) {
if (Finished) {
- data.SetTransportVersion(DataSerializer.GetTransportVersion());
+ data.SetTransportVersion(TransportVersion);
data.SetRows(0);
data.ClearRaw();
}
return false;
}
- if (Y_UNLIKELY(ProfileStats)) {
- TInstant startTime = TInstant::Now();
- data = DataSerializer.Serialize(DataHead.begin(), DataHead.end(), OutputType);
- ProfileStats->SerializationTime += (TInstant::Now() - startTime);
- } else {
- data = DataSerializer.Serialize(DataHead.begin(), DataHead.end(), OutputType);
+ data.SetTransportVersion(TransportVersion);
+ data.ClearRaw();
+ if (SpilledRowCount == 0 && PackedRowCount == 0) {
+ data.SetRows(ChunkRowCount);
+ auto buffer = Packer.FinishAndPull();
+ data.MutableRaw()->reserve(buffer.Size());
+ buffer.CopyTo(*data.MutableRaw());
+ ChunkRowCount = 0;
+ return true;
}
- BasicStats.Chunks++;
- BasicStats.RowsOut += data.GetRows();
-
- DataHead.clear();
- SizeHead.clear();
- MemoryUsed = 0;
-
- return true;
- }
-
- bool PopAll(NKikimr::NMiniKQL::TUnboxedValueVector& data) override {
- YQL_ENSURE(!SpilledRows);
- YQL_ENSURE(DataTail.empty());
-
- if (DataHead.empty()) {
- return false;
+ // Repack all - thats why PopAll should never be used
+ if (ChunkRowCount) {
+ Data.emplace_back();
+ Data.back().Buffer = Packer.FinishAndPull();
+ BasicStats.Bytes += Data.back().Buffer.Size();
+ PackedDataSize += Data.back().Buffer.Size();
+ PackedRowCount += ChunkRowCount;
+ Data.back().RowCount = ChunkRowCount;
+ ChunkRowCount = 0;
}
- data.reserve(data.size() + DataHead.size());
- for (auto&& v : DataHead) {
- data.emplace_back(std::move(v));
+ NKikimr::NMiniKQL::TUnboxedValueVector rows;
+ for (;;) {
+ NDqProto::TData chunk;
+ if (!this->Pop(chunk)) {
+ break;
+ }
+ TStringBuf buf(chunk.GetRaw());
+ Packer.UnpackBatch(buf, HolderFactory, rows);
}
- BasicStats.Chunks++;
- BasicStats.RowsOut += data.size();
-
- DataHead.clear();
- SizeHead.clear();
- MemoryUsed = 0;
+ for (auto& row : rows) {
+ Packer.AddItem(row);
+ }
+ data.SetRows(rows.size());
+ auto buffer = Packer.FinishAndPull();
+ data.MutableRaw()->reserve(buffer.Size());
+ buffer.CopyTo(*data.MutableRaw());
+ YQL_ENSURE(!HasData());
return true;
}
void Finish() override {
- DLOG("Finish request");
+ LOG("Finish request");
Finished = true;
if (!BasicStats.FirstRowIn) {
@@ -742,7 +292,7 @@ public:
}
bool HasData() const override {
- return !DataHead.empty() || SpilledRows != 0 || !DataTail.empty();
+ return GetValuesCount() != 0;
}
bool IsFinished() const override {
@@ -750,17 +300,11 @@ public:
}
ui64 Drop() override { // Drop channel data because channel was finished. Leave checkpoint because checkpoints keep going through channel after finishing channel data transfer.
- ui64 rows = DataHead.size() + SpilledRows + DataTail.size();
- DataHead.clear();
- SizeHead.clear();
- DataTail.clear();
- SizeTail.clear();
- MemoryUsed = 0;
- TDataType().swap(DataHead);
- TDataType().swap(DataTail);
- // todo: send remove request
- SpilledRows = 0;
- SpilledBlobs.clear();
+ ui64 rows = GetValuesCount();
+ Data.clear();
+ Packer.Clear();
+ SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
+ FirstStoredId = NextStoredId;
return rows;
}
@@ -773,18 +317,6 @@ public:
}
void Terminate() override {
- if (Storage) {
- Storage.Reset();
- }
- }
-
- void ValidateUsedMemory() {
-#ifndef NDEBUG
- ui64 x = 0;
- for (auto z : SizeHead) x += z;
- for (auto z : SizeTail) x += z;
- YQL_ENSURE(x == MemoryUsed, "" << x << " != " << MemoryUsed);
-#endif
}
private:
@@ -792,36 +324,29 @@ private:
NKikimr::NMiniKQL::TType* OutputType;
TDqOutputChannelStats BasicStats;
TDqOutputChannelStats* ProfileStats = nullptr;
- TDqDataSerializer DataSerializer;
- IDqChannelStorage::TPtr Storage;
+ NKikimr::NMiniKQL::TValuePackerTransport<FastPack> Packer;
+ const IDqChannelStorage::TPtr Storage;
+ const NMiniKQL::THolderFactory& HolderFactory;
+ const NDqProto::EDataTransportVersion TransportVersion;
const ui64 MaxStoredBytes;
- ui64 MaxChunkBytes;
- ui64 ChunkSizeLimit;
+ const ui64 MaxChunkBytes;
+ const ui64 ChunkSizeLimit;
TLogFunc LogFunc;
- std::optional<ui32> RowFixedSize;
-
- struct TSpilledBlob {
- ui64 BlobId;
- ui64 InMemorySize;
- ui64 SerializedSize;
- ui32 Rows;
- TSpilledBlob(ui64 blobId, ui64 inMemorySize, ui64 serializedSize, ui32 rows)
- : BlobId(blobId), InMemorySize(inMemorySize), SerializedSize(serializedSize), Rows(rows) {}
+ struct TSerializedBatch {
+ NKikimr::NMiniKQL::TPagedBuffer Buffer;
+ ui64 RowCount = 0;
};
+ std::deque<TSerializedBatch> Data;
- // DataHead ( . SpilledBlobs (. DataTail)? )?
- using TDataType = TDeque<NUdf::TUnboxedValue, NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>>;
- TDataType DataHead;
- TDeque<ui64> SizeHead;
- TDeque<TSpilledBlob> SpilledBlobs;
- TDataType DataTail;
- TDeque<ui64> SizeTail;
+ size_t SpilledRowCount = 0;
+ ui64 FirstStoredId = 0;
+ ui64 NextStoredId = 0;
- ui64 MemoryUsed = 0; // approx memory usage
-
- ui64 NextBlobId = 1;
- ui64 SpilledRows = 0;
+ size_t PackedDataSize = 0;
+ size_t PackedRowCount = 0;
+
+ size_t ChunkRowCount = 0;
bool Finished = false;
@@ -836,12 +361,14 @@ IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL::
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TDqOutputChannelSettings& settings, const TLogFunc& logFunc)
{
- if (settings.AllowGeneratorsInUnboxedValues) {
- YQL_ENSURE(!settings.ChannelStorage);
- return new TDqOutputChannelOld(channelId, outputType, settings.CollectProfileStats, typeEnv, holderFactory,
- settings.MaxStoredBytes, settings.MaxChunkBytes, settings.ChunkSizeLimit, settings.TransportVersion, logFunc);
+ if (settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0 ||
+ settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED)
+ {
+ return new TDqOutputChannel<false>(channelId, outputType, typeEnv, holderFactory, settings, logFunc);
} else {
- return new TDqOutputChannelNew(channelId, outputType, typeEnv, holderFactory, settings, logFunc);
+ YQL_ENSURE(settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0,
+ "Unsupported transport version " << (ui32)settings.TransportVersion);
+ return new TDqOutputChannel<true>(channelId, outputType, typeEnv, holderFactory, settings, logFunc);
}
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h
index 8c74f3b71b6..49b65c2eda7 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.h
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.h
@@ -46,12 +46,12 @@ public:
using TPtr = TIntrusivePtr<IDqOutputChannel>;
virtual ui64 GetChannelId() const = 0;
- virtual ui64 GetValuesCount(bool inMemoryOnly = true) const = 0;
+ virtual ui64 GetValuesCount() const = 0;
// <| consumer methods
// can throw TDqChannelStorageException
[[nodiscard]]
- virtual bool Pop(NDqProto::TData& data, ui64 bytes) = 0;
+ virtual bool Pop(NDqProto::TData& data) = 0;
// Pop watermark.
[[nodiscard]]
virtual bool Pop(NDqProto::TWatermark& watermark) = 0;
@@ -65,7 +65,6 @@ public:
// can throw TDqChannelStorageException
[[nodiscard]]
virtual bool PopAll(NDqProto::TData& data) = 0;
- virtual bool PopAll(NKikimr::NMiniKQL::TUnboxedValueVector& rows) = 0;
// |>
virtual ui64 Drop() = 0;
@@ -81,7 +80,6 @@ struct TDqOutputChannelSettings {
NDqProto::EDataTransportVersion TransportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
IDqChannelStorage::TPtr ChannelStorage;
bool CollectProfileStats = false;
- bool AllowGeneratorsInUnboxedValues = true;
};
IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL::TType* outputType,
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
index 609436f0e8b..fac3102ea74 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
@@ -84,13 +84,12 @@ struct TTestContext {
}
};
-void TestSingleRead(TTestContext& ctx, bool quantum) {
+void TestSingleRead(TTestContext& ctx) {
TDqOutputChannelSettings settings;
settings.MaxStoredBytes = 1000;
settings.MaxChunkBytes = 200;
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- settings.AllowGeneratorsInUnboxedValues = quantum;
auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
@@ -105,7 +104,7 @@ void TestSingleRead(TTestContext& ctx, bool quantum) {
UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut);
NDqProto::TData data;
- UNIT_ASSERT(ch->Pop(data, 1000));
+ UNIT_ASSERT(ch->Pop(data));
UNIT_ASSERT_VALUES_EQUAL(10, data.GetRows());
UNIT_ASSERT_VALUES_EQUAL(1, ch->GetStats()->Chunks);
@@ -122,16 +121,15 @@ void TestSingleRead(TTestContext& ctx, bool quantum) {
}
data.Clear();
- UNIT_ASSERT(!ch->Pop(data, 1000));
+ UNIT_ASSERT(!ch->Pop(data));
}
-void TestPartialRead(TTestContext& ctx, bool quantum) {
+void TestPartialRead(TTestContext& ctx) {
TDqOutputChannelSettings settings;
settings.MaxStoredBytes = 1000;
- settings.MaxChunkBytes = 100;
+ settings.MaxChunkBytes = 17;
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- settings.AllowGeneratorsInUnboxedValues = quantum;
auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
@@ -146,19 +144,15 @@ void TestPartialRead(TTestContext& ctx, bool quantum) {
UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut);
int req = 0;
- ui32 expected[] = {3, 3, 3};
- ui32 expectedQ[] = {3, 3, 3};
+ ui32 expected[] = {2, 2, 2, 2, 1};
ui32 readChunks = 0;
ui32 readRows = 0;
while (readRows < 9) {
NDqProto::TData data;
- size_t limit = quantum ?
- // packed size is 13 byte header + 2 bytes for each row
- 20 : 50;
- UNIT_ASSERT(ch->Pop(data, limit));
+ UNIT_ASSERT(ch->Pop(data));
- ui32 v = quantum ? expectedQ[req] : expected[req];
+ ui32 v = expected[req];
++req;
UNIT_ASSERT_VALUES_EQUAL(v, data.GetRows());
@@ -180,16 +174,15 @@ void TestPartialRead(TTestContext& ctx, bool quantum) {
}
NDqProto::TData data;
- UNIT_ASSERT(!ch->Pop(data, 1000));
+ UNIT_ASSERT(!ch->Pop(data));
}
-void TestOverflow(TTestContext& ctx, bool quantum) {
+void TestOverflow(TTestContext& ctx) {
TDqOutputChannelSettings settings;
- settings.MaxStoredBytes = 100;
+ settings.MaxStoredBytes = 30;
settings.MaxChunkBytes = 10;
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- settings.AllowGeneratorsInUnboxedValues = quantum;
auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
@@ -212,13 +205,12 @@ void TestOverflow(TTestContext& ctx, bool quantum) {
}
}
-void TestPopAll(TTestContext& ctx, bool quantum) {
+void TestPopAll(TTestContext& ctx) {
TDqOutputChannelSettings settings;
settings.MaxStoredBytes = 1000;
settings.MaxChunkBytes = 10;
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- settings.AllowGeneratorsInUnboxedValues = quantum;
auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
@@ -247,16 +239,15 @@ void TestPopAll(TTestContext& ctx, bool quantum) {
}
data.Clear();
- UNIT_ASSERT(!ch->Pop(data, 100'500));
+ UNIT_ASSERT(!ch->Pop(data));
}
-void TestBigRow(TTestContext& ctx, bool quantum) {
+void TestBigRow(TTestContext& ctx) {
TDqOutputChannelSettings settings;
settings.MaxStoredBytes = std::numeric_limits<ui32>::max();
settings.MaxChunkBytes = 2_MB;
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- settings.AllowGeneratorsInUnboxedValues = quantum;
auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
@@ -277,12 +268,31 @@ void TestBigRow(TTestContext& ctx, bool quantum) {
UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn);
UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut);
- for (ui32 i = 1; i < 10; ++i) {
+ {
NDqProto::TData data;
- UNIT_ASSERT(ch->Pop(data, 1_MB));
+ UNIT_ASSERT(ch->Pop(data));
+
+ UNIT_ASSERT_VALUES_EQUAL(2, data.GetRows());
+ UNIT_ASSERT_VALUES_EQUAL(1, ch->GetStats()->Chunks);
+ UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn);
+ UNIT_ASSERT_VALUES_EQUAL(2, ch->GetStats()->RowsOut);
+
+ TUnboxedValueVector buffer;
+ ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
+
+ UNIT_ASSERT_VALUES_EQUAL(2, buffer.size());
+ for (ui32 i = 1; i < 3; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(i, buffer[i - 1].GetElement(0).Get<i32>());
+ UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[i - 1].GetElement(1).Get<ui64>());
+ }
+ }
+
+ for (ui32 i = 3; i < 10; ++i) {
+ NDqProto::TData data;
+ UNIT_ASSERT(ch->Pop(data));
UNIT_ASSERT_VALUES_EQUAL(1, data.GetRows());
- UNIT_ASSERT_VALUES_EQUAL(i, ch->GetStats()->Chunks);
+ UNIT_ASSERT_VALUES_EQUAL(i - 1, ch->GetStats()->Chunks);
UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn);
UNIT_ASSERT_VALUES_EQUAL(i, ch->GetStats()->RowsOut);
@@ -295,17 +305,15 @@ void TestBigRow(TTestContext& ctx, bool quantum) {
}
NDqProto::TData data;
- UNIT_ASSERT(!ch->Pop(data, 10_MB));
+ UNIT_ASSERT(!ch->Pop(data));
}
-
void TestSpillWithMockStorage(TTestContext& ctx) {
TDqOutputChannelSettings settings;
settings.MaxStoredBytes = 100;
- settings.MaxChunkBytes = 10;
+ settings.MaxChunkBytes = 20;
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- settings.AllowGeneratorsInUnboxedValues = false;
auto storage = MakeIntrusive<TMockChannelStorage>(100'500ul);
settings.ChannelStorage = storage;
@@ -318,74 +326,35 @@ void TestSpillWithMockStorage(TTestContext& ctx) {
ch->Push(std::move(row));
}
- UNIT_ASSERT_VALUES_EQUAL(7, ch->GetValuesCount(/* inMemoryOnly */ true));
- UNIT_ASSERT_VALUES_EQUAL(35, ch->GetValuesCount(/* inMemoryOnly */ false));
+ UNIT_ASSERT_VALUES_EQUAL(35, ch->GetValuesCount());
UNIT_ASSERT_VALUES_EQUAL(35, ch->GetStats()->RowsIn);
UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut);
- UNIT_ASSERT_VALUES_EQUAL(35 - 7, ch->GetStats()->SpilledRows);
- UNIT_ASSERT_VALUES_EQUAL(35 - 7, ch->GetStats()->SpilledBlobs);
- UNIT_ASSERT(ch->GetStats()->SpilledBytes > 200);
+ UNIT_ASSERT_VALUES_EQUAL(18, ch->GetStats()->SpilledRows);
+ UNIT_ASSERT_VALUES_EQUAL(5, ch->GetStats()->SpilledBlobs);
+ UNIT_ASSERT(ch->GetStats()->SpilledBytes > 5 * 8);
ui32 loadedRows = 0;
- storage->SetBlankGetRequests(2);
-
- {
- Cerr << "-- pop rows before spilled ones\n";
- NDqProto::TData data;
- while (ch->Pop(data, 1000)) {
- TUnboxedValueVector buffer;
- ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
- UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size());
- for (ui32 i = 0; i < data.GetRows(); ++i) {
- auto j = loadedRows + i;
- UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>());
- UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>());
- }
+ NDqProto::TData data;
+ while (ch->Pop(data)) {
+ TUnboxedValueVector buffer;
+ ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
- loadedRows += data.GetRows();
+ UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size());
+ for (ui32 i = 0; i < data.GetRows(); ++i) {
+ auto j = loadedRows + i;
+ UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>());
+ UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>());
}
- }
-
- UNIT_ASSERT_VALUES_EQUAL(7 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ true));
- UNIT_ASSERT_VALUES_EQUAL(35 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ false));
-
- // just blank request
- {
- NDqProto::TData data;
- UNIT_ASSERT(!ch->Pop(data, 1000));
-
- UNIT_ASSERT_VALUES_EQUAL(7 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ true));
- UNIT_ASSERT_VALUES_EQUAL(35 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ false));
- }
-
- while (loadedRows < 35) {
- NDqProto::TData data;
- while (ch->Pop(data, 10)) {
- storage->SetBlankGetRequests(1);
-
- TUnboxedValueVector buffer;
- ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
- UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size());
- for (ui32 i = 0; i < data.GetRows(); ++i) {
- auto j = loadedRows + i;
- UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>());
- UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>());
- }
-
- loadedRows += data.GetRows();
-
- UNIT_ASSERT_VALUES_EQUAL(35 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ false));
- }
+ loadedRows += data.GetRows();
}
-
UNIT_ASSERT_VALUES_EQUAL(35, loadedRows);
+ UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount());
// in memory only
{
- storage->SetBlankGetRequests(0);
loadedRows = 0;
for (i32 i = 100; i < 105; ++i) {
@@ -394,11 +363,10 @@ void TestSpillWithMockStorage(TTestContext& ctx) {
ch->Push(std::move(row));
}
- UNIT_ASSERT_VALUES_EQUAL(5, ch->GetValuesCount(/* inMemoryOnly */ true));
- UNIT_ASSERT_VALUES_EQUAL(5, ch->GetValuesCount(/* inMemoryOnly */ false));
+ UNIT_ASSERT_VALUES_EQUAL(5, ch->GetValuesCount());
NDqProto::TData data;
- while (ch->Pop(data, 1000)) {
+ while (ch->Pop(data)) {
TUnboxedValueVector buffer;
ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
@@ -411,19 +379,17 @@ void TestSpillWithMockStorage(TTestContext& ctx) {
loadedRows += data.GetRows();
}
+ UNIT_ASSERT_VALUES_EQUAL(5, loadedRows);
+ UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount());
}
-
- UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount(/* inMemoryOnly */ true));
- UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount(/* inMemoryOnly */ false));
}
void TestOverflowWithMockStorage(TTestContext& ctx) {
TDqOutputChannelSettings settings;
- settings.MaxStoredBytes = 100;
+ settings.MaxStoredBytes = 500;
settings.MaxChunkBytes = 10;
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- settings.AllowGeneratorsInUnboxedValues = false;
auto storage = MakeIntrusive<TMockChannelStorage>(500ul);
settings.ChannelStorage = storage;
@@ -441,66 +407,40 @@ void TestOverflowWithMockStorage(TTestContext& ctx) {
// UNIT_ASSERT(ch->IsFull()); it can be false-negative with storage enabled
try {
- auto row = ctx.CreateRow(100'500);
- ch->Push(std::move(row));
+ ch->Push(ctx.CreateBigRow(0, 100'500));
UNIT_FAIL("");
- } catch (yexception& e) {
+ } catch (yexception &e) {
UNIT_ASSERT(TString(e.what()).Contains("Space limit exceeded"));
}
}
} // anonymous namespace
-Y_UNIT_TEST_SUITE(DqOutputChannelNoStorageTests) {
+Y_UNIT_TEST_SUITE(DqOutputChannelTests) {
Y_UNIT_TEST(SingleRead) {
TTestContext ctx;
- TestSingleRead(ctx, false);
-}
-
-Y_UNIT_TEST(SingleReadQ) {
- TTestContext ctx;
- TestSingleRead(ctx, true);
+ TestSingleRead(ctx);
}
Y_UNIT_TEST(PartialRead) {
TTestContext ctx;
- TestPartialRead(ctx, false);
-}
-
-Y_UNIT_TEST(PartialReadQ) {
- TTestContext ctx;
- TestPartialRead(ctx, true);
+ TestPartialRead(ctx);
}
Y_UNIT_TEST(Overflow) {
TTestContext ctx;
- TestOverflow(ctx, false);
-}
-
-Y_UNIT_TEST(OverflowQ) {
- TTestContext ctx;
- TestOverflow(ctx, true);
+ TestOverflow(ctx);
}
Y_UNIT_TEST(PopAll) {
TTestContext ctx;
- TestPopAll(ctx, false);
-}
-
-Y_UNIT_TEST(PopAllQ) {
- TTestContext ctx;
- TestPopAll(ctx, true);
+ TestPopAll(ctx);
}
Y_UNIT_TEST(BigRow) {
TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
- TestBigRow(ctx, false);
-}
-
-Y_UNIT_TEST(BigRowQ) {
- TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
- TestBigRow(ctx, true);
+ TestBigRow(ctx);
}
}
@@ -513,7 +453,7 @@ Y_UNIT_TEST(Spill) {
}
Y_UNIT_TEST(Overflow) {
- TTestContext ctx;
+ TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
TestOverflowWithMockStorage(ctx);
}
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index ac9f9040b2c..c31f1afde3c 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -642,7 +642,6 @@ public:
settings.ChunkSizeLimit = memoryLimits.ChunkSizeLimit;
settings.TransportVersion = outputChannelDesc.GetTransportVersion();
settings.CollectProfileStats = Settings.CollectProfileStats;
- settings.AllowGeneratorsInUnboxedValues = Settings.AllowGeneratorsInUnboxedValues;
if (!outputChannelDesc.GetInMemory()) {
settings.ChannelStorage = execCtx.CreateChannelStorage(channelId);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 713b5d3c60d..c400b0de798 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -259,7 +259,6 @@ struct TDqTaskRunnerSettings {
bool CollectBasicStats = false;
bool CollectProfileStats = false;
bool TerminateOnError = false;
- bool AllowGeneratorsInUnboxedValues = true;
bool UseCacheForLLVM = false;
TString OptLLVM = "";
THashMap<TString, TString> SecureParams;
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index 2872774437a..2e006b0ce4d 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -125,13 +125,6 @@ NDqProto::TData TDqDataSerializer::SerializeParamValue(const TType* type, const
return data;
}
-ui64 TDqDataSerializer::CalcSerializedSize(NUdf::TUnboxedValue& value, const NKikimr::NMiniKQL::TType* itemType) const {
- auto data = Serialize(value, itemType);
- // YQL-9648
- Deserialize(data, itemType, value);
- return data.GetRaw().size();
-}
-
namespace {
std::optional<ui64> EstimateIntegralDataSize(const TDataType* dataType) {
diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h
index 0d7f4eb382d..36fa0d9b263 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.h
+++ b/ydb/library/yql/dq/runtime/dq_transport.h
@@ -26,22 +26,17 @@ public:
template <class TForwardIterator>
NDqProto::TData Serialize(TForwardIterator first, TForwardIterator last, const NKikimr::NMiniKQL::TType* itemType) const {
- return Serialize(first, last, itemType, {});
- }
-
- template <class TForwardIterator>
- NDqProto::TData Serialize(TForwardIterator& first, TForwardIterator last, const NKikimr::NMiniKQL::TType* itemType, TMaybe<size_t> limit) const {
const auto listType = NKikimr::NMiniKQL::TListType::Create(const_cast<NKikimr::NMiniKQL::TType*>(itemType), TypeEnv);
if (TransportVersion == NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED ||
TransportVersion == NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0)
{
NKikimr::NMiniKQL::TValuePackerTransport<false> packer(listType);
- return SerializeBatch(packer, first, last, limit);
+ return SerializeBatch(packer, first, last);
}
if (TransportVersion == NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0) {
NKikimr::NMiniKQL::TValuePackerTransport<true> packer(listType);
- return SerializeBatch(packer, first, last, limit);
+ return SerializeBatch(packer, first, last);
}
YQL_ENSURE(false, "Unsupported TransportVersion");
}
@@ -50,8 +45,6 @@ public:
NKikimr::NMiniKQL::TUnboxedValueVector& buffer) const;
void Deserialize(const NDqProto::TData& data, const NKikimr::NMiniKQL::TType* itemType, NUdf::TUnboxedValue& value) const;
- ui64 CalcSerializedSize(NUdf::TUnboxedValue& value, const NKikimr::NMiniKQL::TType* type) const;
-
struct TEstimateSizeSettings {
bool WithHeaders;
bool DiscardUnsupportedTypes;
@@ -75,14 +68,10 @@ public:
const NDqProto::EDataTransportVersion TransportVersion;
private:
template <class TForwardIterator, class TPacker>
- NDqProto::TData SerializeBatch(TPacker& packer, TForwardIterator& first, TForwardIterator last, TMaybe<size_t> limit) const {
+ NDqProto::TData SerializeBatch(TPacker& packer, TForwardIterator first, TForwardIterator last) const {
size_t count = 0;
while (first != last) {
packer.AddItem(*first);
- if (limit.Defined() && count && packer.PackedSizeEstimate() > *limit) {
- packer.Rollback(*first);
- break;
- }
++first;
++count;
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
index 3cc224c7439..5ed7cf03acf 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
@@ -927,7 +927,7 @@ void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFacto
len = NDetails::GetRawData<ui64>(buf);
}
- result.reserve(len);
+ result.reserve(len + result.size());
for (ui64 i = 0; i < len; ++i) {
result.emplace_back(UnpackFromContigousBuffer<Fast>(itemType, buf, topLength, holderFactory, s));
}
@@ -962,12 +962,6 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TU
State_.OptionalUsageMask.Reset();
Buffer_.ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE);
}
- Rollback_.ConstructInPlace();
- }
-
- Rollback_->BufferSize = Buffer_.Size();
- if constexpr (!Fast) {
- Rollback_->OptionalCount = State_.OptionalUsageMask.GetOptionalCount();
}
PackImpl<Fast, false>(itemType, Buffer_, value, State_);
@@ -976,14 +970,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TU
}
template<bool Fast>
-void TValuePackerTransport<Fast>::Rollback(NUdf::TUnboxedValuePod& value) {
- MKQL_ENSURE(Rollback_.Defined() && ItemCount_ > 0, "AddItem() was never called or RemoveLastItem() is called twice in a row");
- Buffer_.Resize(Rollback_->BufferSize);
- if constexpr (!Fast) {
- State_.OptionalUsageMask.Shrink(Rollback_->OptionalCount);
- }
- --ItemCount_;
- Rollback_ = {};
+void TValuePackerTransport<Fast>::Clear() {
+ Buffer_.Clear();
+ ItemCount_ = 0;
}
template<bool Fast>
@@ -997,7 +986,6 @@ const TPagedBuffer& TValuePackerTransport<Fast>::Finish() {
BuildMeta(true);
}
ItemCount_ = 0;
- Rollback_ = {};
return Buffer_;
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
index b388ded4db9..cc4fabc071d 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
@@ -78,11 +78,10 @@ public:
// incremental packing - works only for List<T> type
TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
- // can only be called once
- void Rollback(NUdf::TUnboxedValuePod& value);
size_t PackedSizeEstimate() const {
return Buffer_.Size() + Buffer_.ReservedHeaderSize();
}
+ void Clear();
const TPagedBuffer& Finish();
TPagedBuffer FinishAndPull();
@@ -93,14 +92,8 @@ public:
private:
void BuildMeta(bool addItemCount) const;
- struct TRollbackState {
- ui32 OptionalCount = 0;
- size_t BufferSize = 0;
- };
-
const TType* const Type_;
ui64 ItemCount_ = 0;
- TMaybe<TRollbackState> Rollback_;
mutable TPagedBuffer Buffer_;
mutable NDetails::TPackerState State_;
};
diff --git a/ydb/library/yql/minikql/computation/mkql_optional_usage_mask.h b/ydb/library/yql/minikql/computation/mkql_optional_usage_mask.h
index 060423f5876..91d29d978f1 100644
--- a/ydb/library/yql/minikql/computation/mkql_optional_usage_mask.h
+++ b/ydb/library/yql/minikql/computation/mkql_optional_usage_mask.h
@@ -32,16 +32,6 @@ public:
}
}
- ui32 GetOptionalCount() const {
- return CountOfOptional;
- }
-
- void Shrink(ui32 newSize) {
- Y_VERIFY(newSize <= CountOfOptional, "Invalid shrink size");
- Mask.Reset(newSize, CountOfOptional);
- CountOfOptional = newSize;
- }
-
void SetNextEmptyOptional(bool empty) {
if (empty) {
Mask.Set(CountOfOptional);
diff --git a/ydb/library/yql/minikql/mkql_buffer.cpp b/ydb/library/yql/minikql/mkql_buffer.cpp
index 765b94b460a..824b4841710 100644
--- a/ydb/library/yql/minikql/mkql_buffer.cpp
+++ b/ydb/library/yql/minikql/mkql_buffer.cpp
@@ -28,7 +28,6 @@ void TPagedBuffer::AppendPage() {
page->Clear();
} else {
page = TBufferPage::Allocate();
- page->Prev_ = tailPage;
tailPage->Next_ = page;
}
tailPage->Size_ = TailSize_;
diff --git a/ydb/library/yql/minikql/mkql_buffer.h b/ydb/library/yql/minikql/mkql_buffer.h
index 2a505c600d3..fa5690514c6 100644
--- a/ydb/library/yql/minikql/mkql_buffer.h
+++ b/ydb/library/yql/minikql/mkql_buffer.h
@@ -25,18 +25,10 @@ public:
return Next_;
}
- inline const TBufferPage* Prev() const {
- return Prev_;
- }
-
inline TBufferPage* Next() {
return Next_;
}
- inline TBufferPage* Prev() {
- return Prev_;
- }
-
inline size_t Size() const {
return Size_;
}
@@ -55,7 +47,6 @@ public:
private:
TBufferPage* Next_ = nullptr;
- TBufferPage* Prev_ = nullptr;
size_t Size_ = 0;
static TBufferPage* Allocate();
@@ -194,25 +185,6 @@ public:
TailSize_ -= len;
}
- inline void Resize(size_t newSize) {
- size_t sz = Size();
- if (newSize >= sz) {
- return Advance(newSize - sz);
- }
- size_t delta = sz - newSize;
- Y_VERIFY_DEBUG(Tail_);
- while (delta > TailSize_) {
- delta -= TailSize_;
- auto prev = TBufferPage::GetPage(Tail_)->Prev();
- Y_VERIFY_DEBUG(prev);
- Y_VERIFY_DEBUG(prev->Size() <= ClosedPagesSize_);
- ClosedPagesSize_ -= prev->Size();
- TailSize_ = prev->Size();
- Tail_ = prev->Data();
- }
- TailSize_ -= delta;
- }
-
inline void Advance(size_t len) {
if (Y_LIKELY(TailSize_ + len <= TBufferPage::PageCapacity)) {
TailSize_ += len;
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 1cf14115d03..886f619c23b 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -139,7 +139,6 @@ public:
settings.SecureParams = secureParams;
settings.CollectBasicStats = true;
settings.CollectProfileStats = true;
- settings.AllowGeneratorsInUnboxedValues = true;
auto runner = NDq::MakeDqTaskRunner(executionContext, settings, {});
{
@@ -154,9 +153,12 @@ public:
NDq::ERunStatus status;
while ((status = runner->Run()) == NDq::ERunStatus::PendingOutput || status == NDq::ERunStatus::Finished) {
- NDqProto::TData data;
- if (runner->GetOutputChannel(0)->PopAll(data) && !fillSettings.Discard) {
- rows.push_back(data);
+ if (!fillSettings.Discard) {
+ NDqProto::TData data;
+ while (runner->GetOutputChannel(0)->Pop(data)) {
+ rows.push_back(std::move(data));
+ data = {};
+ }
}
if (status == NDq::ERunStatus::Finished) {
break;
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 04d81b577fd..5a7be9b85d4 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -472,7 +472,7 @@ public:
NDqProto::TPopResponse response;
- response.SetResult(channel->Pop(*response.MutableData(), 5 << 20));
+ response.SetResult(channel->Pop(*response.MutableData()));
UpdateOutputChannelStats(channelId);
QueryStat.FlushCounters(response);
response.MutableStats()->PackFrom(GetStats(taskId));
@@ -752,7 +752,6 @@ public:
settings.CollectBasicStats = true;
settings.CollectProfileStats = true;
settings.TerminateOnError = TerminateOnError;
- settings.AllowGeneratorsInUnboxedValues = true;
for (const auto& x: taskMeta.GetSecureParams()) {
settings.SecureParams[x.first] = x.second;
YQL_CLOG(DEBUG, ProviderDq) << "SecureParam " << x.first << ":XXX";
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
index bcda5b21199..8c5e80cbbe2 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
@@ -84,9 +84,9 @@ public:
{ }
[[nodiscard]]
- NDqProto::TPopResponse Pop(NDqProto::TData& data, ui64 bytes) override {
+ NDqProto::TPopResponse Pop(NDqProto::TData& data) override {
NDqProto::TPopResponse response;
- response.SetResult(Channel->Pop(data, bytes));
+ response.SetResult(Channel->Pop(data));
if (Channel->IsFinished()) {
UpdateOutputChannelStats();
QueryStat.FlushCounters(response);
@@ -262,7 +262,6 @@ public:
settings.TerminateOnError = TerminateOnError;
settings.CollectBasicStats = true;
settings.CollectProfileStats = true;
- settings.AllowGeneratorsInUnboxedValues = true;
Yql::DqsProto::TTaskMeta taskMeta;
task.GetMeta().UnpackTo(&taskMeta);
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 721ab9ebc29..e85962746f6 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -838,7 +838,7 @@ public:
{ }
[[nodiscard]]
- NDqProto::TPopResponse Pop(NDqProto::TData& data, ui64) override {
+ NDqProto::TPopResponse Pop(NDqProto::TData& data) override {
NDqProto::TCommandHeader header;
header.SetVersion(1);
header.SetCommand(NDqProto::TCommandHeader::POP);
@@ -896,8 +896,7 @@ public:
return ChannelId;
}
- ui64 GetValuesCount(bool inMemoryOnly) const override {
- Y_UNUSED(inMemoryOnly);
+ ui64 GetValuesCount() const override {
ythrow yexception() << "unimplemented";
}
@@ -952,9 +951,9 @@ public:
}
// can throw TDqChannelStorageException
[[nodiscard]]
- bool Pop(NDqProto::TData& data, ui64 bytes) override {
+ bool Pop(NDqProto::TData& data) override {
try {
- auto response = Delegate->Pop(data, bytes);
+ auto response = Delegate->Pop(data);
return response.GetResult();
} catch (...) {
TaskRunner->RaiseException();
@@ -979,11 +978,6 @@ public:
Y_UNUSED(data);
ythrow yexception() << "unimplemented";
}
-
- bool PopAll(NKikimr::NMiniKQL::TUnboxedValueVector& rows) override {
- Y_UNUSED(rows);
- ythrow yexception() << "unimplemented";
- }
// |>
ui64 Drop() override {
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
index fec8f2c08f1..06aac8ca3f5 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
@@ -43,7 +43,7 @@ public:
virtual ~IOutputChannel() = default;
[[nodiscard]]
- virtual NDqProto::TPopResponse Pop(NDqProto::TData& data, ui64 bytes) = 0;
+ virtual NDqProto::TPopResponse Pop(NDqProto::TData& data) = 0;
virtual bool IsFinished() const = 0;
};
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index e5415bf4685..b2d41827aad 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -317,7 +317,7 @@ private:
NDqProto::TPopResponse response;
for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) {
NDqProto::TData data;
- const auto lastPop = std::move(channel->Pop(data, remain));
+ const auto lastPop = std::move(channel->Pop(data));
for (auto& metric : lastPop.GetMetric()) {
*response.AddMetric() = metric;