diff options
author | Ivan <ilezhankin@yandex-team.ru> | 2024-12-11 23:07:29 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-11 20:07:29 +0000 |
commit | 58620f0d65b5b5013147d6b23ca34747fa7000b5 (patch) | |
tree | 8cb15610de7de77dec8d5e38f49345523cf2a594 | |
parent | 638028c35bf99b471df74727997c8881893cb3eb (diff) | |
download | ydb-58620f0d65b5b5013147d6b23ca34747fa7000b5.tar.gz |
Add config option for ArrayBufferMinFillPercentage (#12520)
20 files changed, 63 insertions, 26 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp index 737b2cadd44..0df7f95021a 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp @@ -118,6 +118,7 @@ public: memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load(); memoryLimits.MinMemAllocSize = MinMemAllocSize.load(); memoryLimits.MinMemFreeSize = MinMemFreeSize.load(); + memoryLimits.ArrayBufferMinFillPercentage = args.Task->GetArrayBufferMinFillPercentage(); auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks); NRm::TKqpResourcesRequest resourcesRequest; diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h index bdfbb2eeb7d..fc68f913ace 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h @@ -14,9 +14,10 @@ using namespace NYql::NDq; class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext { public: - TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback) + TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TMaybe<ui8> minFillPercentage, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback) : TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback)) , WithSpilling_(withSpilling) + , MinFillPercentage_(minFillPercentage) { } @@ -25,7 +26,7 @@ public: const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<IDqOutput::TPtr>&& outputs) const override { - return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs)); + return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs), MinFillPercentage_); } IDqChannelStorage::TPtr 
CreateChannelStorage(ui64 channelId, bool withSpilling) const override { @@ -33,7 +34,8 @@ public: } private: - bool WithSpilling_; + const bool WithSpilling_; + const TMaybe<ui8> MinFillPercentage_; }; } // namespace NKqp diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 271314b0e53..d87c6497863 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -20,6 +20,7 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro , ComputeCtx(settings.StatsMode) , FederatedQuerySetup(federatedQuerySetup) , BlockTrackingMode(mode) + , ArrayBufferMinFillPercentage(memoryLimits.ArrayBufferMinFillPercentage) { InitializeTask(); if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) { @@ -77,7 +78,7 @@ void TKqpComputeActor::DoBootstrap() { auto wakeupCallback = [this]{ ContinueExecute(); }; auto errorCallback = [this](const TString& error){ SendError(error); }; try { - PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback))); + PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, ArrayBufferMinFillPercentage, std::move(wakeupCallback), std::move(errorCallback))); } catch (const NMiniKQL::TKqpEnsureFail& e) { InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); return; @@ -173,7 +174,7 @@ void TKqpComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, bool // TODO: CpuTime } - if (auto* x = ScanData->ProfileStats.get()) { + if (ScanData->ProfileStats) { // save your profile stats here } } diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h index c3f94d4eef0..18832925815 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h +++ 
b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h @@ -63,6 +63,7 @@ private: const TDqTaskRunnerParameterProvider ParameterProvider; const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode; + const TMaybe<ui8> ArrayBufferMinFillPercentage; }; } // namespace NKqp diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 5f5fa7dcbc4..7e6c8c136cb 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -97,7 +97,7 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b // TODO: CpuTime } - if (auto* x = ScanData->ProfileStats.get()) { + if (ScanData->ProfileStats) { NKqpProto::TKqpTaskExtraStats taskExtraStats; // auto scanTaskExtraStats = taskExtraStats.MutableScanTaskExtraStats(); // scanTaskExtraStats->SetRetriesCount(TotalRetries); @@ -245,7 +245,7 @@ void TKqpScanComputeActor::DoBootstrap() { auto wakeup = [this] { ContinueExecute(); }; auto errorCallback = [this](const TString& error){ SendError(error); }; - TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback))); + TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, MemoryLimits.ArrayBufferMinFillPercentage, std::move(wakeup), std::move(errorCallback))); ComputeCtx.AddTableScan(0, Meta, GetStatsMode()); ScanData = &ComputeCtx.GetTableScan(0); diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index d46ff2a76bf..bc5f30df86a 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -146,6 +146,10 @@ public: , BlockTrackingMode(tableServiceConfig.GetBlockTrackingMode()) , 
WaitCAStatsTimeout(TDuration::MilliSeconds(tableServiceConfig.GetQueryLimits().GetWaitCAStatsTimeoutMs())) { + if (tableServiceConfig.HasArrayBufferMinFillPercentage()) { + ArrayBufferMinFillPercentage = tableServiceConfig.GetArrayBufferMinFillPercentage(); + } + Target = creator; YQL_ENSURE(!TxManager || tableServiceConfig.GetEnableOltpSink()); @@ -2671,7 +2675,8 @@ private: .MayRunTasksLocally = mayRunTasksLocally, .ResourceManager_ = Request.ResourceManager_, .CaFactory_ = Request.CaFactory_, - .BlockTrackingMode = BlockTrackingMode + .BlockTrackingMode = BlockTrackingMode, + .ArrayBufferMinFillPercentage = ArrayBufferMinFillPercentage, }); auto err = Planner->PlanExecution(); @@ -2994,6 +2999,7 @@ private: const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode; const TDuration WaitCAStatsTimeout; + TMaybe<ui8> ArrayBufferMinFillPercentage; }; } // namespace diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index dc376b97841..09d50760783 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -110,6 +110,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) , ResourceManager_(args.ResourceManager_) , CaFactory_(args.CaFactory_) , BlockTrackingMode(args.BlockTrackingMode) + , ArrayBufferMinFillPercentage(args.ArrayBufferMinFillPercentage) { if (GUCSettings) { SerializedGUCSettings = GUCSettings->SerializeToString(); @@ -216,6 +217,9 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque for (ui64 taskId : requestData.TaskIds) { const auto& task = TasksGraph.GetTask(taskId); NYql::NDqProto::TDqTask* serializedTask = ArenaSerializeTaskToProto(TasksGraph, task, /* serializeAsyncIoSettings = */ true); + if (ArrayBufferMinFillPercentage) { + serializedTask->SetArrayBufferMinFillPercentage(*ArrayBufferMinFillPercentage); + } request.AddTasks()->Swap(serializedTask); } @@ -474,6 +478,10 @@ TString 
TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) UserRequestContext->PoolId, memoryPoolPercent, Database); } + if (ArrayBufferMinFillPercentage) { + taskDesc->SetArrayBufferMinFillPercentage(*ArrayBufferMinFillPercentage); + } + auto startResult = CaFactory_->CreateKqpComputeActor({ .ExecuterId = ExecuterId, .TxId = TxId, diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 9546f3a9a41..a556b8198e6 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -69,6 +69,7 @@ public: const std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager>& ResourceManager_; const std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory>& CaFactory_; const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode; + const TMaybe<ui8> ArrayBufferMinFillPercentage; }; TKqpPlanner(TKqpPlanner::TArgs&& args); @@ -146,6 +147,7 @@ private: TIntrusivePtr<NRm::TTxState> TxInfo; TVector<TProgressStat> LastStats; const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode; + const TMaybe<ui8> ArrayBufferMinFillPercentage; public: static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index fd41546ee2f..1151910b946 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -315,6 +315,7 @@ private: .MayRunTasksLocally = false, .ResourceManager_ = Request.ResourceManager_, .CaFactory_ = Request.CaFactory_ + // TODO: BlockTrackingMode is not set! 
}); LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size()); diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index b0b6bf4cddc..0d6dc18e042 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -217,7 +217,7 @@ private: } else { schedulerGroup = ""; } - } + } std::optional<ui64> querySchedulerGroup; if (msg.HasQueryCpuShare() && schedulerGroup) { @@ -276,6 +276,7 @@ private: .ComputesByStages = &computesByStage, .State = State_, .SchedulingOptions = std::move(schedulingTaskOptions), + // TODO: block tracking mode is not set! }); if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&result)) { diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp index 288bde24f58..51349923edb 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp @@ -24,7 +24,7 @@ using namespace NDq; IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const TType* type, NUdf::IApplyContext* applyCtx, const TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TVector<IDqOutput::TPtr>&& outputs) + TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui8> minFillPercentage) { switch (outputDesc.GetTypeCase()) { case NDqProto::TTaskOutput::kRangePartition: { @@ -62,7 +62,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp } default: { - return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs)); + return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs), minFillPercentage); } } } diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h index 0ddf86137c3..c6df1352c9a 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.h +++ 
b/ydb/core/kqp/runtime/kqp_tasks_runner.h @@ -10,7 +10,8 @@ namespace NKqp { NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TVector<NYql::NDq::IDqOutput::TPtr>&& outputs); + TVector<NYql::NDq::IDqOutput::TPtr>&& outputs, + TMaybe<ui8> minFillPercentage); class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable { diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 24c2300d2ca..e8412b4eaea 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -350,4 +350,10 @@ message TTableServiceConfig { optional EBlockTrackingMode BlockTrackingMode = 73 [ default = BLOCK_TRACKING_SERIALIZE ]; optional bool AllowOlapDataQuery = 74 [default = false]; + + // DISCLAIMER: do not change this value if you don't know what you are doing! + optional uint32 ArrayBufferMinFillPercentage = 75 [default = 100]; + // This value is used for `arrow::Array`s built inside TDqOutputHashPartitionConsumerBlock - + // if the underlying array buffer is filled less than this value, then the buffer's capacity gets shrunk to the actual size. + // Otherwise, we potentially don't track the real buffer capacity and it may lead to OOM situations inside DqOutputChannels. 
}; diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index e07d8b9ad49..2959a0f34e1 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -1002,7 +1002,7 @@ namespace { class TKqpTaskRunnerExecutionContext: public NDq::IDqTaskRunnerExecutionContext { public: NDq::IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<NDq::IDqOutput::TPtr>&& outputs) const override { - return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs)); + return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs), {}); } NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */) const override { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 1d2f5ddd226..e9404a8cda6 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -376,6 +376,7 @@ struct TComputeMemoryLimits { ui64 MinMemAllocSize = 30_MB; ui64 MinMemFreeSize = 30_MB; ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; + TMaybe<ui8> ArrayBufferMinFillPercentage; // Used by DqOutputHashPartitionConsumer IMemoryQuotaManager::TPtr MemoryQuotaManager; }; diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index c37d76be90c..aeda89a74db 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -209,4 +209,5 @@ message TDqTask { repeated bytes ReadRanges = 17; map<string, string> RequestContext = 18; optional bool EnableSpilling = 19; + optional uint32 
ArrayBufferMinFillPercentage = 20; } diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp index 397e9d46685..edf2c285d91 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp @@ -327,13 +327,15 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer { public: TDqOutputHashPartitionConsumerBlock(TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType, - const NKikimr::NMiniKQL::THolderFactory& holderFactory) + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TMaybe<ui8> minFillPercentage) : OutputType_(static_cast<const NMiniKQL::TMultiType*>(outputType)) , HolderFactory_(holderFactory) , Outputs_(std::move(outputs)) , KeyColumns_(std::move(keyColumns)) , ScalarColumnHashes_(KeyColumns_.size()) , OutputWidth_(OutputType_->GetElementsCount()) + , MinFillPercentage_(minFillPercentage) { TTypeInfoHelper helper; YQL_ENSURE(OutputWidth_ > KeyColumns_.size()); @@ -516,7 +518,7 @@ private: if (blockType->GetShape() == NMiniKQL::TBlockType::EShape::Many) { auto itemType = blockType->GetItemType(); YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet"); - Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, nullptr)); + Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, nullptr, {.MinFillPercentage=MinFillPercentage_})); } else { Builders_.emplace_back(); } @@ -533,6 +535,7 @@ private: const TVector<TColumnInfo> KeyColumns_; TVector<TMaybe<ui64>> ScalarColumnHashes_; const ui32 OutputWidth_; + const TMaybe<ui8> MinFillPercentage_; TVector<NUdf::IBlockItemHasher::TPtr> Hashers_; TVector<std::unique_ptr<IBlockReader>> Readers_; @@ -601,7 +604,8 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output) { IDqOutputConsumer::TPtr 
CreateOutputHashPartitionConsumer( TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType, - const NKikimr::NMiniKQL::THolderFactory& holderFactory) + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TMaybe<ui8> minFillPercentage) { YQL_ENSURE(!outputs.empty()); YQL_ENSURE(!keyColumns.empty()); @@ -620,7 +624,7 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( return MakeIntrusive<TDqOutputHashPartitionConsumerScalar>(std::move(outputs), std::move(keyColumns), outputType); } - return MakeIntrusive<TDqOutputHashPartitionConsumerBlock>(std::move(outputs), std::move(keyColumns), outputType, holderFactory); + return MakeIntrusive<TDqOutputHashPartitionConsumerBlock>(std::move(outputs), std::move(keyColumns), outputType, holderFactory, minFillPercentage); } IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth) { diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h index ac5d5087f48..17a817dd441 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.h +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h @@ -47,7 +47,8 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output); IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType, - const NKikimr::NMiniKQL::THolderFactory& holderFactory); + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TMaybe<ui8> minFillPercentage); IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 6c7770b9a60..702e54efa0a 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ 
b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -163,7 +163,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TVector<IDqOutput::TPtr>&& outputs) + TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui8> minFillPercentage) { TMaybe<ui32> outputWidth; if (type->IsMulti()) { @@ -184,7 +184,7 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumns); YQL_ENSURE(!keyColumns.empty()); YQL_ENSURE(outputDesc.GetHashPartition().GetPartitionsCount() == outputDesc.ChannelsSize()); - return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory); + return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory, minFillPercentage); } case NDqProto::TTaskOutput::kBroadcast: { @@ -240,10 +240,10 @@ public: if (Context.TypeEnv) { YQL_ENSURE(std::addressof(alloc) == std::addressof(TypeEnv().GetAllocator())); - } else { + } else { AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(alloc); } - + } ~TDqTaskRunner() { @@ -828,7 +828,7 @@ public: const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const override { return AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory(); } - + NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const override { return Alloc(); } diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 636e8600346..0814a6b9b2e 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -211,7 +211,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con IDqOutputConsumer::TPtr 
DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TVector<IDqOutput::TPtr>&& channels); + TVector<IDqOutput::TPtr>&& channels, TMaybe<ui8> minFillPercentage = {}); using TDqTaskRunnerParameterProvider = std::function< bool(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, @@ -433,8 +433,8 @@ public: }; TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner( - std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, - const TDqTaskRunnerContext& ctx, + std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, + const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc ); |