diff options
author | aneporada <aneporada@ydb.tech> | 2023-04-28 15:11:37 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-04-28 15:11:37 +0300 |
commit | 277bf2b433110a4e3a5a127d7cc801d050d0394f (patch) | |
tree | ecb79e093f45663d2f5ebbdfd36ea759327c183e | |
parent | f3b2c09e16a9c2faaf25e3fc928ae01b1e7fe566 (diff) | |
download | ydb-277bf2b433110a4e3a5a127d7cc801d050d0394f.tar.gz |
Perform serialization in IDqOutput::Push() method. Simplify code
initial
27 files changed, 307 insertions, 908 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 71b8c6a5e25..768440da92a 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -78,7 +78,6 @@ public: settings.CollectProfileStats = RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_PROFILE; settings.OptLLVM = GetUseLLVM() ? "--compile-options=disable-opt" : "OFF"; settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache(); - settings.AllowGeneratorsInUnboxedValues = false; for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) { settings.TaskParams[paramsName] = paramsValue; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 44fbf2508e0..9faa89e6d18 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -196,7 +196,6 @@ void TKqpScanComputeActor::DoBootstrap() { settings.CollectProfileStats = GetStatsMode() >= NYql::NDqProto::DQ_STATS_MODE_PROFILE; settings.OptLLVM = TBase::GetUseLLVM() ? "--compile-options=disable-opt" : "OFF"; settings.UseCacheForLLVM = AppData()->FeatureFlags.GetEnableLLVMCache(); - settings.AllowGeneratorsInUnboxedValues = false; for (const auto& [paramsName, paramsValue] : GetTask().GetTaskParams()) { settings.TaskParams[paramsName] = paramsValue; diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 4c6059ce26f..577a3a7dbe4 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -41,7 +41,6 @@ TDqTaskRunnerSettings CreateTaskRunnerSettings(Ydb::Table::QueryStatsCollection: settings.CollectProfileStats = CollectProfileStats(statsMode); settings.OptLLVM = "OFF"; settings.TerminateOnError = false; - settings.AllowGeneratorsInUnboxedValues = false; return settings; } @@ -241,9 +240,10 @@ public: for (ui64 outputChannelId : taskOutput.Channels) { auto outputChannel = taskRunner->GetOutputChannel(outputChannelId); auto& channelDesc = TasksGraph.GetChannel(outputChannelId); - NKikimr::NMiniKQL::TUnboxedValueVector outputRows; - outputChannel->PopAll(outputRows); - ResponseEv->TakeResult(channelDesc.DstInputIndex, outputRows); + NDqProto::TData outputData; + while (outputChannel->Pop(outputData)) { + ResponseEv->TakeResult(channelDesc.DstInputIndex, outputData); + } YQL_ENSURE(outputChannel->IsFinished()); } } diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp index 677c1a59f2f..651d609db8b 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp @@ -194,8 +194,11 @@ std::pair<bool, bool> TKqpTasksRunner::TransferData(ui64 fromTask, ui64 fromChan bool finished = false; // todo: transfer data as-is from input- to output- channel (KIKIMR-10658) - NDqProto::TData data; - if (src->Pop(data, std::numeric_limits<ui64>::max())) { + for (;;) { + NDqProto::TData data; + if (!src->Pop(data)) { + break; + } transferred = true; dst->Push(std::move(data)); } diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index c6f78e015f8..24111f604af 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -1269,7 +1269,6 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(const NKikimrTxDataShard::T settings.OptLLVM = "OFF"; settings.TerminateOnError = false; - settings.AllowGeneratorsInUnboxedValues = false; KqpAlloc->SetLimit(10_MB); KqpTasksRunner = NKqp::CreateKqpTasksRunner(tx.GetTasks(), KqpExecCtx, settings, KqpLogFunc); diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index a41e12da5e6..fa3a4842d47 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -86,6 +86,17 @@ NUdf::EFetchStatus FetchAllOutput(NDq::IDqOutputChannel* channel, NDqProto::TDat return NUdf::EFetchStatus::Yield; } +NUdf::EFetchStatus FetchOutput(NDq::IDqOutputChannel* channel, NDqProto::TData& buffer) { + auto result = channel->Pop(buffer); + Y_UNUSED(result); + + if (channel->IsFinished()) { + return NUdf::EFetchStatus::Finish; + } + + return NUdf::EFetchStatus::Yield; +} + NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId, const TInputOpData::TInReadSets* inReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner, bool applyEffects) @@ -526,29 +537,30 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const for (auto& channel : task.GetOutputs(i).GetChannels()) { auto computeActor = computeCtx.GetTaskOutputChannel(task.GetId(), channel.GetId()); if (computeActor) { - auto dataEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelData>(); - dataEv->Record.SetSeqNo(1); - dataEv->Record.MutableChannelData()->SetChannelId(channel.GetId()); - dataEv->Record.MutableChannelData()->SetFinished(true); - dataEv->Record.SetNoAck(true); - auto outputData = dataEv->Record.MutableChannelData()->MutableData(); - - auto fetchStatus = FetchAllOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), *outputData); - MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish); - - if (outputData->GetRaw().size() > MaxDatashardReplySize) { - auto message = TStringBuilder() << "Datashard " << origin - << ": reply size limit exceeded (" << outputData->GetRaw().size() << " > " - << MaxDatashardReplySize << ")"; - - LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, message); - result->SetExecutionError(NKikimrTxDataShard::TError::REPLY_SIZE_EXCEEDED, message); - } else { - ctx.Send(computeActor, dataEv.Release()); + size_t seqNo = 1; + auto fetchStatus = NUdf::EFetchStatus::Yield; + while (fetchStatus != NUdf::EFetchStatus::Finish) { + auto dataEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelData>(); + dataEv->Record.SetSeqNo(seqNo++); + dataEv->Record.MutableChannelData()->SetChannelId(channel.GetId()); + dataEv->Record.SetNoAck(true); + auto outputData = dataEv->Record.MutableChannelData()->MutableData(); + fetchStatus = FetchOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), *outputData); + dataEv->Record.MutableChannelData()->SetFinished(fetchStatus == NUdf::EFetchStatus::Finish); + if (outputData->GetRaw().size() > MaxDatashardReplySize) { + auto message = TStringBuilder() << "Datashard " << origin + << ": reply size limit exceeded (" << outputData->GetRaw().size() << " > " + << MaxDatashardReplySize << ")"; + LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, message); + result->SetExecutionError(NKikimrTxDataShard::TError::REPLY_SIZE_EXCEEDED, message); + break; + } else { + ctx.Send(computeActor, dataEv.Release()); + } } } else { NDqProto::TData outputData; - auto fetchStatus = FetchAllOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData); + auto fetchStatus = FetchOutput(taskRunner.GetOutputChannel(channel.GetId()).Get(), outputData); MKQL_ENSURE_S(fetchStatus == NUdf::EFetchStatus::Finish); MKQL_ENSURE_S(outputData.GetRows() == 0); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 1f77357ea7f..8975fa85b8d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1309,7 +1309,7 @@ private: i64 remains = toSend; while (remains > 0 && (!outputChannel.Finished || Checkpoints)) { - ui32 sent = this->SendChannelDataChunk(outputChannel, remains); + ui32 sent = this->SendChannelDataChunk(outputChannel); if (sent == 0) { break; } @@ -1321,14 +1321,14 @@ private: ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || remains != toSend; } - ui32 SendChannelDataChunk(TOutputChannelInfo& outputChannel, ui64 bytes) { + ui32 SendChannelDataChunk(TOutputChannelInfo& outputChannel) { auto channel = outputChannel.Channel; NDqProto::TData data; NDqProto::TWatermark watermark; NDqProto::TCheckpoint checkpoint; - bool hasData = channel->Pop(data, bytes); + bool hasData = channel->Pop(data); bool hasWatermark = channel->Pop(watermark); bool hasCheckpoint = channel->Pop(checkpoint); if (!hasData && !hasWatermark && !hasCheckpoint) { diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index ce7fb226449..41f26a78580 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -330,7 +330,7 @@ private: TMaybe<NDqProto::TCheckpoint> checkpoint = Nothing(); for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) { NDqProto::TData data; - hasData = channel->Pop(data, remain); + hasData = channel->Pop(data); NDqProto::TWatermark poppedWatermark; bool hasWatermark = channel->Pop(poppedWatermark); diff --git a/ydb/library/yql/dq/runtime/dq_channel_storage.h b/ydb/library/yql/dq/runtime/dq_channel_storage.h index d42238f6835..f132d39dd00 100644 --- a/ydb/library/yql/dq/runtime/dq_channel_storage.h +++ b/ydb/library/yql/dq/runtime/dq_channel_storage.h @@ -21,8 +21,14 @@ public: virtual bool IsEmpty() const = 0; virtual bool IsFull() const = 0; - // these methods can throw `TDqChannelStorageException` + // methods Put/Get can throw `TDqChannelStorageException` + + // TODO: support IZeroCopyInput virtual void Put(ui64 blobId, TBuffer&& blob) = 0; + + // TODO: there is no way for client to delete blob. + // It is better to replace Get() with Pull() which will delete blob after read + // (current clients read each blob exactly once) virtual bool Get(ui64 blobId, TBuffer& data) = 0; }; diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index c947fb28552..0a8cf3a62af 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -2,6 +2,7 @@ #include "dq_transport.h" #include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h> #include <util/generic/buffer.h> #include <util/generic/size_literals.h> @@ -20,339 +21,75 @@ namespace NYql::NDq { namespace { -using namespace NKikimr; - -bool IsSafeToEstimateValueSize(const NMiniKQL::TType* type) { - if (type->GetKind() == NMiniKQL::TType::EKind::Data) { - return true; - } - if (type->GetKind() == NMiniKQL::TType::EKind::Optional) { - const auto* optionalType = static_cast<const NMiniKQL::TOptionalType*>(type); - return IsSafeToEstimateValueSize(optionalType->GetItemType()); - } - if (type->GetKind() == NMiniKQL::TType::EKind::Struct) { - const auto* structType = static_cast<const NMiniKQL::TStructType*>(type); - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - if (!IsSafeToEstimateValueSize(structType->GetMemberType(i))) { - return false; - } - } - return true; - } - if (type->GetKind() == NMiniKQL::TType::EKind::Tuple) { - const auto* tupleType = static_cast<const NMiniKQL::TTupleType*>(type); - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - if (!IsSafeToEstimateValueSize(tupleType->GetElementType(i))) { - return false; - } - } - return true; - } - if (type->GetKind() == NMiniKQL::TType::EKind::Variant) { - const auto* variantType = static_cast<const NMiniKQL::TVariantType*>(type); - for (ui32 i = 0; i < variantType->GetAlternativesCount(); ++i) { - if (!IsSafeToEstimateValueSize(variantType->GetAlternativeType(i))) { - return false; - } - } - return true; - } - return false; -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -class TDqOutputChannelOld : public IDqOutputChannel { +class TProfileGuard { public: - TDqOutputChannelOld(ui64 channelId, NMiniKQL::TType* outputType, bool collectProfileStats, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, ui64 maxStoredBytes, - ui64 maxChunkBytes, ui64 chunkSizeLimit, NDqProto::EDataTransportVersion transportVersion, const TLogFunc& logFunc) - : ChannelId(channelId) - , OutputType(outputType) - , BasicStats(ChannelId) - , ProfileStats(collectProfileStats ? &BasicStats : nullptr) - , DataSerializer(typeEnv, holderFactory, transportVersion) - , MaxStoredBytes(maxStoredBytes) - , MaxChunkBytes(maxChunkBytes) - , ChunkSizeLimit(chunkSizeLimit) - , LogFunc(logFunc) {} - - ui64 GetChannelId() const override { - return ChannelId; - } - - ui64 GetValuesCount(bool /* inMemoryOnly */) const override { - return Data.size(); - } - - [[nodiscard]] - bool IsFull() const override { - return Data.size() * EstimatedRowBytes >= MaxStoredBytes; - } - - void Push(NUdf::TUnboxedValue&& value) override { - if (!BasicStats.FirstRowIn) { - BasicStats.FirstRowIn = TInstant::Now(); - } - - ui64 rowsInMemory = Data.size(); - - LOG("Push request, rows in memory: " << rowsInMemory << ", bytesInMemory: " << (rowsInMemory * EstimatedRowBytes) - << ", finished: " << Finished); - YQL_ENSURE(!IsFull()); - - if (Finished) { - return; - } - - if (!EstimatedRowBytes) { - EstimatedRowBytes = IsSafeToEstimateValueSize(OutputType) - ? TDqDataSerializer::EstimateSize(value, OutputType) - : DataSerializer.CalcSerializedSize(value, OutputType); - LOG("EstimatedRowSize: " << EstimatedRowBytes << ", type: " << *OutputType - << ", safe: " << IsSafeToEstimateValueSize(OutputType)); - } - - Data.emplace_back(std::move(value)); - rowsInMemory++; - - BasicStats.Bytes += EstimatedRowBytes; - BasicStats.RowsIn++; - - if (Y_UNLIKELY(ProfileStats)) { - ProfileStats->MaxMemoryUsage = std::max(ProfileStats->MaxMemoryUsage, rowsInMemory * EstimatedRowBytes); - ProfileStats->MaxRowsInMemory = std::max(ProfileStats->MaxRowsInMemory, rowsInMemory); - } - } - - void Push(NDqProto::TWatermark&& watermark) override { - YQL_ENSURE(!Watermark); - Watermark.ConstructInPlace(std::move(watermark)); - } - - void Push(NDqProto::TCheckpoint&& checkpoint) override { - YQL_ENSURE(!Checkpoint); - Checkpoint.ConstructInPlace(std::move(checkpoint)); - } - - [[nodiscard]] - bool Pop(NDqProto::TData& data, ui64 bytes) override { - LOG("Pop request, rows in memory: " << Data.size() << ", finished: " << Finished << ", requested: " << bytes - << ", estimatedRowSize: " << EstimatedRowBytes); - - if (!HasData()) { - if (Finished) { - data.SetTransportVersion(DataSerializer.GetTransportVersion()); - data.SetRows(0); - data.ClearRaw(); - } - return false; - } - - bytes = std::min(bytes, MaxChunkBytes); - auto last = Data.begin(); - if (Y_UNLIKELY(ProfileStats)) { - TInstant startTime = TInstant::Now(); - data = DataSerializer.Serialize(last, Data.end(), OutputType, bytes); - ProfileStats->SerializationTime += (TInstant::Now() - startTime); - } else { - data = DataSerializer.Serialize(last, Data.end(), OutputType, bytes); - } - - - ui64 takeRows = data.GetRows(); - DLOG("Took " << takeRows << " rows"); - - if (EstimatedRowBytes) { - EstimatedRowBytes = EstimatedRowBytes * 0.6 + std::max<ui64>(data.GetRaw().Size() / takeRows, 1) * 0.4; - DLOG("Recalc estimated row size: " << EstimatedRowBytes); - } - - YQL_ENSURE(data.GetRaw().size() < ChunkSizeLimit); - - BasicStats.Chunks++; - BasicStats.RowsOut += takeRows; - - Data.erase(Data.begin(), last); - return true; - } - - [[nodiscard]] - bool Pop(NDqProto::TWatermark& watermark) override { - if (!HasData() && Watermark) { - watermark = std::move(*Watermark); - Watermark = Nothing(); - return true; - } - return false; - } - - [[nodiscard]] - bool Pop(NDqProto::TCheckpoint& checkpoint) override { - if (!HasData() && Checkpoint) { - checkpoint = std::move(*Checkpoint); - Checkpoint = Nothing(); - return true; - } - return false; - } - - [[nodiscard]] - bool PopAll(NDqProto::TData& data) override { - if (Data.empty()) { - if (Finished) { - data.SetTransportVersion(DataSerializer.GetTransportVersion()); - data.SetRows(0); - data.ClearRaw(); - } - return false; - } - - if (Y_UNLIKELY(ProfileStats)) { - TInstant startTime = TInstant::Now(); - data = DataSerializer.Serialize(Data.begin(), Data.end(), OutputType); - ProfileStats->SerializationTime += (TInstant::Now() - startTime); - } else { - data = DataSerializer.Serialize(Data.begin(), Data.end(), OutputType); - } - - BasicStats.Chunks++; - BasicStats.RowsOut += data.GetRows(); - - Data.clear(); - return true; - } - - bool PopAll(NKikimr::NMiniKQL::TUnboxedValueVector& data) override { - if (Data.empty()) { - return false; - } - - data.reserve(data.size() + Data.size()); - for (auto&& v : Data) { - data.emplace_back(std::move(v)); + TProfileGuard(TDuration* duration) + : Duration(duration) + { + if (Y_UNLIKELY(duration)) { + Start = TInstant::Now(); } - - BasicStats.Chunks++; - BasicStats.RowsOut += data.size(); - - Data.clear(); - return true; } - void Finish() override { - LOG("Finish request"); - Finished = true; - - if (!BasicStats.FirstRowIn) { - BasicStats.FirstRowIn = TInstant::Now(); + ~TProfileGuard() { + if (Y_UNLIKELY(Duration)) { + *Duration += TInstant::Now() - Start; } } - - bool HasData() const override { - return !Data.empty(); - } - - bool IsFinished() const override { - return Finished && !HasData(); - } - - ui64 Drop() override { // Drop channel data because channel was finished. Leave checkpoint because checkpoints keep going through channel after finishing channel data transfer. - ui64 rows = Data.size(); - Data.clear(); - TDataType().swap(Data); - return rows; - } - - NKikimr::NMiniKQL::TType* GetOutputType() const override { - return OutputType; - } - - const TDqOutputChannelStats* GetStats() const override { - return &BasicStats; - } - - void Terminate() override { - } - private: - const ui64 ChannelId; - NKikimr::NMiniKQL::TType* OutputType; - TDqOutputChannelStats BasicStats; - TDqOutputChannelStats* ProfileStats = nullptr; - TDqDataSerializer DataSerializer; - const ui64 MaxStoredBytes; - ui64 MaxChunkBytes; - ui64 ChunkSizeLimit; - TLogFunc LogFunc; - - using TDataType = TDeque<NUdf::TUnboxedValue, NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>>; - TDataType Data; - - ui32 EstimatedRowBytes = 0; - bool Finished = false; - - TMaybe<NDqProto::TWatermark> Watermark; - TMaybe<NDqProto::TCheckpoint> Checkpoint; + TInstant Start; + TDuration* Duration; }; -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +using namespace NKikimr; -class TDqOutputChannelNew : public IDqOutputChannel { - struct TSpilledBlob; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +template<bool FastPack> +class TDqOutputChannel : public IDqOutputChannel { public: - TDqOutputChannelNew(ui64 channelId, NMiniKQL::TType* outputType, const NMiniKQL::TTypeEnvironment& typeEnv, - const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& log) + TDqOutputChannel(ui64 channelId, NMiniKQL::TType* outputType, const NMiniKQL::TTypeEnvironment& typeEnv, + const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc) : ChannelId(channelId) , OutputType(outputType) , BasicStats(ChannelId) , ProfileStats(settings.CollectProfileStats ? &BasicStats : nullptr) - , DataSerializer(typeEnv, holderFactory, settings.TransportVersion) + , Packer(false, NMiniKQL::TListType::Create(OutputType, typeEnv)) , Storage(settings.ChannelStorage) + , HolderFactory(holderFactory) + , TransportVersion(settings.TransportVersion) , MaxStoredBytes(settings.MaxStoredBytes) , MaxChunkBytes(settings.MaxChunkBytes) , ChunkSizeLimit(settings.ChunkSizeLimit) - , LogFunc(log) + , LogFunc(logFunc) { - if (Storage && 3 * MaxChunkBytes > MaxStoredBytes) { - MaxChunkBytes = Max(MaxStoredBytes / 3, 1_MB); - } } ui64 GetChannelId() const override { return ChannelId; } - ui64 GetValuesCount(bool inMemoryOnly) const override { - if (inMemoryOnly) { - return DataHead.size() + DataTail.size(); - } - return DataHead.size() + DataTail.size() + SpilledRows; + ui64 GetValuesCount() const override { + return SpilledRowCount + PackedRowCount + ChunkRowCount; } [[nodiscard]] bool IsFull() const override { if (!Storage) { - if (MemoryUsed >= MaxStoredBytes) { - YQL_ENSURE(HasData()); - return true; - } - return false; + return PackedDataSize + Packer.PackedSizeEstimate() >= MaxStoredBytes; } - return Storage->IsFull(); } void Push(NUdf::TUnboxedValue&& value) override { + TProfileGuard guard(ProfileStats ? &ProfileStats->SerializationTime : nullptr); if (!BasicStats.FirstRowIn) { BasicStats.FirstRowIn = TInstant::Now(); } - ui64 rowsInMemory = DataHead.size() + DataTail.size(); + ui64 rowsInMemory = PackedRowCount + ChunkRowCount; - LOG("Push request, rows in memory: " << rowsInMemory << ", bytesInMemory: " << MemoryUsed - << ", spilled rows: " << SpilledRows << ", spilled blobs: " << SpilledBlobs.size() + LOG("Push request, rows in memory: " << rowsInMemory << ", bytesInMemory: " << (PackedDataSize + Packer.PackedSizeEstimate()) << ", finished: " << Finished); YQL_ENSURE(!IsFull()); @@ -360,173 +97,56 @@ public: return; } - ui64 estimatedRowSize; + Packer.AddItem(value); + value = {}; + ChunkRowCount++; + BasicStats.RowsIn++; - if (RowFixedSize.has_value()) { - estimatedRowSize = *RowFixedSize; - } else { - bool fixed; - estimatedRowSize = TDqDataSerializer::EstimateSize(value, OutputType, &fixed); - if (fixed) { - RowFixedSize.emplace(estimatedRowSize); - LOG("Raw has fixed size: " << estimatedRowSize); - } + size_t packerSize = Packer.PackedSizeEstimate(); + if (packerSize >= MaxChunkBytes) { + Data.emplace_back(); + Data.back().Buffer = Packer.FinishAndPull(); + BasicStats.Bytes += Data.back().Buffer.Size(); + PackedDataSize += Data.back().Buffer.Size(); + PackedRowCount += ChunkRowCount; + Data.back().RowCount = ChunkRowCount; + ChunkRowCount = 0; + packerSize = 0; } - DLOG("Row size: " << estimatedRowSize); + while (Storage && PackedDataSize && PackedDataSize + packerSize > MaxStoredBytes) { + auto& head = Data.front(); + size_t bufSize = head.Buffer.Size(); + YQL_ENSURE(PackedDataSize >= bufSize); - if (SpilledRows == 0) { - YQL_ENSURE(DataTail.empty() && SizeTail.empty()); - DataHead.emplace_back(std::move(value)); - SizeHead.emplace_back(estimatedRowSize); - } else { - DataTail.emplace_back(std::move(value)); - SizeTail.emplace_back(estimatedRowSize); - } - rowsInMemory++; - MemoryUsed += estimatedRowSize; - - ui64 spilledRows = 0; - ui64 spilledBytes = 0; - ui64 spilledBlobs = 0; - - // TODO: add PushAndFinish call - // if processing is finished we don't have to spill data - - if (MemoryUsed > MaxStoredBytes && Storage) { - LOG("Not enough memory to store rows in memory only, start spilling, rowsInMemory: " << rowsInMemory - << ", memoryUsed: " << MemoryUsed << ", MaxStoredBytes: " << MaxStoredBytes); - - TDataType* data = nullptr; - TDeque<ui64>* size = nullptr; - - ui32 startIndex = std::numeric_limits<ui32>::max(); - - if (SpilledRows) { - // we have spilled rows already, so we can spill all DataTail - data = &DataTail; - size = &SizeTail; - startIndex = 0; - } else { - // spill from head - // but only if we have rows for at least 2 full chunks - ui32 chunks = 1; - ui64 chunkSize = 0; - startIndex = 0; - for (ui64 i = 0; i < SizeHead.size(); ++i) { - chunkSize += SizeHead.at(i); - if (chunkSize >= MaxChunkBytes && (i - startIndex) > 0) { - // LOG("one more chunk, size: " << chunkSize << ", start next chunk from " << i); - chunks++; // this row goes to the next chunk - chunkSize = 0; - - if (chunks >= 3) { - data = &DataHead; - size = &SizeHead; - break; - } - - startIndex = i; - } - } - } + TBuffer blob; + blob.Reserve(bufSize + sizeof(head.RowCount)); + blob.Append((const char*)&head.RowCount, sizeof(head.RowCount)); + head.Buffer.ForEachPage([&blob](const char *data, size_t len) { + blob.Append(data, len); + }); - // LOG("about to spill from index " << startIndex); - - if (data) { - while (true) { - ui64 chunkSize = 0; - ui32 rowsInChunk = 0; - ui32 idx = startIndex + spilledRows; - - while (idx < size->size()) { - ui64 rowSize = size->at(idx); - if (chunkSize == 0 || chunkSize + rowSize <= MaxChunkBytes) { - chunkSize += rowSize; - ++rowsInChunk; - ++idx; - } else { - break; - } - } - - if (rowsInChunk > 0) { - // LOG("about to spill from " << (startIndex + spilledRows) << " to " << (startIndex + spilledRows + rowsInChunk)); - - TDataType::iterator first = std::next(data->begin(), startIndex + spilledRows); - TDataType::iterator last = std::next(data->begin(), startIndex + spilledRows + rowsInChunk); - - NDqProto::TData protoBlob = DataSerializer.Serialize(first, last, OutputType); - TBuffer blob; - TBufferOutput out{blob}; - protoBlob.SerializeToArcadiaStream(&out); - - ui64 blobSize = blob.size(); - ui64 blobId = NextBlobId++; - - LOG("Spill blobId: " << blobId << ", size: " << blobSize << ", rows: " << rowsInChunk); - Storage->Put(blobId, std::move(blob)); - - SpilledBlobs.emplace_back(TSpilledBlob(blobId, chunkSize, blobSize, rowsInChunk)); - SpilledRows += rowsInChunk; - MemoryUsed -= chunkSize; - - spilledRows += rowsInChunk; - spilledBytes += blobSize; - spilledBlobs++; - } else { - break; - } - } - - if (data == &DataHead) { - // move rows after spilled ones to the DataTail - YQL_ENSURE(DataTail.empty()); - for (ui32 i = startIndex + spilledRows; i < data->size(); ++i) { - DataTail.emplace_back(std::move(DataHead[i])); - SizeTail.emplace_back(SizeHead[i]); - } - data->erase(std::next(data->begin(), startIndex), data->end()); - size->erase(std::next(size->begin(), startIndex), size->end()); - } else { - YQL_ENSURE(startIndex == 0); - data->erase(data->begin(), std::next(data->begin(), spilledRows)); - size->erase(size->begin(), std::next(size->begin(), spilledRows)); - } - } else { - // LOG("nothing to spill"); - } - } + YQL_ENSURE(blob.Size() == bufSize + sizeof(head.RowCount)); + Storage->Put(NextStoredId++, std::move(blob)); - BasicStats.Bytes += estimatedRowSize; - BasicStats.RowsIn++; + PackedDataSize -= bufSize; + PackedRowCount -= head.RowCount; - if (Y_UNLIKELY(ProfileStats)) { - ProfileStats->MaxMemoryUsage = std::max(ProfileStats->MaxMemoryUsage, MemoryUsed); - ProfileStats->MaxRowsInMemory = std::max(ProfileStats->MaxRowsInMemory, DataHead.size() + DataTail.size()); - if (spilledRows) { - ProfileStats->SpilledRows += spilledRows; - ProfileStats->SpilledBytes += spilledBytes; - ProfileStats->SpilledBlobs += spilledBlobs; - } - } + SpilledRowCount += head.RowCount; - ValidateUsedMemory(); + if (Y_UNLIKELY(ProfileStats)) { + ProfileStats->SpilledRows += head.RowCount; + ProfileStats->SpilledBytes += bufSize + sizeof(head.RowCount); + ProfileStats->SpilledBlobs++; + } -#if 0 - TStringStream ss; - ss << "-- DUMP --" << Endl; - ss << " Head:" << Endl; - for (auto& v : DataHead) { - ss << " "; v.Dump(ss); ss << Endl; + Data.pop_front(); } - ss << " Spilled: " << SpilledRows << Endl; - ss << " Tail:" << Endl; - for (auto& v : DataTail) { - ss << " "; v.Dump(ss); ss << Endl; + + if (Y_UNLIKELY(ProfileStats)) { + ProfileStats->MaxMemoryUsage = std::max(ProfileStats->MaxMemoryUsage, PackedDataSize + packerSize); + ProfileStats->MaxRowsInMemory = std::max(ProfileStats->MaxRowsInMemory, PackedRowCount); } - LOG(ss.Str()); -#endif } void Push(NDqProto::TWatermark&& watermark) override { @@ -540,120 +160,50 @@ public: } [[nodiscard]] - bool Pop(NDqProto::TData& data, ui64 bytes) override { - LOG("Pop request, rows in memory: " << DataHead.size() << "/" << DataTail.size() - << ", spilled rows: " << SpilledRows << ", spilled blobs: " << SpilledBlobs.size() - << ", finished: " << Finished << ", requested: " << bytes << ", maxChunkBytes: " << MaxChunkBytes); + bool Pop(NDqProto::TData& data) override { + LOG("Pop request, rows in memory: " << GetValuesCount() << ", finished: " << Finished); if (!HasData()) { if (Finished) { - data.SetTransportVersion(DataSerializer.GetTransportVersion()); + data.SetTransportVersion(TransportVersion); data.SetRows(0); data.ClearRaw(); } return false; } - bytes = std::min(bytes, MaxChunkBytes); - - if (DataHead.empty()) { - YQL_ENSURE(SpilledRows); - - TSpilledBlob spilledBlob = SpilledBlobs.front(); - - LOG("Loading spilled blob. BlobId: " << spilledBlob.BlobId << ", rows: " << spilledBlob.Rows - << ", bytes: " << spilledBlob.SerializedSize); + data.SetTransportVersion(TransportVersion); + data.ClearRaw(); + if (FirstStoredId < NextStoredId) { + YQL_ENSURE(Storage); TBuffer blob; - if (!Storage->Get(spilledBlob.BlobId, blob)) { - LOG("BlobId " << spilledBlob.BlobId << " not ready yet"); - // not loaded yet - return false; - } - - YQL_ENSURE(blob.size() == spilledBlob.SerializedSize, "" << blob.size() << " != " << spilledBlob.SerializedSize); - - NDqProto::TData protoBlob; - YQL_ENSURE(protoBlob.ParseFromArray(blob.Data(), blob.size())); - blob.Reset(); - - YQL_ENSURE(protoBlob.GetRows() == spilledBlob.Rows); - bool hasResult = false; - - // LOG("bytes: " << bytes << ", loaded blob: " << spilledBlob.SerializedSize); - - if (spilledBlob.SerializedSize <= bytes * 2) { - data.Swap(&protoBlob); - YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeLimit); - hasResult = true; - // LOG("return loaded blob as-is"); - } else { - NKikimr::NMiniKQL::TUnboxedValueVector buffer; - DataSerializer.Deserialize(protoBlob, OutputType, buffer); - - for (ui32 i = 0; i < buffer.size(); ++i) { - SizeHead.emplace_back(RowFixedSize - ? *RowFixedSize - : TDqDataSerializer::EstimateSize(buffer[i], OutputType)); - DataHead.emplace_back(std::move(buffer[i])); - - MemoryUsed += SizeHead.back(); - } - } - - SpilledRows -= spilledBlob.Rows; - - SpilledBlobs.pop_front(); - if (SpilledBlobs.empty()) { - // LOG("no more spilled blobs, move " << DataTail.size() << " tail rows to the head"); - DataHead.insert(DataHead.end(), std::make_move_iterator(DataTail.begin()), std::make_move_iterator(DataTail.end())); - SizeHead.insert(SizeHead.end(), std::make_move_iterator(SizeTail.begin()), std::make_move_iterator(SizeTail.end())); - DataTail.clear(); - SizeTail.clear(); - } - - if (hasResult) { - BasicStats.Chunks++; - BasicStats.RowsOut += data.GetRows(); - ValidateUsedMemory(); - return true; - } - } - - ui32 takeRows = 0; - ui64 chunkSize = 0; - - while (takeRows == 0 || (takeRows < SizeHead.size() && chunkSize + SizeHead[takeRows] <= bytes)) { - chunkSize += SizeHead[takeRows]; - ++takeRows; - } - - DLOG("return rows: " << takeRows << ", size: " << chunkSize); - - auto firstDataIt = DataHead.begin(); - auto lastDataIt = std::next(firstDataIt, takeRows); - - auto firstSizeIt = SizeHead.begin(); - auto lastSizeIt = std::next(firstSizeIt, takeRows); - - if (Y_UNLIKELY(ProfileStats)) { - TInstant startTime = TInstant::Now(); - data = DataSerializer.Serialize(firstDataIt, lastDataIt, OutputType); - ProfileStats->SerializationTime += (TInstant::Now() - startTime); + YQL_ENSURE(Storage->Get(FirstStoredId++, blob), "Lost block in storage"); + ui64 rows; + YQL_ENSURE(blob.size() >= sizeof(rows)); + std::memcpy((char*)&rows, blob.data(), sizeof(rows)); + data.SetRows(rows); + data.MutableRaw()->insert(data.MutableRaw()->end(), blob.data() + sizeof(rows), blob.data() + blob.size()); + SpilledRowCount -= rows; + } else if (!Data.empty()) { + auto& packed = Data.front(); + data.SetRows(packed.RowCount); + data.MutableRaw()->reserve(packed.Buffer.Size()); + packed.Buffer.CopyTo(*data.MutableRaw()); + PackedRowCount -= packed.RowCount; + PackedDataSize -= packed.Buffer.Size(); + Data.pop_front(); } else { - data = DataSerializer.Serialize(firstDataIt, lastDataIt, OutputType); + data.SetRows(ChunkRowCount); + auto buffer = Packer.FinishAndPull(); + data.MutableRaw()->reserve(buffer.Size()); + buffer.CopyTo(*data.MutableRaw()); + ChunkRowCount = 0; } - YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeLimit); + DLOG("Took " << data.GetRows() << " rows"); BasicStats.Chunks++; - BasicStats.RowsOut += takeRows; - - DataHead.erase(firstDataIt, lastDataIt); - SizeHead.erase(firstSizeIt, lastSizeIt); - MemoryUsed -= chunkSize; - - ValidateUsedMemory(); - + BasicStats.RowsOut += data.GetRows(); return true; } @@ -679,61 +229,61 @@ public: [[nodiscard]] bool PopAll(NDqProto::TData& data) override { - YQL_ENSURE(!SpilledRows); - YQL_ENSURE(DataTail.empty()); - - if (DataHead.empty()) { + if (!HasData()) { if (Finished) { - data.SetTransportVersion(DataSerializer.GetTransportVersion()); + data.SetTransportVersion(TransportVersion); data.SetRows(0); data.ClearRaw(); } return false; } - if (Y_UNLIKELY(ProfileStats)) { - TInstant startTime = TInstant::Now(); - data = DataSerializer.Serialize(DataHead.begin(), DataHead.end(), OutputType); - ProfileStats->SerializationTime += (TInstant::Now() - startTime); - } else { - data = DataSerializer.Serialize(DataHead.begin(), DataHead.end(), OutputType); + data.SetTransportVersion(TransportVersion); + data.ClearRaw(); + if (SpilledRowCount == 0 && PackedRowCount == 0) { + data.SetRows(ChunkRowCount); + auto buffer = Packer.FinishAndPull(); + data.MutableRaw()->reserve(buffer.Size()); + buffer.CopyTo(*data.MutableRaw()); + ChunkRowCount = 0; + return true; } - BasicStats.Chunks++; - BasicStats.RowsOut += data.GetRows(); - - DataHead.clear(); - SizeHead.clear(); - MemoryUsed = 0; - - return true; - } - - bool PopAll(NKikimr::NMiniKQL::TUnboxedValueVector& data) override { - YQL_ENSURE(!SpilledRows); - YQL_ENSURE(DataTail.empty()); - - if (DataHead.empty()) { - return false; + // Repack all - thats why PopAll should never be used + if (ChunkRowCount) { + Data.emplace_back(); + Data.back().Buffer = Packer.FinishAndPull(); + BasicStats.Bytes += Data.back().Buffer.Size(); + PackedDataSize += Data.back().Buffer.Size(); + PackedRowCount += ChunkRowCount; + Data.back().RowCount = ChunkRowCount; + ChunkRowCount = 0; } - data.reserve(data.size() + DataHead.size()); - for (auto&& v : DataHead) { - data.emplace_back(std::move(v)); + NKikimr::NMiniKQL::TUnboxedValueVector rows; + for (;;) { + NDqProto::TData chunk; + if (!this->Pop(chunk)) { + break; + } + TStringBuf buf(chunk.GetRaw()); + Packer.UnpackBatch(buf, HolderFactory, rows); } - BasicStats.Chunks++; - BasicStats.RowsOut += data.size(); - - DataHead.clear(); - SizeHead.clear(); - MemoryUsed = 0; + for (auto& row : rows) { + Packer.AddItem(row); + } + data.SetRows(rows.size()); + auto buffer = Packer.FinishAndPull(); + data.MutableRaw()->reserve(buffer.Size()); + buffer.CopyTo(*data.MutableRaw()); + YQL_ENSURE(!HasData()); return true; } void Finish() override { - DLOG("Finish request"); + LOG("Finish request"); Finished = true; if (!BasicStats.FirstRowIn) { @@ -742,7 +292,7 @@ public: } bool HasData() const override { - return !DataHead.empty() || SpilledRows != 0 || !DataTail.empty(); + return GetValuesCount() != 0; } bool IsFinished() const override { @@ -750,17 +300,11 @@ public: } ui64 Drop() override { // Drop channel data because channel was finished. Leave checkpoint because checkpoints keep going through channel after finishing channel data transfer. - ui64 rows = DataHead.size() + SpilledRows + DataTail.size(); - DataHead.clear(); - SizeHead.clear(); - DataTail.clear(); - SizeTail.clear(); - MemoryUsed = 0; - TDataType().swap(DataHead); - TDataType().swap(DataTail); - // todo: send remove request - SpilledRows = 0; - SpilledBlobs.clear(); + ui64 rows = GetValuesCount(); + Data.clear(); + Packer.Clear(); + SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0; + FirstStoredId = NextStoredId; return rows; } @@ -773,18 +317,6 @@ public: } void Terminate() override { - if (Storage) { - Storage.Reset(); - } - } - - void ValidateUsedMemory() { -#ifndef NDEBUG - ui64 x = 0; - for (auto z : SizeHead) x += z; - for (auto z : SizeTail) x += z; - YQL_ENSURE(x == MemoryUsed, "" << x << " != " << MemoryUsed); -#endif } private: @@ -792,36 +324,29 @@ private: NKikimr::NMiniKQL::TType* OutputType; TDqOutputChannelStats BasicStats; TDqOutputChannelStats* ProfileStats = nullptr; - TDqDataSerializer DataSerializer; - IDqChannelStorage::TPtr Storage; + NKikimr::NMiniKQL::TValuePackerTransport<FastPack> Packer; + const IDqChannelStorage::TPtr Storage; + const NMiniKQL::THolderFactory& HolderFactory; + const NDqProto::EDataTransportVersion TransportVersion; const ui64 MaxStoredBytes; - ui64 MaxChunkBytes; - ui64 ChunkSizeLimit; + const ui64 MaxChunkBytes; + const ui64 ChunkSizeLimit; TLogFunc LogFunc; - std::optional<ui32> RowFixedSize; - - struct TSpilledBlob { - ui64 BlobId; - ui64 InMemorySize; - ui64 SerializedSize; - ui32 Rows; - TSpilledBlob(ui64 blobId, ui64 inMemorySize, ui64 serializedSize, ui32 rows) - : BlobId(blobId), InMemorySize(inMemorySize), SerializedSize(serializedSize), Rows(rows) {} + struct TSerializedBatch { + NKikimr::NMiniKQL::TPagedBuffer Buffer; + ui64 RowCount = 0; }; + std::deque<TSerializedBatch> Data; - // DataHead ( . SpilledBlobs (. DataTail)? )? - using TDataType = TDeque<NUdf::TUnboxedValue, NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>>; - TDataType DataHead; - TDeque<ui64> SizeHead; - TDeque<TSpilledBlob> SpilledBlobs; - TDataType DataTail; - TDeque<ui64> SizeTail; + size_t SpilledRowCount = 0; + ui64 FirstStoredId = 0; + ui64 NextStoredId = 0; - ui64 MemoryUsed = 0; // approx memory usage - - ui64 NextBlobId = 1; - ui64 SpilledRows = 0; + size_t PackedDataSize = 0; + size_t PackedRowCount = 0; + + size_t ChunkRowCount = 0; bool Finished = false; @@ -836,12 +361,14 @@ IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL:: const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc) { - if (settings.AllowGeneratorsInUnboxedValues) { - YQL_ENSURE(!settings.ChannelStorage); - return new TDqOutputChannelOld(channelId, outputType, settings.CollectProfileStats, typeEnv, holderFactory, - settings.MaxStoredBytes, settings.MaxChunkBytes, settings.ChunkSizeLimit, settings.TransportVersion, logFunc); + if (settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0 || + settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED) + { + return new TDqOutputChannel<false>(channelId, outputType, typeEnv, holderFactory, settings, logFunc); } else { - return new TDqOutputChannelNew(channelId, outputType, typeEnv, holderFactory, settings, logFunc); + YQL_ENSURE(settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0, + "Unsupported transport version " << (ui32)settings.TransportVersion); + return new TDqOutputChannel<true>(channelId, outputType, typeEnv, holderFactory, settings, logFunc); } } diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h index 8c74f3b71b6..49b65c2eda7 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.h +++ b/ydb/library/yql/dq/runtime/dq_output_channel.h @@ -46,12 +46,12 @@ public: using TPtr = TIntrusivePtr<IDqOutputChannel>; virtual ui64 GetChannelId() const = 0; - virtual ui64 GetValuesCount(bool inMemoryOnly = true) const = 0; + virtual ui64 GetValuesCount() const = 0; // <| consumer methods // can throw TDqChannelStorageException [[nodiscard]] - virtual bool Pop(NDqProto::TData& data, ui64 bytes) = 0; + virtual bool Pop(NDqProto::TData& data) = 0; // Pop watermark. [[nodiscard]] virtual bool Pop(NDqProto::TWatermark& watermark) = 0; @@ -65,7 +65,6 @@ public: // can throw TDqChannelStorageException [[nodiscard]] virtual bool PopAll(NDqProto::TData& data) = 0; - virtual bool PopAll(NKikimr::NMiniKQL::TUnboxedValueVector& rows) = 0; // |> virtual ui64 Drop() = 0; @@ -81,7 +80,6 @@ struct TDqOutputChannelSettings { NDqProto::EDataTransportVersion TransportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0; IDqChannelStorage::TPtr ChannelStorage; bool CollectProfileStats = false; - bool AllowGeneratorsInUnboxedValues = true; }; IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL::TType* outputType, diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp index 609436f0e8b..fac3102ea74 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp @@ -84,13 +84,12 @@ struct TTestContext { } }; -void TestSingleRead(TTestContext& ctx, bool quantum) { +void TestSingleRead(TTestContext& ctx) { TDqOutputChannelSettings settings; settings.MaxStoredBytes = 1000; settings.MaxChunkBytes = 200; settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - settings.AllowGeneratorsInUnboxedValues = quantum; auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); @@ -105,7 +104,7 @@ void TestSingleRead(TTestContext& ctx, bool quantum) { UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut); NDqProto::TData data; - UNIT_ASSERT(ch->Pop(data, 1000)); + UNIT_ASSERT(ch->Pop(data)); UNIT_ASSERT_VALUES_EQUAL(10, data.GetRows()); UNIT_ASSERT_VALUES_EQUAL(1, ch->GetStats()->Chunks); @@ -122,16 +121,15 @@ void TestSingleRead(TTestContext& ctx, bool quantum) { } data.Clear(); - UNIT_ASSERT(!ch->Pop(data, 1000)); + UNIT_ASSERT(!ch->Pop(data)); } -void TestPartialRead(TTestContext& ctx, bool quantum) { +void TestPartialRead(TTestContext& ctx) { TDqOutputChannelSettings settings; settings.MaxStoredBytes = 1000; - settings.MaxChunkBytes = 100; + settings.MaxChunkBytes = 17; settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - settings.AllowGeneratorsInUnboxedValues = quantum; auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); @@ -146,19 +144,15 @@ void TestPartialRead(TTestContext& ctx, bool quantum) { UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut); int req = 0; - ui32 expected[] = {3, 3, 3}; - ui32 expectedQ[] = {3, 3, 3}; + ui32 expected[] = {2, 2, 2, 2, 1}; ui32 readChunks = 0; ui32 readRows = 0; while (readRows < 9) { NDqProto::TData data; - size_t limit = quantum ? - // packed size is 13 byte header + 2 bytes for each row - 20 : 50; - UNIT_ASSERT(ch->Pop(data, limit)); + UNIT_ASSERT(ch->Pop(data)); - ui32 v = quantum ? expectedQ[req] : expected[req]; + ui32 v = expected[req]; ++req; UNIT_ASSERT_VALUES_EQUAL(v, data.GetRows()); @@ -180,16 +174,15 @@ void TestPartialRead(TTestContext& ctx, bool quantum) { } NDqProto::TData data; - UNIT_ASSERT(!ch->Pop(data, 1000)); + UNIT_ASSERT(!ch->Pop(data)); } -void TestOverflow(TTestContext& ctx, bool quantum) { +void TestOverflow(TTestContext& ctx) { TDqOutputChannelSettings settings; - settings.MaxStoredBytes = 100; + settings.MaxStoredBytes = 30; settings.MaxChunkBytes = 10; settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - settings.AllowGeneratorsInUnboxedValues = quantum; auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); @@ -212,13 +205,12 @@ void TestOverflow(TTestContext& ctx, bool quantum) { } } -void TestPopAll(TTestContext& ctx, bool quantum) { +void TestPopAll(TTestContext& ctx) { TDqOutputChannelSettings settings; settings.MaxStoredBytes = 1000; settings.MaxChunkBytes = 10; settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - settings.AllowGeneratorsInUnboxedValues = quantum; auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); @@ -247,16 +239,15 @@ void TestPopAll(TTestContext& ctx, bool quantum) { } data.Clear(); - UNIT_ASSERT(!ch->Pop(data, 100'500)); + UNIT_ASSERT(!ch->Pop(data)); } -void TestBigRow(TTestContext& ctx, bool quantum) { +void TestBigRow(TTestContext& ctx) { TDqOutputChannelSettings settings; settings.MaxStoredBytes = std::numeric_limits<ui32>::max(); settings.MaxChunkBytes = 2_MB; settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - settings.AllowGeneratorsInUnboxedValues = quantum; auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); @@ -277,12 +268,31 @@ void TestBigRow(TTestContext& ctx, bool quantum) { UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn); UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut); - for (ui32 i = 1; i < 10; ++i) { + { NDqProto::TData data; - UNIT_ASSERT(ch->Pop(data, 1_MB)); + UNIT_ASSERT(ch->Pop(data)); + + UNIT_ASSERT_VALUES_EQUAL(2, data.GetRows()); + UNIT_ASSERT_VALUES_EQUAL(1, ch->GetStats()->Chunks); + UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn); + UNIT_ASSERT_VALUES_EQUAL(2, ch->GetStats()->RowsOut); + + TUnboxedValueVector buffer; + ctx.Ds.Deserialize(data, ctx.OutputType, buffer); + + UNIT_ASSERT_VALUES_EQUAL(2, buffer.size()); + for (ui32 i = 1; i < 3; ++i) { + UNIT_ASSERT_VALUES_EQUAL(i, buffer[i - 1].GetElement(0).Get<i32>()); + UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[i - 1].GetElement(1).Get<ui64>()); + } + } + + for (ui32 i = 3; i < 10; ++i) { + NDqProto::TData data; + UNIT_ASSERT(ch->Pop(data)); UNIT_ASSERT_VALUES_EQUAL(1, data.GetRows()); - UNIT_ASSERT_VALUES_EQUAL(i, ch->GetStats()->Chunks); + UNIT_ASSERT_VALUES_EQUAL(i - 1, ch->GetStats()->Chunks); UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn); UNIT_ASSERT_VALUES_EQUAL(i, ch->GetStats()->RowsOut); @@ -295,17 +305,15 @@ void TestBigRow(TTestContext& ctx, bool quantum) { } NDqProto::TData data; - UNIT_ASSERT(!ch->Pop(data, 10_MB)); + UNIT_ASSERT(!ch->Pop(data)); } - void TestSpillWithMockStorage(TTestContext& ctx) { TDqOutputChannelSettings settings; settings.MaxStoredBytes = 100; - settings.MaxChunkBytes = 10; + settings.MaxChunkBytes = 20; settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - settings.AllowGeneratorsInUnboxedValues = false; auto storage = MakeIntrusive<TMockChannelStorage>(100'500ul); settings.ChannelStorage = storage; @@ -318,74 +326,35 @@ void TestSpillWithMockStorage(TTestContext& ctx) { ch->Push(std::move(row)); } - UNIT_ASSERT_VALUES_EQUAL(7, ch->GetValuesCount(/* inMemoryOnly */ true)); - UNIT_ASSERT_VALUES_EQUAL(35, ch->GetValuesCount(/* inMemoryOnly */ false)); + UNIT_ASSERT_VALUES_EQUAL(35, ch->GetValuesCount()); UNIT_ASSERT_VALUES_EQUAL(35, ch->GetStats()->RowsIn); UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut); - UNIT_ASSERT_VALUES_EQUAL(35 - 7, ch->GetStats()->SpilledRows); - UNIT_ASSERT_VALUES_EQUAL(35 - 7, ch->GetStats()->SpilledBlobs); - UNIT_ASSERT(ch->GetStats()->SpilledBytes > 200); + UNIT_ASSERT_VALUES_EQUAL(18, ch->GetStats()->SpilledRows); + UNIT_ASSERT_VALUES_EQUAL(5, ch->GetStats()->SpilledBlobs); + UNIT_ASSERT(ch->GetStats()->SpilledBytes > 5 * 8); ui32 loadedRows = 0; - storage->SetBlankGetRequests(2); - - { - Cerr << "-- pop rows before spilled ones\n"; - NDqProto::TData data; - while (ch->Pop(data, 1000)) { - TUnboxedValueVector buffer; - ctx.Ds.Deserialize(data, ctx.OutputType, buffer); - UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size()); - for (ui32 i = 0; i < data.GetRows(); ++i) { - auto j = loadedRows + i; - UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>()); - UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>()); - } + NDqProto::TData data; + while (ch->Pop(data)) { + TUnboxedValueVector buffer; + ctx.Ds.Deserialize(data, ctx.OutputType, buffer); - loadedRows += data.GetRows(); + UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size()); + for (ui32 i = 0; i < data.GetRows(); ++i) { + auto j = loadedRows + i; + UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>()); + UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>()); } - } - - UNIT_ASSERT_VALUES_EQUAL(7 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ true)); - UNIT_ASSERT_VALUES_EQUAL(35 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ false)); - - // just blank request - { - NDqProto::TData data; - UNIT_ASSERT(!ch->Pop(data, 1000)); - - UNIT_ASSERT_VALUES_EQUAL(7 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ true)); - UNIT_ASSERT_VALUES_EQUAL(35 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ false)); - } - - while (loadedRows < 35) { - NDqProto::TData data; - while (ch->Pop(data, 10)) { - storage->SetBlankGetRequests(1); - - TUnboxedValueVector buffer; - ctx.Ds.Deserialize(data, ctx.OutputType, buffer); - UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size()); - for (ui32 i = 0; i < data.GetRows(); ++i) { - auto j = loadedRows + i; - UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>()); - UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>()); - } - - loadedRows += data.GetRows(); - - UNIT_ASSERT_VALUES_EQUAL(35 - loadedRows, ch->GetValuesCount(/* inMemoryOnly */ false)); - } + loadedRows += data.GetRows(); } - UNIT_ASSERT_VALUES_EQUAL(35, loadedRows); + UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount()); // in memory only { - storage->SetBlankGetRequests(0); loadedRows = 0; for (i32 i = 100; i < 105; ++i) { @@ -394,11 +363,10 @@ void TestSpillWithMockStorage(TTestContext& ctx) { ch->Push(std::move(row)); } - UNIT_ASSERT_VALUES_EQUAL(5, ch->GetValuesCount(/* inMemoryOnly */ true)); - UNIT_ASSERT_VALUES_EQUAL(5, ch->GetValuesCount(/* inMemoryOnly */ false)); + UNIT_ASSERT_VALUES_EQUAL(5, ch->GetValuesCount()); NDqProto::TData data; - while (ch->Pop(data, 1000)) { + while (ch->Pop(data)) { TUnboxedValueVector buffer; ctx.Ds.Deserialize(data, ctx.OutputType, buffer); @@ -411,19 +379,17 @@ void TestSpillWithMockStorage(TTestContext& ctx) { loadedRows += data.GetRows(); } + UNIT_ASSERT_VALUES_EQUAL(5, loadedRows); + UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount()); } - - UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount(/* inMemoryOnly */ true)); - UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount(/* inMemoryOnly */ false)); } void TestOverflowWithMockStorage(TTestContext& ctx) { TDqOutputChannelSettings settings; - settings.MaxStoredBytes = 100; + settings.MaxStoredBytes = 500; settings.MaxChunkBytes = 10; settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - settings.AllowGeneratorsInUnboxedValues = false; auto storage = MakeIntrusive<TMockChannelStorage>(500ul); settings.ChannelStorage = storage; @@ -441,66 +407,40 @@ void TestOverflowWithMockStorage(TTestContext& ctx) { // UNIT_ASSERT(ch->IsFull()); it can be false-negative with storage enabled try { - auto row = ctx.CreateRow(100'500); - ch->Push(std::move(row)); + ch->Push(ctx.CreateBigRow(0, 100'500)); UNIT_FAIL(""); - } catch (yexception& e) { + } catch (yexception &e) { UNIT_ASSERT(TString(e.what()).Contains("Space limit exceeded")); } } } // anonymous namespace -Y_UNIT_TEST_SUITE(DqOutputChannelNoStorageTests) { +Y_UNIT_TEST_SUITE(DqOutputChannelTests) { Y_UNIT_TEST(SingleRead) { TTestContext ctx; - TestSingleRead(ctx, false); -} - -Y_UNIT_TEST(SingleReadQ) { - TTestContext ctx; - TestSingleRead(ctx, true); + TestSingleRead(ctx); } Y_UNIT_TEST(PartialRead) { TTestContext ctx; - TestPartialRead(ctx, false); -} - -Y_UNIT_TEST(PartialReadQ) { - TTestContext ctx; - TestPartialRead(ctx, true); + TestPartialRead(ctx); } Y_UNIT_TEST(Overflow) { TTestContext ctx; - TestOverflow(ctx, false); -} - -Y_UNIT_TEST(OverflowQ) { - TTestContext ctx; - TestOverflow(ctx, true); + TestOverflow(ctx); } Y_UNIT_TEST(PopAll) { TTestContext ctx; - TestPopAll(ctx, false); -} - -Y_UNIT_TEST(PopAllQ) { - TTestContext ctx; - TestPopAll(ctx, true); + TestPopAll(ctx); } Y_UNIT_TEST(BigRow) { TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); - TestBigRow(ctx, false); -} - -Y_UNIT_TEST(BigRowQ) { - TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); - TestBigRow(ctx, true); + TestBigRow(ctx); } } @@ -513,7 +453,7 @@ Y_UNIT_TEST(Spill) { } Y_UNIT_TEST(Overflow) { - TTestContext ctx; + TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); TestOverflowWithMockStorage(ctx); } diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index ac9f9040b2c..c31f1afde3c 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -642,7 +642,6 @@ public: settings.ChunkSizeLimit = memoryLimits.ChunkSizeLimit; settings.TransportVersion = outputChannelDesc.GetTransportVersion(); settings.CollectProfileStats = Settings.CollectProfileStats; - settings.AllowGeneratorsInUnboxedValues = Settings.AllowGeneratorsInUnboxedValues; if (!outputChannelDesc.GetInMemory()) { settings.ChannelStorage = execCtx.CreateChannelStorage(channelId); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 713b5d3c60d..c400b0de798 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -259,7 +259,6 @@ struct TDqTaskRunnerSettings { bool CollectBasicStats = false; bool CollectProfileStats = false; bool TerminateOnError = false; - bool AllowGeneratorsInUnboxedValues = true; bool UseCacheForLLVM = false; TString OptLLVM = ""; THashMap<TString, TString> SecureParams; diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp index 2872774437a..2e006b0ce4d 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.cpp +++ b/ydb/library/yql/dq/runtime/dq_transport.cpp @@ -125,13 +125,6 @@ NDqProto::TData TDqDataSerializer::SerializeParamValue(const TType* type, const return data; } -ui64 TDqDataSerializer::CalcSerializedSize(NUdf::TUnboxedValue& value, const NKikimr::NMiniKQL::TType* itemType) const { - auto data = Serialize(value, itemType); - // YQL-9648 - Deserialize(data, itemType, value); - return data.GetRaw().size(); -} - namespace { std::optional<ui64> EstimateIntegralDataSize(const TDataType* dataType) { diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h index 0d7f4eb382d..36fa0d9b263 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.h +++ b/ydb/library/yql/dq/runtime/dq_transport.h @@ -26,22 +26,17 @@ public: template <class TForwardIterator> NDqProto::TData Serialize(TForwardIterator first, TForwardIterator last, const NKikimr::NMiniKQL::TType* itemType) const { - return Serialize(first, last, itemType, {}); - } - - template <class TForwardIterator> - NDqProto::TData Serialize(TForwardIterator& first, TForwardIterator last, const NKikimr::NMiniKQL::TType* itemType, TMaybe<size_t> limit) const { const auto listType = NKikimr::NMiniKQL::TListType::Create(const_cast<NKikimr::NMiniKQL::TType*>(itemType), TypeEnv); if (TransportVersion == NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED || TransportVersion == NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0) { NKikimr::NMiniKQL::TValuePackerTransport<false> packer(listType); - return SerializeBatch(packer, first, last, limit); + return SerializeBatch(packer, first, last); } if (TransportVersion == NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0) { NKikimr::NMiniKQL::TValuePackerTransport<true> packer(listType); - return SerializeBatch(packer, first, last, limit); + return SerializeBatch(packer, first, last); } YQL_ENSURE(false, "Unsupported TransportVersion"); } @@ -50,8 +45,6 @@ public: NKikimr::NMiniKQL::TUnboxedValueVector& buffer) const; void Deserialize(const NDqProto::TData& data, const NKikimr::NMiniKQL::TType* itemType, NUdf::TUnboxedValue& value) const; - ui64 CalcSerializedSize(NUdf::TUnboxedValue& value, const NKikimr::NMiniKQL::TType* type) const; - struct TEstimateSizeSettings { bool WithHeaders; bool DiscardUnsupportedTypes; @@ -75,14 +68,10 @@ public: const NDqProto::EDataTransportVersion TransportVersion; private: template <class TForwardIterator, class TPacker> - NDqProto::TData SerializeBatch(TPacker& packer, TForwardIterator& first, TForwardIterator last, TMaybe<size_t> limit) const { + NDqProto::TData SerializeBatch(TPacker& packer, TForwardIterator first, TForwardIterator last) const { size_t count = 0; while (first != last) { packer.AddItem(*first); - if (limit.Defined() && count && packer.PackedSizeEstimate() > *limit) { - packer.Rollback(*first); - break; - } ++first; ++count; } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp index 3cc224c7439..5ed7cf03acf 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp @@ -927,7 +927,7 @@ void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFacto len = NDetails::GetRawData<ui64>(buf); } - result.reserve(len); + result.reserve(len + result.size()); for (ui64 i = 0; i < len; ++i) { result.emplace_back(UnpackFromContigousBuffer<Fast>(itemType, buf, topLength, holderFactory, s)); } @@ -962,12 +962,6 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TU State_.OptionalUsageMask.Reset(); Buffer_.ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE); } - Rollback_.ConstructInPlace(); - } - - Rollback_->BufferSize = Buffer_.Size(); - if constexpr (!Fast) { - Rollback_->OptionalCount = State_.OptionalUsageMask.GetOptionalCount(); } PackImpl<Fast, false>(itemType, Buffer_, value, State_); @@ -976,14 +970,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TU } template<bool Fast> -void TValuePackerTransport<Fast>::Rollback(NUdf::TUnboxedValuePod& value) { - MKQL_ENSURE(Rollback_.Defined() && ItemCount_ > 0, "AddItem() was never called or RemoveLastItem() is called twice in a row"); - Buffer_.Resize(Rollback_->BufferSize); - if constexpr (!Fast) { - State_.OptionalUsageMask.Shrink(Rollback_->OptionalCount); - } - --ItemCount_; - Rollback_ = {}; +void TValuePackerTransport<Fast>::Clear() { + Buffer_.Clear(); + ItemCount_ = 0; } template<bool Fast> @@ -997,7 +986,6 @@ const TPagedBuffer& TValuePackerTransport<Fast>::Finish() { BuildMeta(true); } ItemCount_ = 0; - Rollback_ = {}; return Buffer_; } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h index b388ded4db9..cc4fabc071d 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h @@ -78,11 +78,10 @@ public: // incremental packing - works only for List<T> type TSelf& AddItem(const NUdf::TUnboxedValuePod& value); - // can only be called once - void Rollback(NUdf::TUnboxedValuePod& value); size_t PackedSizeEstimate() const { return Buffer_.Size() + Buffer_.ReservedHeaderSize(); } + void Clear(); const TPagedBuffer& Finish(); TPagedBuffer FinishAndPull(); @@ -93,14 +92,8 @@ public: private: void BuildMeta(bool addItemCount) const; - struct TRollbackState { - ui32 OptionalCount = 0; - size_t BufferSize = 0; - }; - const TType* const Type_; ui64 ItemCount_ = 0; - TMaybe<TRollbackState> Rollback_; mutable TPagedBuffer Buffer_; mutable NDetails::TPackerState State_; }; diff --git a/ydb/library/yql/minikql/computation/mkql_optional_usage_mask.h b/ydb/library/yql/minikql/computation/mkql_optional_usage_mask.h index 060423f5876..91d29d978f1 100644 --- a/ydb/library/yql/minikql/computation/mkql_optional_usage_mask.h +++ b/ydb/library/yql/minikql/computation/mkql_optional_usage_mask.h @@ -32,16 +32,6 @@ public: } } - ui32 GetOptionalCount() const { - return CountOfOptional; - } - - void Shrink(ui32 newSize) { - Y_VERIFY(newSize <= CountOfOptional, "Invalid shrink size"); - Mask.Reset(newSize, CountOfOptional); - CountOfOptional = newSize; - } - void SetNextEmptyOptional(bool empty) { if (empty) { Mask.Set(CountOfOptional); diff --git a/ydb/library/yql/minikql/mkql_buffer.cpp b/ydb/library/yql/minikql/mkql_buffer.cpp index 765b94b460a..824b4841710 100644 --- a/ydb/library/yql/minikql/mkql_buffer.cpp +++ b/ydb/library/yql/minikql/mkql_buffer.cpp @@ -28,7 +28,6 @@ void TPagedBuffer::AppendPage() { page->Clear(); } else { page = TBufferPage::Allocate(); - page->Prev_ = tailPage; tailPage->Next_ = page; } tailPage->Size_ = TailSize_; diff --git a/ydb/library/yql/minikql/mkql_buffer.h b/ydb/library/yql/minikql/mkql_buffer.h index 2a505c600d3..fa5690514c6 100644 --- a/ydb/library/yql/minikql/mkql_buffer.h +++ b/ydb/library/yql/minikql/mkql_buffer.h @@ -25,18 +25,10 @@ public: return Next_; } - inline const TBufferPage* Prev() const { - return Prev_; - } - inline TBufferPage* Next() { return Next_; } - inline TBufferPage* Prev() { - return Prev_; - } - inline size_t Size() const { return Size_; } @@ -55,7 +47,6 @@ public: private: TBufferPage* Next_ = nullptr; - TBufferPage* Prev_ = nullptr; size_t Size_ = 0; static TBufferPage* Allocate(); @@ -194,25 +185,6 @@ public: TailSize_ -= len; } - inline void Resize(size_t newSize) { - size_t sz = Size(); - if (newSize >= sz) { - return Advance(newSize - sz); - } - size_t delta = sz - newSize; - Y_VERIFY_DEBUG(Tail_); - while (delta > TailSize_) { - delta -= TailSize_; - auto prev = TBufferPage::GetPage(Tail_)->Prev(); - Y_VERIFY_DEBUG(prev); - Y_VERIFY_DEBUG(prev->Size() <= ClosedPagesSize_); - ClosedPagesSize_ -= prev->Size(); - TailSize_ = prev->Size(); - Tail_ = prev->Data(); - } - TailSize_ -= delta; - } - inline void Advance(size_t len) { if (Y_LIKELY(TailSize_ + len <= TBufferPage::PageCapacity)) { TailSize_ += len; diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 1cf14115d03..886f619c23b 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -139,7 +139,6 @@ public: settings.SecureParams = secureParams; settings.CollectBasicStats = true; settings.CollectProfileStats = true; - settings.AllowGeneratorsInUnboxedValues = true; auto runner = NDq::MakeDqTaskRunner(executionContext, settings, {}); { @@ -154,9 +153,12 @@ public: NDq::ERunStatus status; while ((status = runner->Run()) == NDq::ERunStatus::PendingOutput || status == NDq::ERunStatus::Finished) { - NDqProto::TData data; - if (runner->GetOutputChannel(0)->PopAll(data) && !fillSettings.Discard) { - rows.push_back(data); + if (!fillSettings.Discard) { + NDqProto::TData data; + while (runner->GetOutputChannel(0)->Pop(data)) { + rows.push_back(std::move(data)); + data = {}; + } } if (status == NDq::ERunStatus::Finished) { break; diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 04d81b577fd..5a7be9b85d4 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -472,7 +472,7 @@ public: NDqProto::TPopResponse response; - response.SetResult(channel->Pop(*response.MutableData(), 5 << 20)); + response.SetResult(channel->Pop(*response.MutableData())); UpdateOutputChannelStats(channelId); QueryStat.FlushCounters(response); response.MutableStats()->PackFrom(GetStats(taskId)); @@ -752,7 +752,6 @@ public: settings.CollectBasicStats = true; settings.CollectProfileStats = true; settings.TerminateOnError = TerminateOnError; - settings.AllowGeneratorsInUnboxedValues = true; for (const auto& x: taskMeta.GetSecureParams()) { settings.SecureParams[x.first] = x.second; YQL_CLOG(DEBUG, ProviderDq) << "SecureParam " << x.first << ":XXX"; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp index bcda5b21199..8c5e80cbbe2 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp @@ -84,9 +84,9 @@ public: { } [[nodiscard]] - NDqProto::TPopResponse Pop(NDqProto::TData& data, ui64 bytes) override { + NDqProto::TPopResponse Pop(NDqProto::TData& data) override { NDqProto::TPopResponse response; - response.SetResult(Channel->Pop(data, bytes)); + response.SetResult(Channel->Pop(data)); if (Channel->IsFinished()) { UpdateOutputChannelStats(); QueryStat.FlushCounters(response); @@ -262,7 +262,6 @@ public: settings.TerminateOnError = TerminateOnError; settings.CollectBasicStats = true; settings.CollectProfileStats = true; - settings.AllowGeneratorsInUnboxedValues = true; Yql::DqsProto::TTaskMeta taskMeta; task.GetMeta().UnpackTo(&taskMeta); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 721ab9ebc29..e85962746f6 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -838,7 +838,7 @@ public: { } [[nodiscard]] - NDqProto::TPopResponse Pop(NDqProto::TData& data, ui64) override { + NDqProto::TPopResponse Pop(NDqProto::TData& data) override { NDqProto::TCommandHeader header; header.SetVersion(1); header.SetCommand(NDqProto::TCommandHeader::POP); @@ -896,8 +896,7 @@ public: return ChannelId; } - ui64 GetValuesCount(bool inMemoryOnly) const override { - Y_UNUSED(inMemoryOnly); + ui64 GetValuesCount() const override { ythrow yexception() << "unimplemented"; } @@ -952,9 +951,9 @@ public: } // can throw TDqChannelStorageException [[nodiscard]] - bool Pop(NDqProto::TData& data, ui64 bytes) override { + bool Pop(NDqProto::TData& data) override { try { - auto response = Delegate->Pop(data, bytes); + auto response = Delegate->Pop(data); return response.GetResult(); } catch (...) { TaskRunner->RaiseException(); @@ -979,11 +978,6 @@ public: Y_UNUSED(data); ythrow yexception() << "unimplemented"; } - - bool PopAll(NKikimr::NMiniKQL::TUnboxedValueVector& rows) override { - Y_UNUSED(rows); - ythrow yexception() << "unimplemented"; - } // |> ui64 Drop() override { diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h index fec8f2c08f1..06aac8ca3f5 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h @@ -43,7 +43,7 @@ public: virtual ~IOutputChannel() = default; [[nodiscard]] - virtual NDqProto::TPopResponse Pop(NDqProto::TData& data, ui64 bytes) = 0; + virtual NDqProto::TPopResponse Pop(NDqProto::TData& data) = 0; virtual bool IsFinished() const = 0; }; diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index e5415bf4685..b2d41827aad 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -317,7 +317,7 @@ private: NDqProto::TPopResponse response; for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) { NDqProto::TData data; - const auto lastPop = std::move(channel->Pop(data, remain)); + const auto lastPop = std::move(channel->Pop(data)); for (auto& metric : lastPop.GetMetric()) { *response.AddMetric() = metric; |