diff options
author | gvit <gvit@ydb.tech> | 2023-05-29 21:32:31 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-05-29 21:32:31 +0300 |
commit | 9a75beeb859ebdd2d5391f0b20cb0349f2fec5a3 (patch) | |
tree | 211fa69d4f355f7fc727fac25a143c1d5f198abd | |
parent | 1f91bff6f709b6f762453f020dce570db6d2b180 (diff) | |
download | ydb-9a75beeb859ebdd2d5391f0b20cb0349f2fec5a3.tar.gz |
implement dq task settings class to store all related data of dq task
38 files changed, 348 insertions, 172 deletions
diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp index 18fa49ed13..4e5bfc6933 100644 --- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp @@ -74,7 +74,7 @@ void TCheckpointCoordinator::Handle(NYql::NDqs::TEvReadyState::TPtr& ev) { Y_VERIFY(tasks.size() == actorIds.size()); for (int i = 0; i < static_cast<int>(tasks.size()); ++i) { - auto& task = tasks[i]; + const auto& task = GetTask(i); auto& actorId = TaskIdToActor[task.GetId()]; if (actorId) { OnInternalError(TStringBuilder() << "Duplicate task id: " << task.GetId()); diff --git a/ydb/core/fq/libs/checkpointing/utils.cpp b/ydb/core/fq/libs/checkpointing/utils.cpp index 0cae2bf7a9..c15ccfa9a6 100644 --- a/ydb/core/fq/libs/checkpointing/utils.cpp +++ b/ydb/core/fq/libs/checkpointing/utils.cpp @@ -2,7 +2,7 @@ namespace NFq { -bool IsIngress(const NYql::NDqProto::TDqTask& task) { +bool IsIngress(const NYql::NDq::TDqTaskSettings& task) { // No inputs at all or the only inputs are sources. for (const auto& input : task.GetInputs()) { if (!input.HasSource()) { @@ -12,7 +12,7 @@ bool IsIngress(const NYql::NDqProto::TDqTask& task) { return true; } -bool IsEgress(const NYql::NDqProto::TDqTask& task) { +bool IsEgress(const NYql::NDq::TDqTaskSettings& task) { for (const auto& output : task.GetOutputs()) { if (output.HasSink()) { return true; @@ -21,7 +21,7 @@ bool IsEgress(const NYql::NDqProto::TDqTask& task) { return false; } -bool HasState(const NYql::NDqProto::TDqTask& task) { +bool HasState(const NYql::NDq::TDqTaskSettings& task) { Y_UNUSED(task); return true; } diff --git a/ydb/core/fq/libs/checkpointing/utils.h b/ydb/core/fq/libs/checkpointing/utils.h index d5e159b6d6..fd700a4af9 100644 --- a/ydb/core/fq/libs/checkpointing/utils.h +++ b/ydb/core/fq/libs/checkpointing/utils.h @@ -1,13 +1,13 @@ #pragma once -#include <ydb/library/yql/dq/proto/dq_tasks.pb.h> +#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h> namespace NFq { -bool IsIngress(const NYql::NDqProto::TDqTask& task); +bool IsIngress(const NYql::NDq::TDqTaskSettings& task); -bool IsEgress(const NYql::NDqProto::TDqTask& task); +bool IsEgress(const NYql::NDq::TDqTaskSettings& task); -bool HasState(const NYql::NDqProto::TDqTask& task); +bool HasState(const NYql::NDq::TDqTaskSettings& task); } // namespace NFq diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 0002c927c6..ed48905917 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -227,7 +227,7 @@ void Init( lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit(); lwmOptions.MkqlMinAllocSize = mkqlAllocSize; lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( - [=](const NYql::NDqProto::TDqTask& task, const NYql::NDq::TLogFunc&) { + [=](const NYql::NDq::TDqTaskSettings& task, const NYql::NDq::TLogFunc&) { return lwmOptions.Factory->Get(task); }); if (protoConfig.GetRateLimiter().GetDataPlaneEnabled()) { diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 3889456bdf..90ba2eb329 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -96,8 +96,7 @@ public: auto wakeup = [this]{ ContinueExecute(); }; try { PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, - std::move(wakeup), TlsActivationContext->AsActorContext()), - ParameterProvider); + std::move(wakeup), TlsActivationContext->AsActorContext())); } catch (const NMiniKQL::TKqpEnsureFail& e) { InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); return; diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index f43d5806b5..bb93f498c6 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -210,8 +210,9 @@ public: auto taskRunner = CreateKqpTaskRunner(context, settings, log); TaskRunners.emplace_back(taskRunner); - taskRunner->Prepare(protoTask, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext(), - TQueryData::GetParameterProvider(stageInfo.Meta.Tx.Params)); + auto taskSettings = NDq::TDqTaskSettings(std::move(protoTask)); + taskSettings.SetParamsProvider(std::move(TQueryData::GetParameterProvider(stageInfo.Meta.Tx.Params))); + taskRunner->Prepare(taskSettings, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext()); auto status = taskRunner->Run(); YQL_ENSURE(status == ERunStatus::Finished); diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp index 651d609db8..3b42b886f7 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp @@ -73,7 +73,7 @@ TIntrusivePtr<IDqTaskRunner> CreateKqpTaskRunner(const TDqTaskRunnerContext& exe } -TKqpTasksRunner::TKqpTasksRunner(const google::protobuf::RepeatedPtrField<NDqProto::TDqTask>& tasks, +TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks, const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc) : LogFunc(logFunc) , Alloc(execCtx.Alloc) @@ -88,13 +88,14 @@ TKqpTasksRunner::TKqpTasksRunner(const google::protobuf::RepeatedPtrField<NDqPro auto guard = execCtx.TypeEnv->BindAllocator(); try { - for (auto& task : tasks) { + for (auto&& task : tasks) { + ui64 taskId = task.GetId(); auto runner = CreateKqpTaskRunner(execCtx, settings, logFunc); if (auto* stats = runner->GetStats()) { - Stats.emplace(task.GetId(), stats); + Stats.emplace(taskId, stats); } - TaskRunners.emplace(task.GetId(), std::move(runner)); - Tasks.emplace(task.GetId(), &task); + TaskRunners.emplace(taskId, std::move(runner)); + Tasks.emplace(taskId, std::move(task)); } } catch (const TMemoryLimitExceededException&) { TaskRunners.clear(); @@ -119,7 +120,9 @@ void TKqpTasksRunner::Prepare(const TDqTaskRunnerMemoryLimits& memoryLimits, con for (auto& [taskId, taskRunner] : TaskRunners) { ComputeCtx->SetCurrentTaskId(taskId); - taskRunner->Prepare(*Tasks[taskId], memoryLimits, execCtx); + auto it = Tasks.find(taskId); + Y_VERIFY(it != Tasks.end()); + taskRunner->Prepare(it->second, memoryLimits, execCtx); } ComputeCtx->SetCurrentTaskId(std::numeric_limits<ui64>::max()); @@ -225,8 +228,8 @@ const IDqTaskRunner& TKqpTasksRunner::GetTaskRunner(ui64 taskId) const { return **task; } -const NYql::NDqProto::TDqTask& TKqpTasksRunner::GetTask(ui64 taskId) const { - return *Tasks.at(taskId); +const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const { + return Tasks.at(taskId); } TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) { @@ -236,10 +239,10 @@ TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memor return TGuard(*Alloc); } -TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(const google::protobuf::RepeatedPtrField<NDqProto::TDqTask>& tasks, +TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks, const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc) { - return new TKqpTasksRunner(tasks, execCtx, settings, logFunc); + return new TKqpTasksRunner(std::move(tasks), execCtx, settings, logFunc); } } // namespace NKqp diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h index b0e52743b6..6d3788a6a4 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.h +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h @@ -17,7 +17,7 @@ TIntrusivePtr<NYql::NDq::IDqTaskRunner> CreateKqpTaskRunner(const NYql::NDq::TDq class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable { public: - TKqpTasksRunner(const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, + TKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks, const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings, const NYql::NDq::TLogFunc& logFunc); @@ -34,7 +34,11 @@ public: NYql::NDq::IDqTaskRunner& GetTaskRunner(ui64 taskId); const NYql::NDq::IDqTaskRunner& GetTaskRunner(ui64 taskId) const; - const NYql::NDqProto::TDqTask& GetTask(ui64 taskId) const; + const TMap<ui64, NYql::NDq::TDqTaskSettings>& GetTasks() const { + return Tasks; + } + + const NYql::NDq::TDqTaskSettings& GetTask(ui64 taskId) const; NYql::NDq::IDqInputChannel::TPtr GetInputChannel(ui64 taskId, ui64 channelId) { return GetTaskRunner(taskId).GetInputChannel(channelId); @@ -54,7 +58,7 @@ public: private: TMap<ui64, TIntrusivePtr<NYql::NDq::IDqTaskRunner>> TaskRunners; - TMap<ui64, const NYql::NDqProto::TDqTask*> Tasks; + TMap<ui64, NYql::NDq::TDqTaskSettings> Tasks; TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> Stats; NYql::NDq::TLogFunc LogFunc; NMiniKQL::TScopedAlloc* Alloc; @@ -70,7 +74,7 @@ private: }; -TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks, +TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks, const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings, const NYql::NDq::TLogFunc& logFunc); diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp index 585109ef45..62af3247a3 100644 --- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp @@ -76,7 +76,8 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac } try { - const auto& kqpTx = dataTx->GetKqpTransaction(); + bool useGenericReadSets = dataTx->GetUseGenericReadSets(); + const auto& kqpLocks = dataTx->GetKqpLocks(); auto& tasksRunner = dataTx->GetKqpTasksRunner(); auto allocGuard = tasksRunner.BindAllocator(txc.GetMemoryLimit() - dataTx->GetTxSize()); @@ -84,7 +85,8 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac NKqp::NRm::TKqpResourcesRequest req; req.MemoryPool = NKqp::NRm::EKqpMemoryPool::DataQuery; req.Memory = txc.GetMemoryLimit(); - ui64 taskId = kqpTx.GetTasks().empty() ? std::numeric_limits<ui64>::max() : kqpTx.GetTasks()[0].GetId(); + ui64 taskId = dataTx->GetFirstKqpTaskId(); + NKqp::GetKqpResourceManager()->NotifyExternalResourcesAllocated(tx->GetTxId(), taskId, req); Y_DEFER { @@ -97,7 +99,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac dataTx->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion); if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) { - auto result = KqpRunTransaction(ctx, op->GetTxId(), kqpTx, tasksRunner); + auto result = KqpRunTransaction(ctx, op->GetTxId(), kqpLocks, useGenericReadSets, tasksRunner); Y_VERIFY_S(!dataTx->GetKqpComputeCtx().HadInconsistentReads(), "Unexpected inconsistent reads in operation " << *op << " when preparing persistent channels"); @@ -108,7 +110,8 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac } } - KqpFillOutReadSets(op->OutReadSets(), kqpTx, tasksRunner, DataShard.SysLocksTable(), tabletId); + KqpFillOutReadSets(op->OutReadSets(), kqpLocks, + dataTx->HasKqpLocks(), useGenericReadSets, tasksRunner, DataShard.SysLocksTable(), tabletId); } catch (const TMemoryLimitExceededException&) { LOG_T("Operation " << *op << " at " << tabletId << " exceeded memory limit " << txc.GetMemoryLimit() diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 469935e245..470fe4f82c 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -1273,7 +1273,7 @@ void TEngineBay::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) { } } -NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(const NKikimrTxDataShard::TKqpTransaction& tx) { +NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(NKikimrTxDataShard::TKqpTransaction& tx) { if (!KqpTasksRunner) { NYql::NDq::TDqTaskRunnerSettings settings; @@ -1291,7 +1291,7 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(const NKikimrTxDataShard::T settings.TerminateOnError = false; KqpAlloc->SetLimit(10_MB); - KqpTasksRunner = NKqp::CreateKqpTasksRunner(tx.GetTasks(), KqpExecCtx, settings, KqpLogFunc); + KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), KqpExecCtx, settings, KqpLogFunc); } return *KqpTasksRunner; diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 0c83a56186..f49e944284 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -120,7 +120,7 @@ public: void ResetCounters() { EngineHostCounters = TEngineHostCounters(); } const TEngineHostCounters& GetCounters() const { return EngineHostCounters; } - NKqp::TKqpTasksRunner& GetKqpTasksRunner(const NKikimrTxDataShard::TKqpTransaction& tx); + NKqp::TKqpTasksRunner& GetKqpTasksRunner(NKikimrTxDataShard::TKqpTransaction& tx); NMiniKQL::TKqpDatashardComputeContext& GetKqpComputeCtx(); private: diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 6f72f6d367..1076c7d904 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -27,6 +27,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, , TxSize(0) , TxCacheUsage(0) , IsReleased(false) + , BuiltTaskRunner(false) , IsReadOnly(true) , AllowCancelROwithReadsets(self->AllowCancelROwithReadsets()) , Cancelled(false) @@ -82,7 +83,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, try { bool hasPersistentChannels = false; - if (!KqpValidateTransaction(GetKqpTransaction(), Immediate(), StepTxId_.TxId, ctx, hasPersistentChannels)) { + if (!KqpValidateTransaction(GetTasks(), Immediate(), StepTxId_.TxId, ctx, hasPersistentChannels)) { LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed, datashard: " << TabletId() << ", txid: " << StepTxId_.TxId); ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR; @@ -91,7 +92,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, } computeCtx.SetHasPersistentChannels(hasPersistentChannels); - for (auto& task : GetKqpTransaction().GetTasks()) { + for (auto& task : GetTasks()) { NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; if (!task.GetMeta().UnpackTo(&meta)) { LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed" @@ -154,7 +155,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, IsReadOnly = IsReadOnly && Tx.GetReadOnly(); - KqpSetTxLocksKeys(GetKqpTransaction().GetLocks(), self->SysLocksTable(), EngineBay); + KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay); EngineBay.MarkTxLoaded(); auto& tasksRunner = GetKqpTasksRunner(); // create tasks runner, can throw TMemoryLimitExceededException @@ -203,11 +204,6 @@ TValidatedDataTx::~TValidatedDataTx() { NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Sub(TxSize); } -const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& TValidatedDataTx::GetKqpTasks() const { - Y_VERIFY(IsKqpTx()); - return Tx.GetKqpTransaction().GetTasks(); -} - ui32 TValidatedDataTx::ExtractKeys(bool allowErrors) { using EResult = NMiniKQL::IEngineFlat::EResult; diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index b419ee1ecd..b0f927437e 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -210,9 +210,58 @@ public: return IsKqpTx() && Tx.GetKqpTransaction().GetType() == NKikimrTxDataShard::KQP_TX_TYPE_SCAN; } - const NKikimrTxDataShard::TKqpTransaction &GetKqpTransaction() const { return Tx.GetKqpTransaction(); } - const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& GetKqpTasks() const; - NKqp::TKqpTasksRunner& GetKqpTasksRunner() { Y_VERIFY(IsKqpDataTx()); return EngineBay.GetKqpTasksRunner(Tx.GetKqpTransaction()); } + bool GetUseGenericReadSets() const { + Y_VERIFY(IsKqpDataTx()); + return Tx.GetKqpTransaction().GetUseGenericReadSets(); + } + + inline const ::NKikimrTxDataShard::TKqpLocks& GetKqpLocks() const { + Y_VERIFY(IsKqpDataTx()); + return Tx.GetKqpTransaction().GetLocks(); + } + + inline bool HasKqpLocks() const { + Y_VERIFY(IsKqpDataTx()); + return Tx.GetKqpTransaction().HasLocks(); + } + + inline bool HasKqpSnapshot() const { + Y_VERIFY(IsKqpDataTx()); + return Tx.GetKqpTransaction().HasSnapshot(); + } + + inline const ::NKikimrKqp::TKqpSnapshot& GetKqpSnapshot() const { + Y_VERIFY(IsKqpDataTx()); + return Tx.GetKqpTransaction().GetSnapshot(); + } + + inline const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TDqTask>& GetTasks() const { + Y_VERIFY(IsKqpDataTx()); + // ensure that GetTasks is not called after task runner is built + Y_VERIFY(!BuiltTaskRunner); + return Tx.GetKqpTransaction().GetTasks(); + } + + inline ui64 GetFirstKqpTaskId() { + ui64 taskId = std::numeric_limits<ui64>::max(); + const auto& tasks = GetKqpTasksRunner().GetTasks(); + if (!tasks.empty()) { + taskId = tasks.begin()->second.GetId(); + } + return taskId; + } + + NKqp::TKqpTasksRunner& GetKqpTasksRunner() { + Y_VERIFY(IsKqpDataTx()); + BuiltTaskRunner = true; + return EngineBay.GetKqpTasksRunner(*Tx.MutableKqpTransaction()); + } + + ::NYql::NDqProto::EDqStatsMode GetKqpStatsMode() const { + Y_VERIFY(IsKqpDataTx()); + return Tx.GetKqpTransaction().GetRuntimeSettings().GetStatsMode(); + } + NMiniKQL::TKqpDatashardComputeContext& GetKqpComputeCtx() { Y_VERIFY(IsKqpDataTx()); return EngineBay.GetKqpComputeCtx(); } bool HasStreamResponse() const { return Tx.GetStreamResponse(); } @@ -252,6 +301,7 @@ private: ui64 TxSize; ui64 TxCacheUsage; bool IsReleased; + bool BuiltTaskRunner; TMaybe<ui64> PerShardKeysSizeLimitBytes_; bool IsReadOnly; bool AllowCancelROwithReadsets; diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 29399bce45..1f531c43bd 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -98,11 +98,12 @@ NUdf::EFetchStatus FetchOutput(NDq::IDqOutputChannel* channel, NDqProto::TData& } NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId, - const TInputOpData::TInReadSets* inReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx, + const TInputOpData::TInReadSets* inReadSets, const NKikimrTxDataShard::TKqpLocks&, + bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner, bool applyEffects) { THashMap<ui64, std::pair<ui64, ui32>> inputChannelsMap; // channelId -> (taskId, input index) - for (auto& task : kqpTx.GetTasks()) { + for (auto& [taskId, task] : tasksRunner.GetTasks()) { for (ui32 i = 0; i < task.InputsSize(); ++i) { auto& input = task.GetInputs(i); for (auto& channel : input.GetChannels()) { @@ -125,7 +126,7 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId, for (auto& data : dataList) { NKikimrTxDataShard::TKqpReadset kqpReadset; - if (kqpTx.GetUseGenericReadSets()) { + if (useGenericReadSets) { NKikimrTx::TReadSetData genericData; bool ok = genericData.ParseFromString(data.Body); Y_VERIFY(ok, "Failed to parse generic readset data from %" PRIu64 " to %" PRIu64 " origin %" PRIu64, @@ -175,7 +176,8 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId, MKQL_ENSURE_S(runStatus == NDq::ERunStatus::PendingInput); hasInputChanges = false; - for (auto& task : kqpTx.GetTasks()) { + for (auto& taskIt : tasksRunner.GetTasks()) { + const auto& task = taskIt.second; for (ui32 i = 0; i < task.OutputsSize(); ++i) { for (auto& channel : task.GetOutputs(i).GetChannels()) { if (auto* inputInfo = inputChannelsMap.FindPtr(channel.GetId())) { @@ -303,10 +305,10 @@ bool ReceiveLocks(const NKikimrTxDataShard::TKqpLocks& locks, ui64 shardId) { } // namespace -bool KqpValidateTransaction(const NKikimrTxDataShard::TKqpTransaction& tx, bool isImmediate, ui64 txId, +bool KqpValidateTransaction(const ::google::protobuf::RepeatedPtrField< ::NYql::NDqProto::TDqTask>& tasks, bool isImmediate, ui64 txId, const TActorContext& ctx, bool& hasPersistentChannels) { - for (const auto& task : tx.GetTasks()) { + for (const auto& task : tasks) { if (!KqpValidateTask(task, isImmediate, txId, ctx, hasPersistentChannels)) { return false; } @@ -498,17 +500,17 @@ void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLoc } NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId, - const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner) + const NKikimrTxDataShard::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner) { - return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, kqpTx, tasksRunner, /* applyEffects */ false); + return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, kqpLocks, useGenericReadSets, tasksRunner, /* applyEffects */ false); } THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx, ui64 origin, ui64 txId, const TInputOpData::TInReadSets* inReadSets, - const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner, + const NKikimrTxDataShard::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner, const NMiniKQL::TKqpDatashardComputeContext& computeCtx) { - auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, kqpTx, tasksRunner, /* applyEffects */ true); + auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, kqpLocks, useGenericReadSets, tasksRunner, /* applyEffects */ true); if (computeCtx.HadInconsistentReads()) { return nullptr; @@ -527,7 +529,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, origin, txId, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); - for (auto& task : kqpTx.GetTasks()) { + for (auto& [taskId, task] : tasksRunner.GetTasks()) { auto& taskRunner = tasksRunner.GetTaskRunner(task.GetId()); for (ui32 i = 0; i < task.OutputsSize(); ++i) { @@ -573,12 +575,13 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const return result; } -void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx, +void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpLocks& kqpLocks, + bool hasKqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner, TSysLocks& sysLocks, ui64 tabletId) { TMap<std::pair<ui64, ui64>, NKikimrTxDataShard::TKqpReadset> readsetData; - for (auto& task : kqpTx.GetTasks()) { + for (auto& [taskId, task] : tasksRunner.GetTasks()) { auto& taskRunner = tasksRunner.GetTaskRunner(task.GetId()); for (ui32 i = 0; i < task.OutputsSize(); ++i) { @@ -606,12 +609,12 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT NKikimrTx::TReadSetData::EDecision decision = NKikimrTx::TReadSetData::DECISION_COMMIT; TMap<std::pair<ui64, ui64>, NKikimrTx::TReadSetData> genericData; - if (kqpTx.HasLocks() && NeedValidateLocks(kqpTx.GetLocks().GetOp())) { - bool sendLocks = SendLocks(kqpTx.GetLocks(), tabletId); - YQL_ENSURE(sendLocks == !kqpTx.GetLocks().GetLocks().empty()); + if (hasKqpLocks && NeedValidateLocks(kqpLocks.GetOp())) { + bool sendLocks = SendLocks(kqpLocks, tabletId); + YQL_ENSURE(sendLocks == !kqpLocks.GetLocks().empty()); - if (sendLocks && !kqpTx.GetLocks().GetReceivingShards().empty()) { - auto brokenLocks = ValidateLocks(kqpTx.GetLocks(), sysLocks, tabletId); + if (sendLocks && !kqpLocks.GetReceivingShards().empty()) { + auto brokenLocks = ValidateLocks(kqpLocks, sysLocks, tabletId); NKikimrTxDataShard::TKqpValidateLocksResult validateLocksResult; validateLocksResult.SetSuccess(brokenLocks.empty()); @@ -619,14 +622,14 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT for (auto& lock : brokenLocks) { LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Found broken lock: " << lock.ShortDebugString()); - if (kqpTx.GetUseGenericReadSets()) { + if (useGenericReadSets) { decision = NKikimrTx::TReadSetData::DECISION_ABORT; } else { validateLocksResult.AddBrokenLocks()->Swap(&lock); } } - for (auto& dstTabletId : kqpTx.GetLocks().GetReceivingShards()) { + for (auto& dstTabletId : kqpLocks.GetReceivingShards()) { if (tabletId == dstTabletId) { continue; } @@ -635,7 +638,7 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT << tabletId << " to " << dstTabletId << ", locks: " << validateLocksResult.ShortDebugString()); auto key = std::make_pair(tabletId, dstTabletId); - if (kqpTx.GetUseGenericReadSets()) { + if (useGenericReadSets) { genericData[key].SetDecision(decision); } else { readsetData[key].MutableValidateLocksResult()->CopyFrom(validateLocksResult); @@ -644,7 +647,7 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT } } - if (kqpTx.GetUseGenericReadSets()) { + if (useGenericReadSets) { for (const auto& [key, data] : readsetData) { bool ok = genericData[key].MutableData()->PackFrom(data); Y_VERIFY(ok, "Failed to pack readset data from %" PRIu64 " to %" PRIu64, key.first, key.second); @@ -672,17 +675,18 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT } bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { - auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); + auto& kqpLocks = tx->GetDataTx()->GetKqpLocks(); + bool hasKqpLocks = tx->GetDataTx()->HasKqpLocks(); - if (!kqpTx.HasLocks() || !NeedValidateLocks(kqpTx.GetLocks().GetOp())) { + if (!hasKqpLocks || !NeedValidateLocks(kqpLocks.GetOp())) { return true; } - bool sendLocks = SendLocks(kqpTx.GetLocks(), origin); - YQL_ENSURE(sendLocks == !kqpTx.GetLocks().GetLocks().empty()); + bool sendLocks = SendLocks(kqpLocks, origin); + YQL_ENSURE(sendLocks == !kqpLocks.GetLocks().empty()); if (sendLocks) { - auto brokenLocks = ValidateLocks(kqpTx.GetLocks(), sysLocks, origin); + auto brokenLocks = ValidateLocks(kqpLocks, sysLocks, origin); if (!brokenLocks.empty()) { tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>( @@ -702,7 +706,7 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) for (auto& readSet : tx->InReadSets()) { for (auto& data : readSet.second) { - if (kqpTx.GetUseGenericReadSets()) { + if (tx->GetDataTx()->GetUseGenericReadSets()) { NKikimrTx::TReadSetData genericData; bool ok = genericData.ParseFromString(data.Body); Y_VERIFY(ok, "Failed to parse generic readset from %" PRIu64 " to %" PRIu64 " origin %" PRIu64, @@ -745,14 +749,14 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) } bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { - auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); + auto& kqpLocks = tx->GetDataTx()->GetKqpLocks(); - if (!kqpTx.HasLocks() || !NeedValidateLocks(kqpTx.GetLocks().GetOp())) { + if (!tx->GetDataTx()->HasKqpLocks() || !NeedValidateLocks(kqpLocks.GetOp())) { return true; } // Volatile transactions cannot work with non-generic readsets - YQL_ENSURE(kqpTx.GetUseGenericReadSets()); + YQL_ENSURE(tx->GetDataTx()->GetUseGenericReadSets()); // We may have some stale data since before the restart // We expect all stale data to be cleared on restarts @@ -761,10 +765,10 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo // Note: usually all shards send locks, since they either have side effects or need to validate locks // However it is technically possible to have pure-read shards, that don't contribute to the final decision - bool sendLocks = SendLocks(kqpTx.GetLocks(), origin); + bool sendLocks = SendLocks(kqpLocks, origin); if (sendLocks) { // Note: it is possible to have no locks - auto brokenLocks = ValidateLocks(kqpTx.GetLocks(), sysLocks, origin); + auto brokenLocks = ValidateLocks(kqpLocks, sysLocks, origin); if (!brokenLocks.empty()) { tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>( @@ -782,7 +786,7 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo } // We need to form decision readsets for all other participants - for (ui64 dstTabletId : kqpTx.GetLocks().GetReceivingShards()) { + for (ui64 dstTabletId : kqpLocks.GetReceivingShards()) { if (dstTabletId == origin) { // Don't send readsets to ourselves continue; @@ -803,11 +807,11 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo } } - bool receiveLocks = ReceiveLocks(kqpTx.GetLocks(), origin); + bool receiveLocks = ReceiveLocks(kqpLocks, origin); if (receiveLocks) { // Note: usually only shards with side-effects receive locks, since they // need the final outcome to decide whether to commit or abort. - for (ui64 srcTabletId : kqpTx.GetLocks().GetSendingShards()) { + for (ui64 srcTabletId : kqpLocks.GetSendingShards()) { if (srcTabletId == origin) { // Don't await decision from ourselves continue; @@ -877,13 +881,13 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo } void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { - auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); + auto& kqpLocks = tx->GetDataTx()->GetKqpLocks(); - if (!kqpTx.HasLocks() || !NeedEraseLocks(kqpTx.GetLocks().GetOp())) { + if (!tx->GetDataTx()->HasKqpLocks() || !NeedEraseLocks(kqpLocks.GetOp())) { return; } - for (auto& lockProto : kqpTx.GetLocks().GetLocks()) { + for (auto& lockProto : kqpLocks.GetLocks()) { if (lockProto.GetDataShard() != origin) { continue; } @@ -896,17 +900,17 @@ void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { } void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writeVersion, TDataShard& dataShard) { - auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); - if (!kqpTx.HasLocks()) { + if (!tx->GetDataTx()->HasKqpLocks()) { return; } + auto& kqpLocks = tx->GetDataTx()->GetKqpLocks(); TSysLocks& sysLocks = dataShard.SysLocksTable(); - if (NeedCommitLocks(kqpTx.GetLocks().GetOp())) { + if (NeedCommitLocks(kqpLocks.GetOp())) { // We assume locks have been validated earlier - for (auto& lockProto : kqpTx.GetLocks().GetLocks()) { + for (auto& lockProto : kqpLocks.GetLocks()) { if (lockProto.GetDataShard() != origin) { continue; } @@ -927,9 +931,9 @@ void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writ } void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets, - const NKikimrTxDataShard::TKqpTransaction& kqpTx, ui64 tabletId) + const NKikimrTxDataShard::TKqpLocks& kqpLocks, const NKqp::TKqpTasksRunner& tasksRunner, ui64 tabletId) { - for (auto& task : kqpTx.GetTasks()) { + for (auto& [taskId, task] : tasksRunner.GetTasks()) { for (ui32 i = 0; i < task.InputsSize(); ++i) { for (auto& channel : task.GetInputs(i).GetChannels()) { if (channel.GetIsPersistent()) { @@ -945,8 +949,8 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets, } } - if (ReceiveLocks(kqpTx.GetLocks(), tabletId)) { - for (ui64 shardId : kqpTx.GetLocks().GetSendingShards()) { + if (ReceiveLocks(kqpLocks, tabletId)) { + for (ui64 shardId : kqpLocks.GetSendingShards()) { if (shardId == tabletId) { continue; } diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h index b696c19dbc..0a330218cb 100644 --- a/ydb/core/tx/datashard/datashard_kqp.h +++ b/ydb/core/tx/datashard/datashard_kqp.h @@ -11,7 +11,7 @@ namespace NKikimr { namespace NDataShard { -bool KqpValidateTransaction(const NKikimrTxDataShard::TKqpTransaction& tx, bool isImmediate, +bool KqpValidateTransaction(const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TDqTask> & tasks, bool isImmediate, ui64 txId, const TActorContext& ctx, bool& hasPersistentChannels); void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo, @@ -21,18 +21,19 @@ void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo, void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLocks& sysLocks, TEngineBay& engineBay); NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId, - const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner); + const NKikimrTxDataShard::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner); THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx, ui64 origin, ui64 txId, const TInputOpData::TInReadSets* inReadSets, - const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner, + const NKikimrTxDataShard::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner, const NMiniKQL::TKqpDatashardComputeContext& computeCtx); -void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx, +void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpLocks& kqpLocks, + bool hasKqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner, TSysLocks& sysLocks, ui64 tabletId); void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets, - const NKikimrTxDataShard::TKqpTransaction& kqpTx, ui64 tabletId); + const NKikimrTxDataShard::TKqpLocks& kqpLocks, const NKqp::TKqpTasksRunner& tasksRunner, ui64 tabletId); bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index a38867a4dc..94e4646ed1 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1395,7 +1395,7 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: } else if (tx->IsReadTable() && dataTx->GetReadTableTransaction().HasSnapshotStep() && dataTx->GetReadTableTransaction().HasSnapshotTxId()) { badRequest("Ambiguous snapshot info. Cannot use both MVCC and read table snapshots in one transaction"); return tx; - } else if (tx->IsKqpScanTransaction() && dataTx->GetKqpTransaction().HasSnapshot()) { + } else if (tx->IsKqpScanTransaction() && dataTx->HasKqpSnapshot()) { badRequest("Ambiguous snapshot info. Cannot use both MVCC and kqp scan snapshots in one transaction"); return tx; } @@ -1423,9 +1423,9 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: if (!tx->IsImmediate() || !Self->IsMvccEnabled()) { // No op - } else if (tx->IsKqpScanTransaction() && dataTx->GetKqpTransaction().HasSnapshot()) { + } else if (tx->IsKqpScanTransaction() && dataTx->HasKqpSnapshot()) { // to be consistent while dependencies calculation - auto snapshot = dataTx->GetKqpTransaction().GetSnapshot(); + auto snapshot = dataTx->GetKqpSnapshot(); tx->SetMvccSnapshot(TRowVersion(snapshot.GetStep(), snapshot.GetTxId())); } } diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 58d5898a23..137ec95246 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -125,7 +125,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio } try { - auto& kqpTx = dataTx->GetKqpTransaction(); + auto& kqpLocks = dataTx->GetKqpLocks(); + bool useGenericReadSets = dataTx->GetUseGenericReadSets(); auto& tasksRunner = dataTx->GetKqpTasksRunner(); ui64 consumedMemory = dataTx->GetTxSize() + tasksRunner.GetAllocatedMemory(); @@ -200,7 +201,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio NKqp::NRm::TKqpResourcesRequest req; req.MemoryPool = NKqp::NRm::EKqpMemoryPool::DataQuery; req.Memory = txc.GetMemoryLimit(); - ui64 taskId = kqpTx.GetTasks().empty() ? std::numeric_limits<ui64>::max() : kqpTx.GetTasks()[0].GetId(); + ui64 taskId = dataTx->GetFirstKqpTaskId(); NKqp::GetKqpResourceManager()->NotifyExternalResourcesAllocated(tx->GetTxId(), taskId, req); Y_DEFER { @@ -228,7 +229,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx(); auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(), - op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), kqpTx, tasksRunner, computeCtx); + op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), kqpLocks, useGenericReadSets, tasksRunner, computeCtx); if (!result && computeCtx.HadInconsistentReads()) { LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId @@ -357,7 +358,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio } KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters()); - auto statsMode = kqpTx.GetRuntimeSettings().GetStatsMode(); + auto statsMode = dataTx->GetKqpStatsMode(); KqpFillStats(DataShard, tasksRunner, computeCtx, statsMode, *op->Result()); } catch (const TMemoryLimitExceededException&) { dataTx->ResetCollectedChanges(); diff --git a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp index 05ab71fc36..7d7ba2f897 100644 --- a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp +++ b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp @@ -56,7 +56,8 @@ EExecutionStatus TPrepareKqpDataTxInRSUnit::Execute(TOperation::TPtr op, TTransa } try { - KqpPrepareInReadsets(op->InReadSets(), tx->GetDataTx()->GetKqpTransaction(), DataShard.TabletID()); + KqpPrepareInReadsets(op->InReadSets(), tx->GetDataTx()->GetKqpLocks(), + tx->GetDataTx()->GetKqpTasksRunner(), DataShard.TabletID()); } catch (const yexception& e) { LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, "Exception while preparing in-readsets for KQP transaction " << *op << " at " << DataShard.TabletID() << ": " << CurrentExceptionMessage()); diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 80d424d77c..3e64051a14 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -59,7 +59,7 @@ public: TLogFunc logger; if (IsDebugLogEnabled(actorSystem)) { - logger = [actorSystem, txId = GetTxId(), taskId = GetTask().GetId()] (const TString& message) { + logger = [actorSystem, txId = GetTxId(), taskId = Task.GetId()] (const TString& message) { LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TxId: " << txId << ", task: " << taskId << ": " << message); }; @@ -73,7 +73,7 @@ public: } } std::tie(TaskRunnerActor, actor) = TaskRunnerActorFactory->Create( - this, GetTxId(), GetTask().GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota()); + this, GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota()); TaskRunnerActorId = RegisterWithSameMailbox(actor); TDqTaskRunnerMemoryLimits limits; @@ -87,7 +87,7 @@ public: Send(TaskRunnerActorId, new NTaskRunnerActor::TEvTaskRunnerCreate( - GetTask(), limits, execCtx)); + Task.GetSerializedTask(), limits, execCtx)); CA_LOG_D("Use CPU quota: " << UseCpuQuota() << ". Rate limiter resource: { \"" << Task.GetRateLimiter() << "\", \"" << Task.GetRateLimiterResource() << "\" }"); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 4c99e02f9e..2a73a3c72e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -53,7 +53,7 @@ public: }; } - auto taskRunner = TaskRunnerFactory(GetTask(), logger); + auto taskRunner = TaskRunnerFactory(Task, logger); SetTaskRunner(taskRunner); PrepareTaskRunner(); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 5da25141ad..cc6d1b672b 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -309,7 +309,7 @@ struct TComputeMemoryLimits { }; using TTaskRunnerFactory = std::function< - TIntrusivePtr<IDqTaskRunner>(const NDqProto::TDqTask& task, const TLogFunc& logFunc) + TIntrusivePtr<IDqTaskRunner>(const TDqTaskSettings& task, const TLogFunc& logFunc) >; void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats, diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp index bb6b613b0c..289f0fa0aa 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp @@ -38,7 +38,7 @@ TString InFlightMessagesStr(const TCollection& inFlight) { } // anonymous namespace -TDqComputeActorChannels::TDqComputeActorChannels(TActorId owner, const TTxId& txId, const NDqProto::TDqTask& task, +TDqComputeActorChannels::TDqComputeActorChannels(TActorId owner, const TTxId& txId, const TDqTaskSettings& task, bool retryOnUndelivery, NDqProto::EDqStatsMode statsMode, ui64 channelBufferSize, ICallbacks* cbs, ui32 actorActivityType) : TActor(&TDqComputeActorChannels::WorkState, actorActivityType) , Owner(owner) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h index f2e893bc24..6a2330f21e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h @@ -39,7 +39,7 @@ public: }; public: - TDqComputeActorChannels(NActors::TActorId owner, const TTxId& txId, const NYql::NDqProto::TDqTask& task, bool retryOnUndelivery, + TDqComputeActorChannels(NActors::TActorId owner, const TTxId& txId, const TDqTaskSettings& task, bool retryOnUndelivery, NDqProto::EDqStatsMode statsMode, ui64 channelBufferSize, ICallbacks* cbs, ui32 actorActivityType); private: diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index c7a7ee8fe5..dce6669ae2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -52,7 +52,7 @@ TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) { return TStringBuilder() << checkpoint.GetGeneration() << "." << checkpoint.GetId(); } -bool IsIngressTask(const NDqProto::TDqTask& task) { +bool IsIngressTask(const TDqTaskSettings& task) { for (const auto& input : task.GetInputs()) { if (!input.HasSource()) { return false; @@ -125,7 +125,7 @@ NDqProto::TComputeActorState CombineForeignState( } // namespace -TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor) +TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, TDqTaskSettings task, ICallbacks* computeActor) : TActor(&TDqComputeActorCheckpoints::StateFunc) , Owner(owner) , TxId(txId) @@ -543,7 +543,7 @@ void TDqComputeActorCheckpoints::TPendingCheckpoint::Clear() { ComputeActorState.Clear(); } -size_t TDqComputeActorCheckpoints::TPendingCheckpoint::GetSinksCount(const NDqProto::TDqTask& task) { +size_t TDqComputeActorCheckpoints::TPendingCheckpoint::GetSinksCount(const TDqTaskSettings& task) { size_t sinksCount = 0; for (int outputIndex = 0, outputsCount = task.OutputsSize(); outputIndex < outputsCount; ++outputIndex) { if (task.GetOutputs(outputIndex).HasSink()) { @@ -562,7 +562,7 @@ static bool IsInfiniteSourceType(const TString& sourceType) { return sourceType == "PqSource"; } -NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const NDqProto::TDqTask& task) { +NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const TDqTaskSettings& task) { for (const auto& input : task.GetInputs()) { if (const TString& srcType = input.GetSource().GetType(); srcType && IsInfiniteSourceType(srcType)) { return NDqProto::CHECKPOINTING_MODE_DEFAULT; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h index bbfb4a327f..66e458ec72 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h @@ -20,7 +20,7 @@ enum ECheckpointingMode : int; namespace NYql::NDq { -NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const NDqProto::TDqTask& task); +NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const TDqTaskSettings& task); class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpoints> { @@ -35,7 +35,7 @@ class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpo }; struct TPendingCheckpoint { - TPendingCheckpoint(const NDqProto::TDqTask& task) + TPendingCheckpoint(const TDqTaskSettings& task) : SinksCount(GetSinksCount(task)) { } @@ -54,7 +54,7 @@ class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpo return SavedComputeActorState && SinksCount == SavedSinkStatesCount; } - static size_t GetSinksCount(const NDqProto::TDqTask& task); + static size_t GetSinksCount(const TDqTaskSettings& task); const size_t SinksCount; TMaybe<NDqProto::TCheckpoint> Checkpoint; @@ -89,7 +89,7 @@ public: ComputeActorCurrentStateVersion = 2, }; - TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor); + TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, TDqTaskSettings task, ICallbacks* computeActor); void Init(NActors::TActorId computeActorId, NActors::TActorId checkpointsId); [[nodiscard]] bool HasPendingCheckpoint() const; @@ -140,7 +140,7 @@ private: private: const NActors::TActorId Owner; const TTxId TxId; - const NDqProto::TDqTask Task; + const TDqTaskSettings Task; const bool IngressTask; const NActors::TActorId CheckpointStorage; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index afc078e1e7..1d1e406d74 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -108,6 +108,7 @@ struct TComputeActorStateFuncHelper<void (T::*)(STFUNC_SIG)> { } // namespace NDetails + template<typename TDerived> class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived> , public TDqComputeActorChannels::ICallbacks @@ -1458,7 +1459,11 @@ protected: return TxId; } - const NDqProto::TDqTask& GetTask() const { + const TDqTaskSettings& GetTask() const { + return Task; + } + + TDqTaskSettings& GetTaskRef() { return Task; } @@ -1475,8 +1480,7 @@ protected: TaskRunner = taskRunner; } - void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(), - const TDqTaskRunnerParameterProvider& parameterProvider = {}) { + void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) { YQL_ENSURE(TaskRunner); auto guard = TaskRunner->BindAllocator(MemoryQuota->GetMkqlMemoryLimit()); @@ -1488,7 +1492,7 @@ protected: limits.ChannelBufferSize = MemoryLimits.ChannelBufferSize; limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; - TaskRunner->Prepare(Task, limits, execCtx, parameterProvider); + TaskRunner->Prepare(Task, limits, execCtx); FillIoMaps( TaskRunner->GetHolderFactory(), @@ -2099,7 +2103,7 @@ protected: protected: const NActors::TActorId ExecuterId; const TTxId TxId; - const NDqProto::TDqTask Task; + TDqTaskSettings Task; TString LogPrefix; const TComputeRuntimeSettings RuntimeSettings; TComputeMemoryLimits MemoryLimits; diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index 79bb8ed258..e803c548ec 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -158,18 +158,15 @@ struct TEvTaskRunnerCreate TEvTaskRunnerCreate( const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const std::shared_ptr<IDqTaskRunnerExecutionContext>& execCtx = std::shared_ptr<IDqTaskRunnerExecutionContext>(new TDqTaskRunnerExecutionContext()), - const TDqTaskRunnerParameterProvider& parameterProvider = {}) + const std::shared_ptr<IDqTaskRunnerExecutionContext>& execCtx = std::shared_ptr<IDqTaskRunnerExecutionContext>(new TDqTaskRunnerExecutionContext())) : Task(task) , MemoryLimits(memoryLimits) , ExecCtx(execCtx) - , ParameterProvider(parameterProvider) { } NDqProto::TDqTask Task; TDqTaskRunnerMemoryLimits MemoryLimits; std::shared_ptr<IDqTaskRunnerExecutionContext> ExecCtx; - TDqTaskRunnerParameterProvider ParameterProvider; }; struct TEvTaskRunnerCreateFinished diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 41f26a7858..45715caef0 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -408,11 +408,12 @@ private: void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) { ParentId = ev->Sender; - TaskRunner = Factory(ev->Get()->Task, [this](const TString& message) { + auto settings = NDq::TDqTaskSettings(std::move(ev->Get()->Task)); + TaskRunner = Factory(settings, [this](const TString& message) { LOG_D(message); }); - auto& inputs = ev->Get()->Task.GetInputs(); + auto& inputs = settings.GetInputs(); for (auto inputId = 0; inputId < inputs.size(); inputId++) { auto& input = inputs[inputId]; if (input.HasSource()) { @@ -429,7 +430,7 @@ private: MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex()); } - TaskRunner->Prepare(ev->Get()->Task, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx, ev->Get()->ParameterProvider); + TaskRunner->Prepare(settings, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx); auto event = MakeHolder<TEvTaskRunnerCreateFinished>( TaskRunner->GetSecureParams(), diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 278429f9dc..19ffc4651b 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -309,7 +309,7 @@ public: return opts; } - std::shared_ptr<TPatternCacheEntry> CreateComputationPattern(const NDqProto::TDqTask& task, const TString& rawProgram, bool forCache, bool& canBeCached) { + std::shared_ptr<TPatternCacheEntry> CreateComputationPattern(const TDqTaskSettings& task, const TString& rawProgram, bool forCache, bool& canBeCached) { canBeCached = true; auto entry = TComputationPatternLRUCache::CreateCacheEntry(UseSeparatePatternAlloc()); auto& patternAlloc = UseSeparatePatternAlloc() ? entry->Alloc : Alloc(); @@ -417,7 +417,7 @@ public: return entry; } - std::shared_ptr<TPatternCacheEntry> BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider) { + std::shared_ptr<TPatternCacheEntry> BuildTask(const TDqTaskSettings& task) { LOG(TStringBuilder() << "Build task: " << TaskId); auto startTime = TInstant::Now(); @@ -469,17 +469,7 @@ public: std::string_view name = entry->ParamsStruct->GetMemberName(i); TType* type = entry->ParamsStruct->GetMemberType(i); - if (parameterProvider && parameterProvider(name, type, TypeEnv(), graphHolderFactory, structMembers[i])) { -#ifndef NDEBUG - YQL_ENSURE(!task.GetParameters().contains(name), "param: " << name); -#endif - } else { - auto it = task.GetParameters().find(name); - YQL_ENSURE(it != task.GetParameters().end()); - - auto guard = TypeEnv().BindAllocator(); - TDqDataSerializer::DeserializeParam(it->second, type, graphHolderFactory, structMembers[i]); - } + task.GetParameterValue(name, type, TypeEnv(), graphHolderFactory, structMembers[i]); { auto guard = TypeEnv().BindAllocator(); @@ -504,11 +494,11 @@ public: return entry; } - void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider) override + void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits, + const IDqTaskRunnerExecutionContext& execCtx) override { TaskId = task.GetId(); - auto entry = BuildTask(task, parameterProvider); + auto entry = BuildTask(task); LOG(TStringBuilder() << "Prepare task: " << TaskId); auto startTime = TInstant::Now(); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index c400b0de79..dcc6c7e49e 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -282,15 +282,127 @@ using TDqTaskRunnerParameterProvider = std::function< const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::TUnboxedValue& value) >; +class TDqTaskSettings { +public: + explicit TDqTaskSettings(NDqProto::TDqTask&& task) + : Task_(std::move(task)) + {} + + explicit TDqTaskSettings(const NDqProto::TDqTask& task) + : Task_(task) + {} + + ui64 GetId() const { + return Task_.GetId(); + } + + bool GetCreateSuspended() const { + return Task_.GetCreateSuspended(); + } + + const NDqProto::TDqTask& GetSerializedTask() const { + Y_VERIFY(!ParamProvider, "GetSerialized isn't supported if external ParamProvider callback is specified!"); + return Task_; + } + + const ::NYql::NDqProto::TTaskInput& GetInputs(size_t index) const { + return Task_.GetInputs(index); + } + + const ::NYql::NDqProto::TTaskOutput& GetOutputs(size_t index) const { + return Task_.GetOutputs(index); + } + + const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TTaskInput> GetInputs() const { + return Task_.GetInputs(); + } + + size_t InputsSize() const { + return Task_.InputsSize(); + } + + size_t OutputsSize() const { + return Task_.OutputsSize(); + } + + void SetParamsProvider(TDqTaskRunnerParameterProvider&& provider) { + ParamProvider = std::move(provider); + } + + void GetParameterValue(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::TUnboxedValue& value) const + { + if (ParamProvider && ParamProvider(name, type, typeEnv, holderFactory, value)) { +#ifndef NDEBUG + YQL_ENSURE(!Task_.GetParameters().contains(name), "param: " << name); +#endif + } else { + auto it = Task_.GetParameters().find(name); + YQL_ENSURE(it != Task_.GetParameters().end()); + + auto guard = typeEnv.BindAllocator(); + TDqDataSerializer::DeserializeParam(it->second, type, holderFactory, value); + } + } + + ui64 GetStageId() const { + return Task_.GetStageId(); + } + + const ::NYql::NDqProto::TProgram& GetProgram() const { + return Task_.GetProgram(); + } + + const TProtoStringType & GetRateLimiterResource() const { + return Task_.GetRateLimiterResource(); + } + + const TProtoStringType& GetRateLimiter() const { + return Task_.GetRateLimiter(); + } + + const ::google::protobuf::Map<TProtoStringType, ::NYql::NDqProto::TData>& GetParameters() const { + return Task_.GetParameters(); + } + + const ::google::protobuf::Map<TProtoStringType, TProtoStringType>& GetTaskParams() const { + return Task_.GetTaskParams(); + } + + const ::google::protobuf::Map<TProtoStringType, TProtoStringType>& GetSecureParams() const { + return Task_.GetSecureParams(); + } + + const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TTaskOutput>& GetOutputs() const { + return Task_.GetOutputs(); + } + + const ::google::protobuf::Any& GetMeta() const { + return Task_.GetMeta(); + } + + bool GetUseLlvm() const { + return Task_.GetUseLlvm(); + } + + bool HasUseLlvm() const { + return Task_.HasUseLlvm(); + } + +private: + // external callback to retrieve parameter value. + TDqTaskRunnerParameterProvider ParamProvider; + NDqProto::TDqTask Task_; +}; + class IDqTaskRunner : public TSimpleRefCount<IDqTaskRunner>, private TNonCopyable { public: virtual ~IDqTaskRunner() = default; virtual ui64 GetTaskId() const = 0; - virtual void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(), - const TDqTaskRunnerParameterProvider& parameterProvider = {}) = 0; + virtual void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits, + const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) = 0; virtual ERunStatus Run() = 0; virtual bool HasEffects() const = 0; diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index 0c8e03e285..dadd69897e 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -54,7 +54,7 @@ IActor* CreateComputeActor( } } - auto taskRunnerFactory = [=](const NDqProto::TDqTask& task, const NDq::TLogFunc& logger) { + auto taskRunnerFactory = [=](const NDq::TDqTaskSettings& task, const NDq::TLogFunc& logger) { Y_UNUSED(logger); return options.Factory->Get(task, {}); }; diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h index 61376bfcb3..bb1458a7bc 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h +++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h @@ -516,7 +516,7 @@ public: for (int i = 0; i < static_cast<int>(tasks.size()); ++i) { auto actorId = ActorIdFromProto(actorIds[i]); auto& task = tasks[i]; - Tasks.emplace_back(task, actorId); + Tasks.emplace_back(NDq::TDqTaskSettings(std::move(task)), actorId); ActorIds.emplace(task.GetId(), actorId); TaskIds.emplace(actorId, task.GetId()); Yql::DqsProto::TTaskMeta taskMeta; @@ -536,6 +536,10 @@ public: } } + const NDq::TDqTaskSettings GetTask(size_t idx) const { + return Tasks.at(idx).first; + } + private: void MaybeUpdateChannels() { if (Tasks.empty() || ChannelsUpdated || Tasks.size() != Executing.size()) { @@ -634,7 +638,7 @@ private: bool ChannelsUpdated = false; - TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks; + TVector<std::pair<NDq::TDqTaskSettings, TActorId>> Tasks; THashSet<ui64> FinishedTasks; THashMap<ui64, TInstant> Executing; THashMap<ui64, TActorId> ActorIds; diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 886f619c23..7c7abd1eca 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -140,10 +140,11 @@ public: settings.CollectBasicStats = true; settings.CollectProfileStats = true; auto runner = NDq::MakeDqTaskRunner(executionContext, settings, {}); + auto runnerSettings = NDq::TDqTaskSettings(std::move(task)); { auto guard = runner->BindAllocator(State->Settings->MemoryLimit.Get().GetOrElse(0)); - runner->Prepare(task, limits); + runner->Prepare(runnerSettings, limits); } TVector<NDqProto::TData> rows; diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 5a7be9b85d..655e31c44f 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -515,9 +515,12 @@ public: request.Load(&input); }); - request.GetTask().GetMeta().UnpackTo(&taskMeta); + NDqProto::TDqTask task; + request.MutableTask()->Swap(&task); + task.GetMeta().UnpackTo(&taskMeta); + NDq::TDqTaskSettings settings(std::move(task)); try { - Prepare(request.GetTask(), taskMeta, output); + Prepare(settings, taskMeta, output); } catch (const NKikimr::TMemoryLimitExceededException& ex) { throw yexception() << "DQ computation exceeds the memory limit " << DqConfiguration->MemoryLimit.Get().GetOrElse(0) << ". Try to increase the limit using PRAGMA dq.MemoryLimit"; } @@ -699,7 +702,7 @@ public: } template<typename T> - void Prepare(const NDqProto::TDqTask& task, const T& taskMeta, TPipedOutput& output) { + void Prepare(const NDq::TDqTaskSettings& task, const T& taskMeta, TPipedOutput& output) { NYql::NDqProto::TPrepareResponse result; result.SetResult(true); // COMPAT(aozeritsky) YQL-14268 diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp index 8c5e80cbbe..9c221241b2 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp @@ -113,7 +113,7 @@ private: class TLocalTaskRunner: public ITaskRunner { public: - TLocalTaskRunner(const NDqProto::TDqTask& task, TIntrusivePtr<IDqTaskRunner> runner) + TLocalTaskRunner(const NDq::TDqTaskSettings& task, TIntrusivePtr<IDqTaskRunner> runner) : Task(task) , Runner(runner) { } @@ -196,7 +196,7 @@ private: QueryStat.AddTaskRunnerStats(*Runner->GetStats(), Stats, Task.GetId()); } - NDqProto::TDqTask Task; + NDq::TDqTaskSettings Task; TIntrusivePtr<IDqTaskRunner> Runner; TCounters QueryStat; TDqTaskRunnerStats Stats; @@ -252,11 +252,11 @@ public: , TerminateOnError(terminateOnError) { } - ITaskRunner::TPtr GetOld(const NDqProto::TDqTask& task, const TString& traceId) override { + ITaskRunner::TPtr GetOld(const TDqTaskSettings& task, const TString& traceId) override { return new TLocalTaskRunner(task, Get(task, traceId)); } - TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDqProto::TDqTask& task, const TString& traceId) override { + TIntrusivePtr<NDq::IDqTaskRunner> Get(const TDqTaskSettings& task, const TString& traceId) override { Y_UNUSED(traceId); NDq::TDqTaskRunnerSettings settings; settings.TerminateOnError = TerminateOnError; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index e85962746f..722ce32ddb 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1477,8 +1477,8 @@ public: return Task.GetId(); } - void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&) override + void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits, + const IDqTaskRunnerExecutionContext& execCtx) override { Y_UNUSED(memoryLimits); Y_UNUSED(execCtx); @@ -1729,7 +1729,7 @@ public: TaskScheduler.Start(); } - ITaskRunner::TPtr GetOld(const NDqProto::TDqTask& tmp, const TString& traceId) override { + ITaskRunner::TPtr GetOld(const NDq::TDqTaskSettings& tmp, const TString& traceId) override { Yql::DqsProto::TTaskMeta taskMeta; tmp.GetMeta().UnpackTo(&taskMeta); ui64 stageId = taskMeta.GetStageId(); @@ -1738,7 +1738,7 @@ public: return new TTaskRunner(task, std::move(result), stageId, traceId); } - TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDqProto::TDqTask& tmp, const TString& traceId) override + TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& tmp, const TString& traceId) override { Yql::DqsProto::TTaskMeta taskMeta; tmp.GetMeta().UnpackTo(&taskMeta); @@ -1780,9 +1780,9 @@ private: return exePath + "," + settings.ToString(); } - NDqProto::TDqTask PrepareTask(const NDqProto::TDqTask& tmp, TChildProcess* result) { + NDqProto::TDqTask PrepareTask(const NDq::TDqTaskSettings& tmp, TChildProcess* result) { // get files from fileCache - NDqProto::TDqTask task = tmp; + auto task = tmp.GetSerializedTask(); Yql::DqsProto::TTaskMeta taskMeta; task.GetMeta().UnpackTo(&taskMeta); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h index 06aac8ca3f..20785ed4d1 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h @@ -89,9 +89,9 @@ class IProxyFactory: public TThrRefBase, private TNonCopyable { public: using TPtr = TIntrusivePtr<IProxyFactory>; - virtual ITaskRunner::TPtr GetOld(const NDqProto::TDqTask& task, const TString& traceId = "") = 0; + virtual ITaskRunner::TPtr GetOld(const NDq::TDqTaskSettings& task, const TString& traceId = "") = 0; - virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDqProto::TDqTask& task, const TString& traceId = "TODO") = 0; + virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& task, const TString& traceId = "TODO") = 0; }; diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index b2d41827aa..1cac519e70 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -449,7 +449,8 @@ private: ParentId = ev->Sender; try { - TaskRunner = Factory->GetOld(ev->Get()->Task, TraceId); + NDq::TDqTaskSettings settings(std::move(ev->Get()->Task)); + TaskRunner = Factory->GetOld(settings, TraceId); } catch (...) { TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage(); Send(replyTo, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message), 0, cookie); |