aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-05-23 17:28:27 +0300
committergvit <gvit@ydb.tech>2023-05-23 17:28:27 +0300
commitd526bcd286b8ec67b456cfe4355158b950352a1c (patch)
treeb5636927b1638def3e39e8f853d7bfadfd4e7471
parent2e7ed60ae97dc177d2c964c338ac10bfbc6977cf (diff)
downloadydb-d526bcd286b8ec67b456cfe4355158b950352a1c.tar.gz
store shard params in the query data
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h3
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp13
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp9
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h15
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp21
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.cpp90
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.h11
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp19
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h1
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.cpp59
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.h57
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h5
14 files changed, 157 insertions, 155 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 798b3d6523..e61e3a177d 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -45,7 +45,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
- NWilson::TTraceId traceId = {});
+ NWilson::TTraceId traceId = {},
+ const NYql::NDq::TDqTaskRunnerParameterProvider& parameterProvider = {});
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index e038a56550..3889456bdf 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -37,9 +37,11 @@ public:
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- NWilson::TTraceId traceId)
+ NWilson::TTraceId traceId,
+ const TDqTaskRunnerParameterProvider& parameterProvider)
: TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId))
, ComputeCtx(settings.StatsMode)
+ , ParameterProvider(parameterProvider)
{
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
Meta.ConstructInPlace();
@@ -94,7 +96,8 @@ public:
auto wakeup = [this]{ ContinueExecute(); };
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling,
- std::move(wakeup), TlsActivationContext->AsActorContext()));
+ std::move(wakeup), TlsActivationContext->AsActorContext()),
+ ParameterProvider);
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
@@ -300,6 +303,7 @@ private:
TMaybe<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta> Meta;
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
TActorId SysViewActorId;
+ const TDqTaskRunnerParameterProvider ParameterProvider;
};
} // anonymous namespace
@@ -308,10 +312,11 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- NWilson::TTraceId traceId)
+ NWilson::TTraceId traceId,
+ const TDqTaskRunnerParameterProvider& parameterProvider)
{
return new TKqpComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits, std::move(traceId));
+ functionRegistry, settings, memoryLimits, std::move(traceId), parameterProvider);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 2adf967d56..bce4d003c9 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1318,20 +1318,12 @@ private:
YQL_ENSURE(!shardInfo.KeyWriteRanges);
auto& task = getShardTask(shardId);
- for (auto& [name, value] : shardInfo.Params) {
- task.Meta.Params.emplace(name, std::move(value));
- }
-
FillGeneralReadInfo(task.Meta, readSettings.ItemsLimit, readSettings.Reverse);
TTaskMeta::TShardReadInfo readInfo;
readInfo.Ranges = std::move(*shardInfo.KeyReadRanges);
readInfo.Columns = columns;
- if (readSettings.ItemsLimitParamName) {
- task.Meta.Params.emplace(readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes);
- }
-
if (!task.Meta.Reads) {
task.Meta.Reads.ConstructInPlace();
}
@@ -1379,7 +1371,6 @@ private:
YQL_ENSURE(shardInfo.KeyWriteRanges);
auto& task = getShardTask(shardId);
- task.Meta.Params = std::move(shardInfo.Params);
if (!task.Meta.Writes) {
task.Meta.Writes.ConstructInPlace();
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index cf50500b21..ca3e207109 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -721,12 +721,6 @@ protected:
auto columns = BuildKqpColumns(source, table);
- ui64 itemsLimit = 0;
-
- TString itemsLimitParamName;
- NYql::NDqProto::TData itemsLimitBytes;
- NKikimr::NMiniKQL::TType* itemsLimitType = nullptr;
-
const auto& snapshot = GetSnapshot();
auto addPartiton = [&](TMaybe<ui64> shardId, const TShardInfo& shardInfo, TMaybe<ui64> maxInFlightShards = Nothing()) {
@@ -742,11 +736,6 @@ protected:
}
}
- for (auto& [name, value] : shardInfo.Params) {
- auto ret = task.Meta.Params.emplace(name, std::move(value));
- YQL_ENSURE(ret.second);
- }
-
NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
FillTableMeta(stageInfo, settings.MutableTable());
@@ -797,8 +786,8 @@ protected:
}
}
- ExtractItemsLimit(stageInfo, source.GetItemsLimit(), Request.TxAlloc->HolderFactory,
- Request.TxAlloc->TypeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
+ ui64 itemsLimit = ExtractItemsLimit(stageInfo, source.GetItemsLimit(), Request.TxAlloc->HolderFactory,
+ Request.TxAlloc->TypeEnv);
settings.SetItemsLimit(itemsLimit);
auto self = static_cast<TDerived*>(this)->SelfId();
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 8436fd683a..f43d5806b5 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -202,25 +202,6 @@ public:
YQL_ENSURE(resultChannel.DstTask == 0);
}
- TQueryData::TPtr params = stageInfo.Meta.Tx.Params;
- auto parameterProvider = [&params, &task, &stageInfo](std::string_view name, NMiniKQL::TType* type,
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
- NUdf::TUnboxedValue& value)
- {
- Y_UNUSED(typeEnv);
- if (auto* data = task.Meta.Params.FindPtr(name)) {
- TDqDataSerializer::DeserializeParam(*data, type, holderFactory, value);
- return true;
- }
-
- if (auto* param = params->GetParameterType(TString(name))) {
- std::tie(type, value) = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(TString(name));
- return true;
- }
-
- return false;
- };
-
auto log = [as = TlsActivationContext->ActorSystem(), txId = TxId, taskId = task.Id](const TString& message) {
LOG_DEBUG_S(*as, NKikimrServices::KQP_TASKS_RUNNER, "TxId: " << txId << ", task: " << taskId << ". "
<< message);
@@ -230,7 +211,7 @@ public:
TaskRunners.emplace_back(taskRunner);
taskRunner->Prepare(protoTask, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext(),
- parameterProvider);
+ TQueryData::GetParameterProvider(stageInfo.Meta.Tx.Params));
auto status = taskRunner->Run();
YQL_ENSURE(status == ERunStatus::Finished);
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
index 19caff8c19..d1790f20c9 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
@@ -22,8 +22,8 @@ struct TColumnStats {
};
struct TShardParamValuesAndRanges {
- NDqProto::TData ParamValues;
- NKikimr::NMiniKQL::TType* ParamType;
+ NMiniKQL::TUnboxedValueVector UnboxedValues;
+ NKikimr::NMiniKQL::TType* ItemType;
// either FullRange or Ranges are set
TVector<TSerializedPointOrRange> Ranges;
std::optional<TSerializedTableRange> FullRange;
@@ -33,7 +33,7 @@ struct TShardParamValuesAndRanges {
THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
const NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type,
const TTableId& tableId,
- const TKqpTableKeys& tableKeys, const TKeyDesc& key, const NMiniKQL::THolderFactory& holderFactory,
+ const TKqpTableKeys& tableKeys, const TKeyDesc& key, const NMiniKQL::THolderFactory&,
const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto guard = typeEnv.BindAllocator();
@@ -41,7 +41,6 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
auto& table = tableKeys.GetTable(tableId);
THashMap<ui64, TShardParamValuesAndRanges> ret;
- THashMap<ui64, NMiniKQL::TUnboxedValueVector> shardParamValues;
YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::List);
auto* itemType = static_cast<NMiniKQL::TListType*>(type)->GetItemType();
@@ -103,15 +102,8 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
columnWrite.MaxValueSizeBytes = std::max(columnWrite.MaxValueSizeBytes, sizeBytes);
}
- shardParamValues[shardId].emplace_back(std::move(paramValue));
-
- shardData.ParamType = itemType;
- }
-
- NDq::TDqDataSerializer dataSerializer{typeEnv, holderFactory, NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0};
- for (auto& [shardId, data] : ret) {
- auto& batch = shardParamValues[shardId];
- ret[shardId].ParamValues = dataSerializer.Serialize(batch.begin(), batch.end(), itemType);
+ shardData.UnboxedValues.emplace_back(std::move(paramValue));
+ shardData.ItemType = itemType;
}
return ret;
@@ -120,14 +112,13 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(
const NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type,
const TTableId& tableId, const TKqpTableKeys& tableKeys, const TKeyDesc& key,
- const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
+ const NMiniKQL::THolderFactory&, const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto guard = typeEnv.BindAllocator();
YQL_ENSURE(tableId.HasSamePath(key.TableId));
auto& table = tableKeys.GetTable(tableId);
THashMap<ui64, TShardParamValuesAndRanges> ret;
- THashMap<ui64, NMiniKQL::TUnboxedValueVector> shardParamValues;
YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::List);
auto itemType = static_cast<NMiniKQL::TListType&>(*type).GetItemType();
@@ -180,26 +171,17 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(
for (TPartitionWithRange& partitionWithRange : rangePartitions) {
ui64 shardId = partitionWithRange.PartitionInfo->ShardId;
-
- shardParamValues[shardId].emplace_back(paramValue);
-
auto& shardData = ret[shardId];
+ shardData.UnboxedValues.emplace_back(paramValue);
+ shardData.ItemType = itemType;
if (partitionWithRange.FullRange) {
shardData.FullRange = std::move(partitionWithRange.FullRange);
shardData.Ranges.clear();
} else if (!shardData.FullRange) {
shardData.Ranges.emplace_back(std::move(partitionWithRange.PointOrRange));
}
- shardData.ParamType = itemType;
}
}
-
- NDq::TDqDataSerializer dataSerializer(typeEnv, holderFactory, NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0);
- for (auto& [shardId, data] : ret) {
- auto& batch = shardParamValues[shardId];
- data.ParamValues = dataSerializer.Serialize(batch.begin(), batch.end(), itemType);
- }
-
return ret;
}
@@ -499,11 +481,7 @@ TString TShardInfo::ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const
sb << "TShardInfo{ ";
sb << "ReadRanges: " << (KeyReadRanges ? KeyReadRanges->ToString(keyTypes, typeRegistry) : "<none>");
sb << ", WriteRanges: " << (KeyWriteRanges ? KeyWriteRanges->ToString(keyTypes, typeRegistry) : "<none>");
- sb << ", Parameters: {";
- for (auto& param: Params) {
- sb << param.first << ", ";
- }
- sb << "} }";
+ sb << " }";
return sb;
}
@@ -738,14 +716,11 @@ THashMap<ui64, TShardInfo> PartitionLookupByParameterValue(const NKqpProto::TKqp
for (auto& [shardId, shardData] : shardsMap) {
auto& shardInfo = shardInfoMap[shardId];
-
if (!shardInfo.KeyReadRanges) {
shardInfo.KeyReadRanges.ConstructInPlace();
}
- auto ret = shardInfo.Params.emplace(name, std::move(shardData.ParamValues));
- Y_VERIFY_DEBUG(ret.second);
-
+ stageInfo.Meta.Tx.Params->AddShardParam(shardId, name, shardData.ItemType, std::move(shardData.UnboxedValues));
if (shardData.FullRange) {
shardInfo.KeyReadRanges->MakeFullRange(std::move(*shardData.FullRange));
} else {
@@ -835,13 +810,6 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow
shardInfo.KeyReadRanges.ConstructInPlace();
}
- for (const auto& paramName : shardParams[shardId]) {
- shardInfo.Params.emplace(
- paramName,
- stageInfo.Meta.Tx.Params->SerializeParamValue(paramName)
- );
- }
-
if (shardData.FullRange) {
shardInfo.KeyReadRanges->MakeFullRange(std::move(*shardData.FullRange));
} else {
@@ -904,8 +872,7 @@ THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableK
for (auto& [shardId, shardData] : shardsMap) {
auto& shardInfo = shardInfoMap[shardId];
- auto ret = shardInfo.Params.emplace(name, std::move(shardData.ParamValues));
- YQL_ENSURE(ret.second);
+ stageInfo.Meta.Tx.Params->AddShardParam(shardId, name, shardData.ItemType, std::move(shardData.UnboxedValues));
if (!shardInfo.KeyWriteRanges) {
shardInfo.KeyWriteRanges.ConstructInPlace();
@@ -956,10 +923,8 @@ THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
}
}
-void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit,
- const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv,
- ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes,
- NKikimr::NMiniKQL::TType*& itemsLimitType)
+ui64 ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
switch (protoItemsLimit.GetKindCase()) {
case NKqpProto::TKqpPhyValue::kLiteralValue: {
@@ -969,27 +934,18 @@ void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValu
literalValue.GetType(), literalValue.GetValue(), typeEnv, holderFactory);
YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data);
- itemsLimit = value.Get<ui64>();
- itemsLimitType = type;
-
- return;
+ return value.Get<ui64>();
}
case NKqpProto::TKqpPhyValue::kParamValue: {
- itemsLimitParamName = protoItemsLimit.GetParamValue().GetParamName();
+ const TString& itemsLimitParamName = protoItemsLimit.GetParamValue().GetParamName();
if (!itemsLimitParamName) {
- return;
+ return 0;
}
auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(itemsLimitParamName);
YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data);
- itemsLimit = value.Get<ui64>();
-
- NYql::NDq::TDqDataSerializer dataSerializer(typeEnv, holderFactory, NYql::NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
- itemsLimitBytes = dataSerializer.Serialize(value, type);
- itemsLimitType = type;
-
- return;
+ return value.Get<ui64>();
}
case NKqpProto::TKqpPhyValue::kParamElementValue:
@@ -997,7 +953,7 @@ void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValu
YQL_ENSURE(false, "Unexpected ItemsLimit kind " << protoItemsLimit.DebugString());
case NKqpProto::TKqpPhyValue::KIND_NOT_SET:
- return;
+ return 0;
}
}
@@ -1008,16 +964,13 @@ TPhysicalShardReadSettings ExtractReadSettings(const NKqpProto::TKqpPhyTableOper
switch(operation.GetTypeCase()){
case NKqpProto::TKqpPhyTableOperation::kReadRanges: {
- ExtractItemsLimit(stageInfo, operation.GetReadRanges().GetItemsLimit(), holderFactory, typeEnv,
- readSettings.ItemsLimit, readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes, readSettings.ItemsLimitType);
+ readSettings.ItemsLimit = ExtractItemsLimit(stageInfo, operation.GetReadRanges().GetItemsLimit(), holderFactory, typeEnv);
readSettings.Reverse = operation.GetReadRanges().GetReverse();
-
break;
}
case NKqpProto::TKqpPhyTableOperation::kReadRange: {
- ExtractItemsLimit(stageInfo, operation.GetReadRange().GetItemsLimit(), holderFactory, typeEnv,
- readSettings.ItemsLimit, readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes, readSettings.ItemsLimitType);
+ readSettings.ItemsLimit = ExtractItemsLimit(stageInfo, operation.GetReadRange().GetItemsLimit(), holderFactory, typeEnv);
readSettings.Reverse = operation.GetReadRange().GetReverse();
break;
}
@@ -1025,8 +978,7 @@ TPhysicalShardReadSettings ExtractReadSettings(const NKqpProto::TKqpPhyTableOper
case NKqpProto::TKqpPhyTableOperation::kReadOlapRange: {
readSettings.Sorted = operation.GetReadOlapRange().GetSorted();
readSettings.Reverse = operation.GetReadOlapRange().GetReverse();
- ExtractItemsLimit(stageInfo, operation.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv,
- readSettings.ItemsLimit, readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes, readSettings.ItemsLimitType);
+ readSettings.ItemsLimit = ExtractItemsLimit(stageInfo, operation.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv);
NKikimrMiniKQL::TType minikqlProtoResultType;
ConvertYdbTypeToMiniKQLType(operation.GetReadOlapRange().GetResultType(), minikqlProtoResultType);
readSettings.ResultType = ImportTypeFromProto(minikqlProtoResultType, typeEnv);
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
index 3a2b9a61df..47e599074c 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
@@ -17,8 +17,6 @@ struct TShardInfo {
ui32 MaxValueSizeBytes = 0;
};
- TMap<TString, NYql::NDqProto::TData> Params;
-
TMaybe<TShardKeyRanges> KeyReadRanges; // empty -> no reads
TMaybe<TShardKeyRanges> KeyWriteRanges; // empty -> no writes
THashMap<TString, TColumnWriteInfo> ColumnWrites;
@@ -41,9 +39,6 @@ struct TPhysicalShardReadSettings {
bool Sorted = true;
bool Reverse = false;
ui64 ItemsLimit = 0;
- TString ItemsLimitParamName;
- NYql::NDqProto::TData ItemsLimitBytes;
- NKikimr::NMiniKQL::TType* ItemsLimitType = nullptr;
NKikimr::NMiniKQL::TType* ResultType = nullptr;
};
@@ -79,10 +74,8 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit,
- const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv,
- ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes,
- NKikimr::NMiniKQL::TType*& itemsLimitType);
+ui64 ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
// Returns the list of ColumnShards that can store rows from the specified range
// NOTE: Unlike OLTP tables that store data in DataShards, data in OLAP tables is not range
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 9daa9920a2..711e42f7f6 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -278,10 +278,6 @@ private:
void MergeToTaskMeta(TTaskMeta& meta, TShardInfoWithId& shardInfo, const TPhysicalShardReadSettings& readSettings, const TVector<TTaskMeta::TColumn>& columns,
const NKqpProto::TKqpPhyTableOperation& op) const {
YQL_ENSURE(!shardInfo.KeyWriteRanges);
- for (auto& [name, value] : shardInfo.Params) {
- auto ret = meta.Params.emplace(name, std::move(value));
- YQL_ENSURE(ret.second);
- }
TTaskMeta::TShardReadInfo readInfo = {
.Ranges = std::move(*shardInfo.KeyReadRanges), // sorted & non-intersecting
@@ -289,10 +285,6 @@ private:
.ShardId = shardInfo.ShardId,
};
- if (readSettings.ItemsLimitParamName && !meta.Params.contains(readSettings.ItemsLimitParamName)) {
- meta.Params.emplace(readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes);
- }
-
if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
const auto& readRange = op.GetReadOlapRange();
FillReadInfo(meta, readSettings.ItemsLimit, readSettings.Reverse, readSettings.Sorted, readSettings.ResultType, readRange);
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 7d2fcda322..329238bc78 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -60,13 +60,6 @@ std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo&
continue;
}
- if (auto* taskParam = task.Meta.Params.FindPtr(name)) {
- // This parameter is the list, holding type from task.Meta.ParamTypes
- // Those parameters can't be used in Olap programs now
- YQL_ENSURE(false, "OLAP program contains task parameter, not supported yet.");
- continue;
- }
-
auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name);
YQL_ENSURE(NYql::NArrow::IsArrowCompatible(type), "Incompatible parameter type. Can't convert to arrow");
@@ -1005,8 +998,8 @@ NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, c
result.MutableProgram()->CopyFrom(stage.GetProgram());
for (auto& paramName : stage.GetProgramParameters()) {
auto& dqParams = *result.MutableParameters();
- if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) {
- dqParams[paramName] = *taskParam;
+ if (task.Meta.ShardId) {
+ dqParams[paramName] = stageInfo.Meta.Tx.Params->GetShardParam(task.Meta.ShardId, paramName);
} else {
dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName);
}
@@ -1021,13 +1014,7 @@ NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, c
TString TTaskMeta::ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const NScheme::TTypeRegistry& typeRegistry) const
{
TStringBuilder sb;
- sb << "TTaskMeta{ ShardId: " << ShardId << ", Params: [";
-
- for (auto& [name, value] : Params) {
- sb << name << ", ";
- }
-
- sb << "], Reads: { ";
+ sb << "TTaskMeta{ ShardId: " << ShardId << ", Reads: { ";
if (Reads) {
for (ui64 i = 0; i < Reads->size(); ++i) {
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 2b52d682ef..911a7da01c 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -139,7 +139,6 @@ public:
bool ScanTask = false;
TActorId ExecuterId;
- TMap<TString, NYql::NDqProto::TData> Params;
THashMap<TString, TString> DqTaskParams; // Params for sources/sinks
THashMap<TString, TString> DqSecureParams;
diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp
index 69c2cd65c3..b1c61e3a2a 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.cpp
+++ b/ydb/core/kqp/query_data/kqp_query_data.cpp
@@ -7,6 +7,7 @@
#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/core/util/yverify_stream.h>
namespace NKikimr::NKqp {
@@ -136,6 +137,9 @@ TQueryData::~TQueryData() {
TxResults.swap(emptyResultMap);
TUnboxedParamsMap emptyMap;
UnboxedData.swap(emptyMap);
+
+ TPartitionedParamMap empty;
+ empty.swap(PartitionedParams);
}
}
@@ -268,6 +272,15 @@ TQueryData::TTypedUnboxedValue& TQueryData::GetParameterUnboxedValue(const TStri
return it->second;
}
+TQueryData::TTypedUnboxedValue* TQueryData::GetParameterUnboxedValuePtr(const TString& name) {
+ auto it = UnboxedData.find(name);
+ if (it == UnboxedData.end()) {
+ return nullptr;
+ }
+
+ return &it->second;
+}
+
const NKikimrMiniKQL::TParams* TQueryData::GetParameterMiniKqlValue(const TString& name) {
if (UnboxedData.find(name) == UnboxedData.end())
return nullptr;
@@ -334,6 +347,42 @@ bool TQueryData::MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyPara
return false;
}
+void TQueryData::AddShardParam(ui64 shardId, const TString& name, NKikimr::NMiniKQL::TType* type, NKikimr::NMiniKQL::TUnboxedValueVector&& value) {
+ auto guard = TypeEnv().BindAllocator();
+ auto [it, inserted] = PartitionedParams.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(shardId, name),
+ std::forward_as_tuple(type, std::move(value)));
+ YQL_ENSURE(inserted, "duplicate parameter, shardId#: "
+ << shardId << ", paramName#: " << name);
+}
+
+void TQueryData::ClearPrunedParams() {
+ if (PartitionedParams.empty())
+ return;
+
+ auto guard = TypeEnv().BindAllocator();
+ for(auto& [key, value]: PartitionedParams) {
+ TUnboxedValueVector emptyVector;
+ value.Values.swap(emptyVector);
+ }
+
+ TPartitionedParamMap emptyMap;
+ emptyMap.swap(PartitionedParams);
+}
+
+NDqProto::TData TQueryData::GetShardParam(ui64 shardId, const TString& name) {
+ auto kv = std::make_pair(shardId, name);
+ auto it = PartitionedParams.find(kv);
+ if (it == PartitionedParams.end()) {
+ return SerializeParamValue(name);
+ }
+
+ auto guard = TypeEnv().BindAllocator();
+ NDq::TDqDataSerializer dataSerializer{AllocState->TypeEnv, AllocState->HolderFactory, NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0};
+ return dataSerializer.Serialize(it->second.Values.begin(), it->second.Values.end(), it->second.ItemType);
+}
+
NDqProto::TData TQueryData::SerializeParamValue(const TString& name) {
auto guard = TypeEnv().BindAllocator();
const auto& [type, value] = GetParameterUnboxedValue(name);
@@ -344,10 +393,20 @@ void TQueryData::Clear() {
{
auto g = TypeEnv().BindAllocator();
Params.clear();
+
TUnboxedParamsMap emptyMap;
UnboxedData.swap(emptyMap);
+
THashMap<ui32, TVector<TKqpExecuterTxResult>> emptyResultMap;
TxResults.swap(emptyResultMap);
+
+ for(auto& [key, param]: PartitionedParams) {
+ NKikimr::NMiniKQL::TUnboxedValueVector emptyValues;
+ param.Values.swap(emptyValues);
+ }
+
+ PartitionedParams.clear();
+
AllocState->Reset();
}
}
diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h
index 981f8996a2..d3542dc1c7 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.h
+++ b/ydb/core/kqp/query_data/kqp_query_data.h
@@ -10,12 +10,12 @@
#include <library/cpp/random_provider/random_provider.h>
#include <library/cpp/time_provider/time_provider.h>
+#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>
#include <util/generic/ptr.h>
#include <util/generic/guid.h>
#include <google/protobuf/arena.h>
-#include <unordered_map>
#include <vector>
namespace NKqpProto {
@@ -37,7 +37,7 @@ namespace NKikimr::NKqp {
using TTypedUnboxedValue = std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue>;
using TNamedUnboxedValue = std::pair<const TString, TTypedUnboxedValue>;
-using TUnboxedParamsMap = std::unordered_map<
+using TUnboxedParamsMap = absl::flat_hash_map<
TString,
TTypedUnboxedValue,
std::hash<TString>,
@@ -45,6 +45,26 @@ using TUnboxedParamsMap = std::unordered_map<
NKikimr::NMiniKQL::TMKQLAllocator<TNamedUnboxedValue>
>;
+struct TPartitionParam {
+ NKikimr::NMiniKQL::TType *ItemType;
+ NYql::NDqProto::TData Data;
+ NKikimr::NMiniKQL::TUnboxedValueVector Values;
+
+ TPartitionParam(NKikimr::NMiniKQL::TType *itemType,
+ NKikimr::NMiniKQL::TUnboxedValueVector&& values)
+ : ItemType(itemType)
+ , Values(std::move(values))
+ {}
+};
+
+using TPartitionedParamWithKey = std::pair<const std::pair<ui64, TString>, TPartitionParam>;
+
+using TPartitionedParamMap = absl::flat_hash_map<
+ std::pair<ui64, TString>,
+ TPartitionParam,
+ THash<std::pair<ui64, TString>>
+>;
+
using TTypedUnboxedValueVector = std::vector<
TTypedUnboxedValue,
NKikimr::NMiniKQL::TMKQLAllocator<TTypedUnboxedValue>
@@ -158,16 +178,22 @@ private:
using TTypedUnboxedValue = std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue>;
using TNamedUnboxedValue = std::pair<const TString, TTypedUnboxedValue>;
- using TParamMap = std::unordered_map<
+ using TParamMap = absl::flat_hash_map<
TString,
NKikimrMiniKQL::TParams
>;
+ using TParamProvider = std::function<
+ bool(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::TUnboxedValue& value)
+ >;
+
TParamMap Params;
TUnboxedParamsMap UnboxedData;
THashMap<ui32, TVector<TKqpExecuterTxResult>> TxResults;
TVector<TVector<TKqpPhyTxHolder::TConstPtr>> TxHolders;
TTxAllocatorState::TPtr AllocState;
+ mutable TPartitionedParamMap PartitionedParams;
public:
using TPtr = std::shared_ptr<TQueryData>;
@@ -191,6 +217,10 @@ public:
void AddTxResults(ui32 txIndex, TVector<TKqpExecuterTxResult>&& results);
void AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders);
+ void AddShardParam(ui64 shardId, const TString& name, NKikimr::NMiniKQL::TType* type, NKikimr::NMiniKQL::TUnboxedValueVector&& value);
+ void ClearPrunedParams();
+ NYql::NDqProto::TData GetShardParam(ui64 shardId, const TString& name);
+
bool HasResult(ui32 txIndex, ui32 resultIndex) {
if (!TxResults.contains(txIndex))
return false;
@@ -211,9 +241,30 @@ public:
std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding);
TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name);
+ TTypedUnboxedValue* GetParameterUnboxedValuePtr(const TString& name);
const NKikimrMiniKQL::TParams* GetParameterMiniKqlValue(const TString& name);
NYql::NDqProto::TData SerializeParamValue(const TString& name);
void Clear();
+
+ static TParamProvider GetParameterProvider(const std::shared_ptr<TQueryData>& queryData) {
+ return [queryData](std::string_view name, NMiniKQL::TType* type,
+ const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
+ NUdf::TUnboxedValue& value)
+ {
+ Y_UNUSED(typeEnv);
+ Y_UNUSED(holderFactory);
+
+ if (TTypedUnboxedValue* param = queryData->GetParameterUnboxedValuePtr(TString(name))) {
+ std::tie(type, value) = *param;
+ return true;
+ }
+
+
+ return false;
+ };
+ }
};
+
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 2ee6c441ba..c7117fdb0f 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1084,6 +1084,7 @@ public:
YQL_ENSURE(QueryState);
LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->ResultRowsCount);
+ QueryState->QueryData->ClearPrunedParams();
if (!ev->GetTxResults().empty()) {
QueryState->QueryData->AddTxResults(QueryState->CurrentTx - 1, std::move(ev->GetTxResults()));
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 1fb29ed6ab..afc078e1e7 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1475,7 +1475,8 @@ protected:
TaskRunner = taskRunner;
}
- void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) {
+ void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(),
+ const TDqTaskRunnerParameterProvider& parameterProvider = {}) {
YQL_ENSURE(TaskRunner);
auto guard = TaskRunner->BindAllocator(MemoryQuota->GetMkqlMemoryLimit());
@@ -1487,7 +1488,7 @@ protected:
limits.ChannelBufferSize = MemoryLimits.ChannelBufferSize;
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
- TaskRunner->Prepare(Task, limits, execCtx);
+ TaskRunner->Prepare(Task, limits, execCtx, parameterProvider);
FillIoMaps(
TaskRunner->GetHolderFactory(),