diff options
author | gvit <gvit@ydb.tech> | 2023-05-23 17:28:27 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-05-23 17:28:27 +0300 |
commit | d526bcd286b8ec67b456cfe4355158b950352a1c (patch) | |
tree | b5636927b1638def3e39e8f853d7bfadfd4e7471 | |
parent | 2e7ed60ae97dc177d2c964c338ac10bfbc6977cf (diff) | |
download | ydb-d526bcd286b8ec67b456cfe4355158b950352a1c.tar.gz |
store shard params in the query data
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 15 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_literal_executer.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.cpp | 90 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.h | 11 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_query_data.cpp | 59 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_query_data.h | 57 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 5 |
14 files changed, 157 insertions, 155 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 798b3d6523..e61e3a177d 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -45,7 +45,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, - NWilson::TTraceId traceId = {}); + NWilson::TTraceId traceId = {}, + const NYql::NDq::TDqTaskRunnerParameterProvider& parameterProvider = {}); IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index e038a56550..3889456bdf 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -37,9 +37,11 @@ public: IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - NWilson::TTraceId traceId) + NWilson::TTraceId traceId, + const TDqTaskRunnerParameterProvider& parameterProvider) : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId)) , ComputeCtx(settings.StatsMode) + , ParameterProvider(parameterProvider) { if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) { Meta.ConstructInPlace(); @@ -94,7 +96,8 @@ public: auto wakeup = [this]{ ContinueExecute(); }; try { PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, - std::move(wakeup), TlsActivationContext->AsActorContext())); + std::move(wakeup), TlsActivationContext->AsActorContext()), + ParameterProvider); } catch (const NMiniKQL::TKqpEnsureFail& e) { InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); return; @@ -300,6 +303,7 @@ private: TMaybe<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta> Meta; NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr; TActorId SysViewActorId; + const TDqTaskRunnerParameterProvider ParameterProvider; }; } // anonymous namespace @@ -308,10 +312,11 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - NWilson::TTraceId traceId) + NWilson::TTraceId traceId, + const TDqTaskRunnerParameterProvider& parameterProvider) { return new TKqpComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory), - functionRegistry, settings, memoryLimits, std::move(traceId)); + functionRegistry, settings, memoryLimits, std::move(traceId), parameterProvider); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 2adf967d56..bce4d003c9 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1318,20 +1318,12 @@ private: YQL_ENSURE(!shardInfo.KeyWriteRanges); auto& task = getShardTask(shardId); - for (auto& [name, value] : shardInfo.Params) { - task.Meta.Params.emplace(name, std::move(value)); - } - FillGeneralReadInfo(task.Meta, readSettings.ItemsLimit, readSettings.Reverse); TTaskMeta::TShardReadInfo readInfo; readInfo.Ranges = std::move(*shardInfo.KeyReadRanges); readInfo.Columns = columns; - if (readSettings.ItemsLimitParamName) { - task.Meta.Params.emplace(readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes); - } - if (!task.Meta.Reads) { task.Meta.Reads.ConstructInPlace(); } @@ -1379,7 +1371,6 @@ private: YQL_ENSURE(shardInfo.KeyWriteRanges); auto& task = getShardTask(shardId); - task.Meta.Params = std::move(shardInfo.Params); if (!task.Meta.Writes) { task.Meta.Writes.ConstructInPlace(); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index cf50500b21..ca3e207109 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -721,12 +721,6 @@ protected: auto columns = BuildKqpColumns(source, table); - ui64 itemsLimit = 0; - - TString itemsLimitParamName; - NYql::NDqProto::TData itemsLimitBytes; - NKikimr::NMiniKQL::TType* itemsLimitType = nullptr; - const auto& snapshot = GetSnapshot(); auto addPartiton = [&](TMaybe<ui64> shardId, const TShardInfo& shardInfo, TMaybe<ui64> maxInFlightShards = Nothing()) { @@ -742,11 +736,6 @@ protected: } } - for (auto& [name, value] : shardInfo.Params) { - auto ret = task.Meta.Params.emplace(name, std::move(value)); - YQL_ENSURE(ret.second); - } - NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; FillTableMeta(stageInfo, settings.MutableTable()); @@ -797,8 +786,8 @@ protected: } } - ExtractItemsLimit(stageInfo, source.GetItemsLimit(), Request.TxAlloc->HolderFactory, - Request.TxAlloc->TypeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); + ui64 itemsLimit = ExtractItemsLimit(stageInfo, source.GetItemsLimit(), Request.TxAlloc->HolderFactory, + Request.TxAlloc->TypeEnv); settings.SetItemsLimit(itemsLimit); auto self = static_cast<TDerived*>(this)->SelfId(); diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 8436fd683a..f43d5806b5 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -202,25 +202,6 @@ public: YQL_ENSURE(resultChannel.DstTask == 0); } - TQueryData::TPtr params = stageInfo.Meta.Tx.Params; - auto parameterProvider = [¶ms, &task, &stageInfo](std::string_view name, NMiniKQL::TType* type, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, - NUdf::TUnboxedValue& value) - { - Y_UNUSED(typeEnv); - if (auto* data = task.Meta.Params.FindPtr(name)) { - TDqDataSerializer::DeserializeParam(*data, type, holderFactory, value); - return true; - } - - if (auto* param = params->GetParameterType(TString(name))) { - std::tie(type, value) = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(TString(name)); - return true; - } - - return false; - }; - auto log = [as = TlsActivationContext->ActorSystem(), txId = TxId, taskId = task.Id](const TString& message) { LOG_DEBUG_S(*as, NKikimrServices::KQP_TASKS_RUNNER, "TxId: " << txId << ", task: " << taskId << ". " << message); @@ -230,7 +211,7 @@ public: TaskRunners.emplace_back(taskRunner); taskRunner->Prepare(protoTask, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext(), - parameterProvider); + TQueryData::GetParameterProvider(stageInfo.Meta.Tx.Params)); auto status = taskRunner->Run(); YQL_ENSURE(status == ERunStatus::Finished); diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index 19caff8c19..d1790f20c9 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -22,8 +22,8 @@ struct TColumnStats { }; struct TShardParamValuesAndRanges { - NDqProto::TData ParamValues; - NKikimr::NMiniKQL::TType* ParamType; + NMiniKQL::TUnboxedValueVector UnboxedValues; + NKikimr::NMiniKQL::TType* ItemType; // either FullRange or Ranges are set TVector<TSerializedPointOrRange> Ranges; std::optional<TSerializedTableRange> FullRange; @@ -33,7 +33,7 @@ struct TShardParamValuesAndRanges { THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( const NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type, const TTableId& tableId, - const TKqpTableKeys& tableKeys, const TKeyDesc& key, const NMiniKQL::THolderFactory& holderFactory, + const TKqpTableKeys& tableKeys, const TKeyDesc& key, const NMiniKQL::THolderFactory&, const NMiniKQL::TTypeEnvironment& typeEnv) { auto guard = typeEnv.BindAllocator(); @@ -41,7 +41,6 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( auto& table = tableKeys.GetTable(tableId); THashMap<ui64, TShardParamValuesAndRanges> ret; - THashMap<ui64, NMiniKQL::TUnboxedValueVector> shardParamValues; YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::List); auto* itemType = static_cast<NMiniKQL::TListType*>(type)->GetItemType(); @@ -103,15 +102,8 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( columnWrite.MaxValueSizeBytes = std::max(columnWrite.MaxValueSizeBytes, sizeBytes); } - shardParamValues[shardId].emplace_back(std::move(paramValue)); - - shardData.ParamType = itemType; - } - - NDq::TDqDataSerializer dataSerializer{typeEnv, holderFactory, NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0}; - for (auto& [shardId, data] : ret) { - auto& batch = shardParamValues[shardId]; - ret[shardId].ParamValues = dataSerializer.Serialize(batch.begin(), batch.end(), itemType); + shardData.UnboxedValues.emplace_back(std::move(paramValue)); + shardData.ItemType = itemType; } return ret; @@ -120,14 +112,13 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix( const NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type, const TTableId& tableId, const TKqpTableKeys& tableKeys, const TKeyDesc& key, - const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) + const NMiniKQL::THolderFactory&, const NMiniKQL::TTypeEnvironment& typeEnv) { auto guard = typeEnv.BindAllocator(); YQL_ENSURE(tableId.HasSamePath(key.TableId)); auto& table = tableKeys.GetTable(tableId); THashMap<ui64, TShardParamValuesAndRanges> ret; - THashMap<ui64, NMiniKQL::TUnboxedValueVector> shardParamValues; YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::List); auto itemType = static_cast<NMiniKQL::TListType&>(*type).GetItemType(); @@ -180,26 +171,17 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix( for (TPartitionWithRange& partitionWithRange : rangePartitions) { ui64 shardId = partitionWithRange.PartitionInfo->ShardId; - - shardParamValues[shardId].emplace_back(paramValue); - auto& shardData = ret[shardId]; + shardData.UnboxedValues.emplace_back(paramValue); + shardData.ItemType = itemType; if (partitionWithRange.FullRange) { shardData.FullRange = std::move(partitionWithRange.FullRange); shardData.Ranges.clear(); } else if (!shardData.FullRange) { shardData.Ranges.emplace_back(std::move(partitionWithRange.PointOrRange)); } - shardData.ParamType = itemType; } } - - NDq::TDqDataSerializer dataSerializer(typeEnv, holderFactory, NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0); - for (auto& [shardId, data] : ret) { - auto& batch = shardParamValues[shardId]; - data.ParamValues = dataSerializer.Serialize(batch.begin(), batch.end(), itemType); - } - return ret; } @@ -499,11 +481,7 @@ TString TShardInfo::ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const sb << "TShardInfo{ "; sb << "ReadRanges: " << (KeyReadRanges ? KeyReadRanges->ToString(keyTypes, typeRegistry) : "<none>"); sb << ", WriteRanges: " << (KeyWriteRanges ? KeyWriteRanges->ToString(keyTypes, typeRegistry) : "<none>"); - sb << ", Parameters: {"; - for (auto& param: Params) { - sb << param.first << ", "; - } - sb << "} }"; + sb << " }"; return sb; } @@ -738,14 +716,11 @@ THashMap<ui64, TShardInfo> PartitionLookupByParameterValue(const NKqpProto::TKqp for (auto& [shardId, shardData] : shardsMap) { auto& shardInfo = shardInfoMap[shardId]; - if (!shardInfo.KeyReadRanges) { shardInfo.KeyReadRanges.ConstructInPlace(); } - auto ret = shardInfo.Params.emplace(name, std::move(shardData.ParamValues)); - Y_VERIFY_DEBUG(ret.second); - + stageInfo.Meta.Tx.Params->AddShardParam(shardId, name, shardData.ItemType, std::move(shardData.UnboxedValues)); if (shardData.FullRange) { shardInfo.KeyReadRanges->MakeFullRange(std::move(*shardData.FullRange)); } else { @@ -835,13 +810,6 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow shardInfo.KeyReadRanges.ConstructInPlace(); } - for (const auto& paramName : shardParams[shardId]) { - shardInfo.Params.emplace( - paramName, - stageInfo.Meta.Tx.Params->SerializeParamValue(paramName) - ); - } - if (shardData.FullRange) { shardInfo.KeyReadRanges->MakeFullRange(std::move(*shardData.FullRange)); } else { @@ -904,8 +872,7 @@ THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableK for (auto& [shardId, shardData] : shardsMap) { auto& shardInfo = shardInfoMap[shardId]; - auto ret = shardInfo.Params.emplace(name, std::move(shardData.ParamValues)); - YQL_ENSURE(ret.second); + stageInfo.Meta.Tx.Params->AddShardParam(shardId, name, shardData.ItemType, std::move(shardData.UnboxedValues)); if (!shardInfo.KeyWriteRanges) { shardInfo.KeyWriteRanges.ConstructInPlace(); @@ -956,10 +923,8 @@ THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, } } -void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit, - const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv, - ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes, - NKikimr::NMiniKQL::TType*& itemsLimitType) +ui64 ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit, + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { switch (protoItemsLimit.GetKindCase()) { case NKqpProto::TKqpPhyValue::kLiteralValue: { @@ -969,27 +934,18 @@ void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValu literalValue.GetType(), literalValue.GetValue(), typeEnv, holderFactory); YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data); - itemsLimit = value.Get<ui64>(); - itemsLimitType = type; - - return; + return value.Get<ui64>(); } case NKqpProto::TKqpPhyValue::kParamValue: { - itemsLimitParamName = protoItemsLimit.GetParamValue().GetParamName(); + const TString& itemsLimitParamName = protoItemsLimit.GetParamValue().GetParamName(); if (!itemsLimitParamName) { - return; + return 0; } auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(itemsLimitParamName); YQL_ENSURE(type->GetKind() == NMiniKQL::TType::EKind::Data); - itemsLimit = value.Get<ui64>(); - - NYql::NDq::TDqDataSerializer dataSerializer(typeEnv, holderFactory, NYql::NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); - itemsLimitBytes = dataSerializer.Serialize(value, type); - itemsLimitType = type; - - return; + return value.Get<ui64>(); } case NKqpProto::TKqpPhyValue::kParamElementValue: @@ -997,7 +953,7 @@ void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValu YQL_ENSURE(false, "Unexpected ItemsLimit kind " << protoItemsLimit.DebugString()); case NKqpProto::TKqpPhyValue::KIND_NOT_SET: - return; + return 0; } } @@ -1008,16 +964,13 @@ TPhysicalShardReadSettings ExtractReadSettings(const NKqpProto::TKqpPhyTableOper switch(operation.GetTypeCase()){ case NKqpProto::TKqpPhyTableOperation::kReadRanges: { - ExtractItemsLimit(stageInfo, operation.GetReadRanges().GetItemsLimit(), holderFactory, typeEnv, - readSettings.ItemsLimit, readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes, readSettings.ItemsLimitType); + readSettings.ItemsLimit = ExtractItemsLimit(stageInfo, operation.GetReadRanges().GetItemsLimit(), holderFactory, typeEnv); readSettings.Reverse = operation.GetReadRanges().GetReverse(); - break; } case NKqpProto::TKqpPhyTableOperation::kReadRange: { - ExtractItemsLimit(stageInfo, operation.GetReadRange().GetItemsLimit(), holderFactory, typeEnv, - readSettings.ItemsLimit, readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes, readSettings.ItemsLimitType); + readSettings.ItemsLimit = ExtractItemsLimit(stageInfo, operation.GetReadRange().GetItemsLimit(), holderFactory, typeEnv); readSettings.Reverse = operation.GetReadRange().GetReverse(); break; } @@ -1025,8 +978,7 @@ TPhysicalShardReadSettings ExtractReadSettings(const NKqpProto::TKqpPhyTableOper case NKqpProto::TKqpPhyTableOperation::kReadOlapRange: { readSettings.Sorted = operation.GetReadOlapRange().GetSorted(); readSettings.Reverse = operation.GetReadOlapRange().GetReverse(); - ExtractItemsLimit(stageInfo, operation.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv, - readSettings.ItemsLimit, readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes, readSettings.ItemsLimitType); + readSettings.ItemsLimit = ExtractItemsLimit(stageInfo, operation.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv); NKikimrMiniKQL::TType minikqlProtoResultType; ConvertYdbTypeToMiniKQLType(operation.GetReadOlapRange().GetResultType(), minikqlProtoResultType); readSettings.ResultType = ImportTypeFromProto(minikqlProtoResultType, typeEnv); diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h index 3a2b9a61df..47e599074c 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h @@ -17,8 +17,6 @@ struct TShardInfo { ui32 MaxValueSizeBytes = 0; }; - TMap<TString, NYql::NDqProto::TData> Params; - TMaybe<TShardKeyRanges> KeyReadRanges; // empty -> no reads TMaybe<TShardKeyRanges> KeyWriteRanges; // empty -> no writes THashMap<TString, TColumnWriteInfo> ColumnWrites; @@ -41,9 +39,6 @@ struct TPhysicalShardReadSettings { bool Sorted = true; bool Reverse = false; ui64 ItemsLimit = 0; - TString ItemsLimitParamName; - NYql::NDqProto::TData ItemsLimitBytes; - NKikimr::NMiniKQL::TType* ItemsLimitType = nullptr; NKikimr::NMiniKQL::TType* ResultType = nullptr; }; @@ -79,10 +74,8 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit, - const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv, - ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes, - NKikimr::NMiniKQL::TType*& itemsLimitType); +ui64 ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit, + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); // Returns the list of ColumnShards that can store rows from the specified range // NOTE: Unlike OLTP tables that store data in DataShards, data in OLAP tables is not range diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 9daa9920a2..711e42f7f6 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -278,10 +278,6 @@ private: void MergeToTaskMeta(TTaskMeta& meta, TShardInfoWithId& shardInfo, const TPhysicalShardReadSettings& readSettings, const TVector<TTaskMeta::TColumn>& columns, const NKqpProto::TKqpPhyTableOperation& op) const { YQL_ENSURE(!shardInfo.KeyWriteRanges); - for (auto& [name, value] : shardInfo.Params) { - auto ret = meta.Params.emplace(name, std::move(value)); - YQL_ENSURE(ret.second); - } TTaskMeta::TShardReadInfo readInfo = { .Ranges = std::move(*shardInfo.KeyReadRanges), // sorted & non-intersecting @@ -289,10 +285,6 @@ private: .ShardId = shardInfo.ShardId, }; - if (readSettings.ItemsLimitParamName && !meta.Params.contains(readSettings.ItemsLimitParamName)) { - meta.Params.emplace(readSettings.ItemsLimitParamName, readSettings.ItemsLimitBytes); - } - if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { const auto& readRange = op.GetReadOlapRange(); FillReadInfo(meta, readSettings.ItemsLimit, readSettings.Reverse, readSettings.Sorted, readSettings.ResultType, readRange); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 7d2fcda322..329238bc78 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -60,13 +60,6 @@ std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& continue; } - if (auto* taskParam = task.Meta.Params.FindPtr(name)) { - // This parameter is the list, holding type from task.Meta.ParamTypes - // Those parameters can't be used in Olap programs now - YQL_ENSURE(false, "OLAP program contains task parameter, not supported yet."); - continue; - } - auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name); YQL_ENSURE(NYql::NArrow::IsArrowCompatible(type), "Incompatible parameter type. Can't convert to arrow"); @@ -1005,8 +998,8 @@ NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, c result.MutableProgram()->CopyFrom(stage.GetProgram()); for (auto& paramName : stage.GetProgramParameters()) { auto& dqParams = *result.MutableParameters(); - if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) { - dqParams[paramName] = *taskParam; + if (task.Meta.ShardId) { + dqParams[paramName] = stageInfo.Meta.Tx.Params->GetShardParam(task.Meta.ShardId, paramName); } else { dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName); } @@ -1021,13 +1014,7 @@ NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, c TString TTaskMeta::ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const NScheme::TTypeRegistry& typeRegistry) const { TStringBuilder sb; - sb << "TTaskMeta{ ShardId: " << ShardId << ", Params: ["; - - for (auto& [name, value] : Params) { - sb << name << ", "; - } - - sb << "], Reads: { "; + sb << "TTaskMeta{ ShardId: " << ShardId << ", Reads: { "; if (Reads) { for (ui64 i = 0; i < Reads->size(); ++i) { diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 2b52d682ef..911a7da01c 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -139,7 +139,6 @@ public: bool ScanTask = false; TActorId ExecuterId; - TMap<TString, NYql::NDqProto::TData> Params; THashMap<TString, TString> DqTaskParams; // Params for sources/sinks THashMap<TString, TString> DqSecureParams; diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index 69c2cd65c3..b1c61e3a2a 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp @@ -7,6 +7,7 @@ #include <ydb/library/yql/minikql/mkql_string_util.h> #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/core/util/yverify_stream.h> namespace NKikimr::NKqp { @@ -136,6 +137,9 @@ TQueryData::~TQueryData() { TxResults.swap(emptyResultMap); TUnboxedParamsMap emptyMap; UnboxedData.swap(emptyMap); + + TPartitionedParamMap empty; + empty.swap(PartitionedParams); } } @@ -268,6 +272,15 @@ TQueryData::TTypedUnboxedValue& TQueryData::GetParameterUnboxedValue(const TStri return it->second; } +TQueryData::TTypedUnboxedValue* TQueryData::GetParameterUnboxedValuePtr(const TString& name) { + auto it = UnboxedData.find(name); + if (it == UnboxedData.end()) { + return nullptr; + } + + return &it->second; +} + const NKikimrMiniKQL::TParams* TQueryData::GetParameterMiniKqlValue(const TString& name) { if (UnboxedData.find(name) == UnboxedData.end()) return nullptr; @@ -334,6 +347,42 @@ bool TQueryData::MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyPara return false; } +void TQueryData::AddShardParam(ui64 shardId, const TString& name, NKikimr::NMiniKQL::TType* type, NKikimr::NMiniKQL::TUnboxedValueVector&& value) { + auto guard = TypeEnv().BindAllocator(); + auto [it, inserted] = PartitionedParams.emplace( + std::piecewise_construct, + std::forward_as_tuple(shardId, name), + std::forward_as_tuple(type, std::move(value))); + YQL_ENSURE(inserted, "duplicate parameter, shardId#: " + << shardId << ", paramName#: " << name); +} + +void TQueryData::ClearPrunedParams() { + if (PartitionedParams.empty()) + return; + + auto guard = TypeEnv().BindAllocator(); + for(auto& [key, value]: PartitionedParams) { + TUnboxedValueVector emptyVector; + value.Values.swap(emptyVector); + } + + TPartitionedParamMap emptyMap; + emptyMap.swap(PartitionedParams); +} + +NDqProto::TData TQueryData::GetShardParam(ui64 shardId, const TString& name) { + auto kv = std::make_pair(shardId, name); + auto it = PartitionedParams.find(kv); + if (it == PartitionedParams.end()) { + return SerializeParamValue(name); + } + + auto guard = TypeEnv().BindAllocator(); + NDq::TDqDataSerializer dataSerializer{AllocState->TypeEnv, AllocState->HolderFactory, NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0}; + return dataSerializer.Serialize(it->second.Values.begin(), it->second.Values.end(), it->second.ItemType); +} + NDqProto::TData TQueryData::SerializeParamValue(const TString& name) { auto guard = TypeEnv().BindAllocator(); const auto& [type, value] = GetParameterUnboxedValue(name); @@ -344,10 +393,20 @@ void TQueryData::Clear() { { auto g = TypeEnv().BindAllocator(); Params.clear(); + TUnboxedParamsMap emptyMap; UnboxedData.swap(emptyMap); + THashMap<ui32, TVector<TKqpExecuterTxResult>> emptyResultMap; TxResults.swap(emptyResultMap); + + for(auto& [key, param]: PartitionedParams) { + NKikimr::NMiniKQL::TUnboxedValueVector emptyValues; + param.Values.swap(emptyValues); + } + + PartitionedParams.clear(); + AllocState->Reset(); } } diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index 981f8996a2..d3542dc1c7 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -10,12 +10,12 @@ #include <library/cpp/random_provider/random_provider.h> #include <library/cpp/time_provider/time_provider.h> +#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h> #include <util/generic/ptr.h> #include <util/generic/guid.h> #include <google/protobuf/arena.h> -#include <unordered_map> #include <vector> namespace NKqpProto { @@ -37,7 +37,7 @@ namespace NKikimr::NKqp { using TTypedUnboxedValue = std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue>; using TNamedUnboxedValue = std::pair<const TString, TTypedUnboxedValue>; -using TUnboxedParamsMap = std::unordered_map< +using TUnboxedParamsMap = absl::flat_hash_map< TString, TTypedUnboxedValue, std::hash<TString>, @@ -45,6 +45,26 @@ using TUnboxedParamsMap = std::unordered_map< NKikimr::NMiniKQL::TMKQLAllocator<TNamedUnboxedValue> >; +struct TPartitionParam { + NKikimr::NMiniKQL::TType *ItemType; + NYql::NDqProto::TData Data; + NKikimr::NMiniKQL::TUnboxedValueVector Values; + + TPartitionParam(NKikimr::NMiniKQL::TType *itemType, + NKikimr::NMiniKQL::TUnboxedValueVector&& values) + : ItemType(itemType) + , Values(std::move(values)) + {} +}; + +using TPartitionedParamWithKey = std::pair<const std::pair<ui64, TString>, TPartitionParam>; + +using TPartitionedParamMap = absl::flat_hash_map< + std::pair<ui64, TString>, + TPartitionParam, + THash<std::pair<ui64, TString>> +>; + using TTypedUnboxedValueVector = std::vector< TTypedUnboxedValue, NKikimr::NMiniKQL::TMKQLAllocator<TTypedUnboxedValue> @@ -158,16 +178,22 @@ private: using TTypedUnboxedValue = std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue>; using TNamedUnboxedValue = std::pair<const TString, TTypedUnboxedValue>; - using TParamMap = std::unordered_map< + using TParamMap = absl::flat_hash_map< TString, NKikimrMiniKQL::TParams >; + using TParamProvider = std::function< + bool(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::TUnboxedValue& value) + >; + TParamMap Params; TUnboxedParamsMap UnboxedData; THashMap<ui32, TVector<TKqpExecuterTxResult>> TxResults; TVector<TVector<TKqpPhyTxHolder::TConstPtr>> TxHolders; TTxAllocatorState::TPtr AllocState; + mutable TPartitionedParamMap PartitionedParams; public: using TPtr = std::shared_ptr<TQueryData>; @@ -191,6 +217,10 @@ public: void AddTxResults(ui32 txIndex, TVector<TKqpExecuterTxResult>&& results); void AddTxHolders(TVector<TKqpPhyTxHolder::TConstPtr>&& holders); + void AddShardParam(ui64 shardId, const TString& name, NKikimr::NMiniKQL::TType* type, NKikimr::NMiniKQL::TUnboxedValueVector&& value); + void ClearPrunedParams(); + NYql::NDqProto::TData GetShardParam(ui64 shardId, const TString& name); + bool HasResult(ui32 txIndex, ui32 resultIndex) { if (!TxResults.contains(txIndex)) return false; @@ -211,9 +241,30 @@ public: std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding); TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name); + TTypedUnboxedValue* GetParameterUnboxedValuePtr(const TString& name); const NKikimrMiniKQL::TParams* GetParameterMiniKqlValue(const TString& name); NYql::NDqProto::TData SerializeParamValue(const TString& name); void Clear(); + + static TParamProvider GetParameterProvider(const std::shared_ptr<TQueryData>& queryData) { + return [queryData](std::string_view name, NMiniKQL::TType* type, + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, + NUdf::TUnboxedValue& value) + { + Y_UNUSED(typeEnv); + Y_UNUSED(holderFactory); + + if (TTypedUnboxedValue* param = queryData->GetParameterUnboxedValuePtr(TString(name))) { + std::tie(type, value) = *param; + return true; + } + + + return false; + }; + } }; + + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 2ee6c441ba..c7117fdb0f 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1084,6 +1084,7 @@ public: YQL_ENSURE(QueryState); LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->ResultRowsCount); + QueryState->QueryData->ClearPrunedParams(); if (!ev->GetTxResults().empty()) { QueryState->QueryData->AddTxResults(QueryState->CurrentTx - 1, std::move(ev->GetTxResults())); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 1fb29ed6ab..afc078e1e7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1475,7 +1475,8 @@ protected: TaskRunner = taskRunner; } - void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) { + void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(), + const TDqTaskRunnerParameterProvider& parameterProvider = {}) { YQL_ENSURE(TaskRunner); auto guard = TaskRunner->BindAllocator(MemoryQuota->GetMkqlMemoryLimit()); @@ -1487,7 +1488,7 @@ protected: limits.ChannelBufferSize = MemoryLimits.ChannelBufferSize; limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; - TaskRunner->Prepare(Task, limits, execCtx); + TaskRunner->Prepare(Task, limits, execCtx, parameterProvider); FillIoMaps( TaskRunner->GetHolderFactory(), |