aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIvan <ilezhankin@yandex-team.ru>2024-12-11 23:07:29 +0300
committerGitHub <noreply@github.com>2024-12-11 20:07:29 +0000
commit58620f0d65b5b5013147d6b23ca34747fa7000b5 (patch)
tree8cb15610de7de77dec8d5e38f49345523cf2a594
parent638028c35bf99b471df74727997c8881893cb3eb (diff)
downloadydb-58620f0d65b5b5013147d6b23ca34747fa7000b5.tar.gz
Add config option for ArrayBufferMinFillPercentage (#12520)
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h8
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp1
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp3
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.h3
-rw-r--r--ydb/core/protos/table_service_config.proto6
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h1
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto1
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_consumer.cpp12
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_consumer.h3
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp10
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h6
20 files changed, 63 insertions, 26 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
index 737b2cadd44..0df7f95021a 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
@@ -118,6 +118,7 @@ public:
memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load();
memoryLimits.MinMemAllocSize = MinMemAllocSize.load();
memoryLimits.MinMemFreeSize = MinMemFreeSize.load();
+ memoryLimits.ArrayBufferMinFillPercentage = args.Task->GetArrayBufferMinFillPercentage();
auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks);
NRm::TKqpResourcesRequest resourcesRequest;
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
index bdfbb2eeb7d..fc68f913ace 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
@@ -14,9 +14,10 @@ using namespace NYql::NDq;
class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
- TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
+ TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TMaybe<ui8> minFillPercentage, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
, WithSpilling_(withSpilling)
+ , MinFillPercentage_(minFillPercentage)
{
}
@@ -25,7 +26,7 @@ public:
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<IDqOutput::TPtr>&& outputs) const override
{
- return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
+ return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs), MinFillPercentage_);
}
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override {
@@ -33,7 +34,8 @@ public:
}
private:
- bool WithSpilling_;
+ const bool WithSpilling_;
+ const TMaybe<ui8> MinFillPercentage_;
};
} // namespace NKqp
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 271314b0e53..d87c6497863 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -20,6 +20,7 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
, ComputeCtx(settings.StatsMode)
, FederatedQuerySetup(federatedQuerySetup)
, BlockTrackingMode(mode)
+ , ArrayBufferMinFillPercentage(memoryLimits.ArrayBufferMinFillPercentage)
{
InitializeTask();
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
@@ -77,7 +78,7 @@ void TKqpComputeActor::DoBootstrap() {
auto wakeupCallback = [this]{ ContinueExecute(); };
auto errorCallback = [this](const TString& error){ SendError(error); };
try {
- PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
+ PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, ArrayBufferMinFillPercentage, std::move(wakeupCallback), std::move(errorCallback)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
@@ -173,7 +174,7 @@ void TKqpComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, bool
// TODO: CpuTime
}
- if (auto* x = ScanData->ProfileStats.get()) {
+ if (ScanData->ProfileStats) {
// save your profile stats here
}
}
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h
index c3f94d4eef0..18832925815 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h
@@ -63,6 +63,7 @@ private:
const TDqTaskRunnerParameterProvider ParameterProvider;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
+ const TMaybe<ui8> ArrayBufferMinFillPercentage;
};
} // namespace NKqp
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 5f5fa7dcbc4..7e6c8c136cb 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -97,7 +97,7 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b
// TODO: CpuTime
}
- if (auto* x = ScanData->ProfileStats.get()) {
+ if (ScanData->ProfileStats) {
NKqpProto::TKqpTaskExtraStats taskExtraStats;
// auto scanTaskExtraStats = taskExtraStats.MutableScanTaskExtraStats();
// scanTaskExtraStats->SetRetriesCount(TotalRetries);
@@ -245,7 +245,7 @@ void TKqpScanComputeActor::DoBootstrap() {
auto wakeup = [this] { ContinueExecute(); };
auto errorCallback = [this](const TString& error){ SendError(error); };
- TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));
+ TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, MemoryLimits.ArrayBufferMinFillPercentage, std::move(wakeup), std::move(errorCallback)));
ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
ScanData = &ComputeCtx.GetTableScan(0);
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index d46ff2a76bf..bc5f30df86a 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -146,6 +146,10 @@ public:
, BlockTrackingMode(tableServiceConfig.GetBlockTrackingMode())
, WaitCAStatsTimeout(TDuration::MilliSeconds(tableServiceConfig.GetQueryLimits().GetWaitCAStatsTimeoutMs()))
{
+ if (tableServiceConfig.HasArrayBufferMinFillPercentage()) {
+ ArrayBufferMinFillPercentage = tableServiceConfig.GetArrayBufferMinFillPercentage();
+ }
+
Target = creator;
YQL_ENSURE(!TxManager || tableServiceConfig.GetEnableOltpSink());
@@ -2671,7 +2675,8 @@ private:
.MayRunTasksLocally = mayRunTasksLocally,
.ResourceManager_ = Request.ResourceManager_,
.CaFactory_ = Request.CaFactory_,
- .BlockTrackingMode = BlockTrackingMode
+ .BlockTrackingMode = BlockTrackingMode,
+ .ArrayBufferMinFillPercentage = ArrayBufferMinFillPercentage,
});
auto err = Planner->PlanExecution();
@@ -2994,6 +2999,7 @@ private:
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
const TDuration WaitCAStatsTimeout;
+ TMaybe<ui8> ArrayBufferMinFillPercentage;
};
} // namespace
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index dc376b97841..09d50760783 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -110,6 +110,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
, ResourceManager_(args.ResourceManager_)
, CaFactory_(args.CaFactory_)
, BlockTrackingMode(args.BlockTrackingMode)
+ , ArrayBufferMinFillPercentage(args.ArrayBufferMinFillPercentage)
{
if (GUCSettings) {
SerializedGUCSettings = GUCSettings->SerializeToString();
@@ -216,6 +217,9 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
for (ui64 taskId : requestData.TaskIds) {
const auto& task = TasksGraph.GetTask(taskId);
NYql::NDqProto::TDqTask* serializedTask = ArenaSerializeTaskToProto(TasksGraph, task, /* serializeAsyncIoSettings = */ true);
+ if (ArrayBufferMinFillPercentage) {
+ serializedTask->SetArrayBufferMinFillPercentage(*ArrayBufferMinFillPercentage);
+ }
request.AddTasks()->Swap(serializedTask);
}
@@ -474,6 +478,10 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
UserRequestContext->PoolId, memoryPoolPercent, Database);
}
+ if (ArrayBufferMinFillPercentage) {
+ taskDesc->SetArrayBufferMinFillPercentage(*ArrayBufferMinFillPercentage);
+ }
+
auto startResult = CaFactory_->CreateKqpComputeActor({
.ExecuterId = ExecuterId,
.TxId = TxId,
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index 9546f3a9a41..a556b8198e6 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -69,6 +69,7 @@ public:
const std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager>& ResourceManager_;
const std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory>& CaFactory_;
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
+ const TMaybe<ui8> ArrayBufferMinFillPercentage;
};
TKqpPlanner(TKqpPlanner::TArgs&& args);
@@ -146,6 +147,7 @@ private:
TIntrusivePtr<NRm::TTxState> TxInfo;
TVector<TProgressStat> LastStats;
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
+ const TMaybe<ui8> ArrayBufferMinFillPercentage;
public:
static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index fd41546ee2f..1151910b946 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -315,6 +315,7 @@ private:
.MayRunTasksLocally = false,
.ResourceManager_ = Request.ResourceManager_,
.CaFactory_ = Request.CaFactory_
+ // TODO: BlockTrackingMode is not set!
});
LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size());
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index b0b6bf4cddc..0d6dc18e042 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -217,7 +217,7 @@ private:
} else {
schedulerGroup = "";
}
- }
+ }
std::optional<ui64> querySchedulerGroup;
if (msg.HasQueryCpuShare() && schedulerGroup) {
@@ -276,6 +276,7 @@ private:
.ComputesByStages = &computesByStage,
.State = State_,
.SchedulingOptions = std::move(schedulingTaskOptions),
+ // TODO: block tracking mode is not set!
});
if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&result)) {
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
index 288bde24f58..51349923edb 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
@@ -24,7 +24,7 @@ using namespace NDq;
IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const TType* type,
NUdf::IApplyContext* applyCtx, const TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- TVector<IDqOutput::TPtr>&& outputs)
+ TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui8> minFillPercentage)
{
switch (outputDesc.GetTypeCase()) {
case NDqProto::TTaskOutput::kRangePartition: {
@@ -62,7 +62,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp
}
default: {
- return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs));
+ return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs), minFillPercentage);
}
}
}
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h
index 0ddf86137c3..c6df1352c9a 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.h
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h
@@ -10,7 +10,8 @@ namespace NKqp {
NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::TTaskOutput& outputDesc,
const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- TVector<NYql::NDq::IDqOutput::TPtr>&& outputs);
+ TVector<NYql::NDq::IDqOutput::TPtr>&& outputs,
+ TMaybe<ui8> minFillPercentage);
class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable {
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index 24c2300d2ca..e8412b4eaea 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -350,4 +350,10 @@ message TTableServiceConfig {
optional EBlockTrackingMode BlockTrackingMode = 73 [ default = BLOCK_TRACKING_SERIALIZE ];
optional bool AllowOlapDataQuery = 74 [default = false];
+
+ // DISCLAIMER: do not change this value if you don't know what you are doing!
+ optional uint32 ArrayBufferMinFillPercentage = 75 [default = 100];
+ // This value used for `arrow::Array`s built inside TDqOutputHashPartitionConsumerBlock -
+ // if the underlying array buffer is filled less than this value, then the buffer's capacity gets shrunk to actual size.
+ // Otherwise, we potentially don't track the real buffer capacity and it may lead to OOM situations inside DqOutputChannel's.
};
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index e07d8b9ad49..2959a0f34e1 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -1002,7 +1002,7 @@ namespace {
class TKqpTaskRunnerExecutionContext: public NDq::IDqTaskRunnerExecutionContext {
public:
NDq::IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<NDq::IDqOutput::TPtr>&& outputs) const override {
- return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
+ return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs), {});
}
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */) const override {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 1d2f5ddd226..e9404a8cda6 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -376,6 +376,7 @@ struct TComputeMemoryLimits {
ui64 MinMemAllocSize = 30_MB;
ui64 MinMemFreeSize = 30_MB;
ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
+ TMaybe<ui8> ArrayBufferMinFillPercentage; // Used by DqOutputHashPartitionConsumer
IMemoryQuotaManager::TPtr MemoryQuotaManager;
};
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index c37d76be90c..aeda89a74db 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -209,4 +209,5 @@ message TDqTask {
repeated bytes ReadRanges = 17;
map<string, string> RequestContext = 18;
optional bool EnableSpilling = 19;
+ optional uint32 ArrayBufferMinFillPercentage = 20;
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
index 397e9d46685..edf2c285d91 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
@@ -327,13 +327,15 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
public:
TDqOutputHashPartitionConsumerBlock(TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns,
const NKikimr::NMiniKQL::TType* outputType,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory)
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ TMaybe<ui8> minFillPercentage)
: OutputType_(static_cast<const NMiniKQL::TMultiType*>(outputType))
, HolderFactory_(holderFactory)
, Outputs_(std::move(outputs))
, KeyColumns_(std::move(keyColumns))
, ScalarColumnHashes_(KeyColumns_.size())
, OutputWidth_(OutputType_->GetElementsCount())
+ , MinFillPercentage_(minFillPercentage)
{
TTypeInfoHelper helper;
YQL_ENSURE(OutputWidth_ > KeyColumns_.size());
@@ -516,7 +518,7 @@ private:
if (blockType->GetShape() == NMiniKQL::TBlockType::EShape::Many) {
auto itemType = blockType->GetItemType();
YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet");
- Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, nullptr));
+ Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, nullptr, {.MinFillPercentage=MinFillPercentage_}));
} else {
Builders_.emplace_back();
}
@@ -533,6 +535,7 @@ private:
const TVector<TColumnInfo> KeyColumns_;
TVector<TMaybe<ui64>> ScalarColumnHashes_;
const ui32 OutputWidth_;
+ const TMaybe<ui8> MinFillPercentage_;
TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
TVector<std::unique_ptr<IBlockReader>> Readers_;
@@ -601,7 +604,8 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output) {
IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory)
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ TMaybe<ui8> minFillPercentage)
{
YQL_ENSURE(!outputs.empty());
YQL_ENSURE(!keyColumns.empty());
@@ -620,7 +624,7 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
return MakeIntrusive<TDqOutputHashPartitionConsumerScalar>(std::move(outputs), std::move(keyColumns), outputType);
}
- return MakeIntrusive<TDqOutputHashPartitionConsumerBlock>(std::move(outputs), std::move(keyColumns), outputType, holderFactory);
+ return MakeIntrusive<TDqOutputHashPartitionConsumerBlock>(std::move(outputs), std::move(keyColumns), outputType, holderFactory, minFillPercentage);
}
IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth) {
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h
index ac5d5087f48..17a817dd441 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.h
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h
@@ -47,7 +47,8 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output);
IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType,
- const NKikimr::NMiniKQL::THolderFactory& holderFactory);
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ TMaybe<ui8> minFillPercentage);
IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 6c7770b9a60..702e54efa0a 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -163,7 +163,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con
IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type,
const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- TVector<IDqOutput::TPtr>&& outputs)
+ TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui8> minFillPercentage)
{
TMaybe<ui32> outputWidth;
if (type->IsMulti()) {
@@ -184,7 +184,7 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu
GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumns);
YQL_ENSURE(!keyColumns.empty());
YQL_ENSURE(outputDesc.GetHashPartition().GetPartitionsCount() == outputDesc.ChannelsSize());
- return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory);
+ return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory, minFillPercentage);
}
case NDqProto::TTaskOutput::kBroadcast: {
@@ -240,10 +240,10 @@ public:
if (Context.TypeEnv) {
YQL_ENSURE(std::addressof(alloc) == std::addressof(TypeEnv().GetAllocator()));
- } else {
+ } else {
AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(alloc);
}
-
+
}
~TDqTaskRunner() {
@@ -828,7 +828,7 @@ public:
const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const override {
return AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory();
}
-
+
NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const override {
return Alloc();
}
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 636e8600346..0814a6b9b2e 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -211,7 +211,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con
IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- TVector<IDqOutput::TPtr>&& channels);
+ TVector<IDqOutput::TPtr>&& channels, TMaybe<ui8> minFillPercentage = {});
using TDqTaskRunnerParameterProvider = std::function<
bool(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
@@ -433,8 +433,8 @@ public:
};
TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(
- std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
- const TDqTaskRunnerContext& ctx,
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
+ const TDqTaskRunnerContext& ctx,
const TDqTaskRunnerSettings& settings,
const TLogFunc& logFunc
);