aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-05-29 21:32:31 +0300
committergvit <gvit@ydb.tech>2023-05-29 21:32:31 +0300
commit9a75beeb859ebdd2d5391f0b20cb0349f2fec5a3 (patch)
tree211fa69d4f355f7fc727fac25a143c1d5f198abd
parent1f91bff6f709b6f762453f020dce570db6d2b180 (diff)
downloadydb-9a75beeb859ebdd2d5391f0b20cb0349f2fec5a3.tar.gz
implement dq task settings class to store all related data of dq task
-rw-r--r--ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp2
-rw-r--r--ydb/core/fq/libs/checkpointing/utils.cpp6
-rw-r--r--ydb/core/fq/libs/checkpointing/utils.h8
-rw-r--r--ydb/core/fq/libs/init/init.cpp2
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp5
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.cpp23
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.h12
-rw-r--r--ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp11
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h2
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h56
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp98
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.h11
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp6
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp9
-rw-r--r--ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp3
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp6
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp8
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h10
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h14
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h5
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp7
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp22
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h118
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp2
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller_impl.h8
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp3
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp9
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp8
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp12
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h4
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp3
38 files changed, 348 insertions, 172 deletions
diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
index 18fa49ed13..4e5bfc6933 100644
--- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
+++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
@@ -74,7 +74,7 @@ void TCheckpointCoordinator::Handle(NYql::NDqs::TEvReadyState::TPtr& ev) {
Y_VERIFY(tasks.size() == actorIds.size());
for (int i = 0; i < static_cast<int>(tasks.size()); ++i) {
- auto& task = tasks[i];
+ const auto& task = GetTask(i);
auto& actorId = TaskIdToActor[task.GetId()];
if (actorId) {
OnInternalError(TStringBuilder() << "Duplicate task id: " << task.GetId());
diff --git a/ydb/core/fq/libs/checkpointing/utils.cpp b/ydb/core/fq/libs/checkpointing/utils.cpp
index 0cae2bf7a9..c15ccfa9a6 100644
--- a/ydb/core/fq/libs/checkpointing/utils.cpp
+++ b/ydb/core/fq/libs/checkpointing/utils.cpp
@@ -2,7 +2,7 @@
namespace NFq {
-bool IsIngress(const NYql::NDqProto::TDqTask& task) {
+bool IsIngress(const NYql::NDq::TDqTaskSettings& task) {
// No inputs at all or the only inputs are sources.
for (const auto& input : task.GetInputs()) {
if (!input.HasSource()) {
@@ -12,7 +12,7 @@ bool IsIngress(const NYql::NDqProto::TDqTask& task) {
return true;
}
-bool IsEgress(const NYql::NDqProto::TDqTask& task) {
+bool IsEgress(const NYql::NDq::TDqTaskSettings& task) {
for (const auto& output : task.GetOutputs()) {
if (output.HasSink()) {
return true;
@@ -21,7 +21,7 @@ bool IsEgress(const NYql::NDqProto::TDqTask& task) {
return false;
}
-bool HasState(const NYql::NDqProto::TDqTask& task) {
+bool HasState(const NYql::NDq::TDqTaskSettings& task) {
Y_UNUSED(task);
return true;
}
diff --git a/ydb/core/fq/libs/checkpointing/utils.h b/ydb/core/fq/libs/checkpointing/utils.h
index d5e159b6d6..fd700a4af9 100644
--- a/ydb/core/fq/libs/checkpointing/utils.h
+++ b/ydb/core/fq/libs/checkpointing/utils.h
@@ -1,13 +1,13 @@
#pragma once
-#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
+#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
namespace NFq {
-bool IsIngress(const NYql::NDqProto::TDqTask& task);
+bool IsIngress(const NYql::NDq::TDqTaskSettings& task);
-bool IsEgress(const NYql::NDqProto::TDqTask& task);
+bool IsEgress(const NYql::NDq::TDqTaskSettings& task);
-bool HasState(const NYql::NDqProto::TDqTask& task);
+bool HasState(const NYql::NDq::TDqTaskSettings& task);
} // namespace NFq
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index 0002c927c6..ed48905917 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -227,7 +227,7 @@ void Init(
lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
- [=](const NYql::NDqProto::TDqTask& task, const NYql::NDq::TLogFunc&) {
+ [=](const NYql::NDq::TDqTaskSettings& task, const NYql::NDq::TLogFunc&) {
return lwmOptions.Factory->Get(task);
});
if (protoConfig.GetRateLimiter().GetDataPlaneEnabled()) {
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 3889456bdf..90ba2eb329 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -96,8 +96,7 @@ public:
auto wakeup = [this]{ ContinueExecute(); };
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling,
- std::move(wakeup), TlsActivationContext->AsActorContext()),
- ParameterProvider);
+ std::move(wakeup), TlsActivationContext->AsActorContext()));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index f43d5806b5..bb93f498c6 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -210,8 +210,9 @@ public:
auto taskRunner = CreateKqpTaskRunner(context, settings, log);
TaskRunners.emplace_back(taskRunner);
- taskRunner->Prepare(protoTask, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext(),
- TQueryData::GetParameterProvider(stageInfo.Meta.Tx.Params));
+ auto taskSettings = NDq::TDqTaskSettings(std::move(protoTask));
+ taskSettings.SetParamsProvider(std::move(TQueryData::GetParameterProvider(stageInfo.Meta.Tx.Params)));
+ taskRunner->Prepare(taskSettings, CreateTaskRunnerMemoryLimits(), CreateTaskRunnerExecutionContext());
auto status = taskRunner->Run();
YQL_ENSURE(status == ERunStatus::Finished);
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
index 651d609db8..3b42b886f7 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
@@ -73,7 +73,7 @@ TIntrusivePtr<IDqTaskRunner> CreateKqpTaskRunner(const TDqTaskRunnerContext& exe
}
-TKqpTasksRunner::TKqpTasksRunner(const google::protobuf::RepeatedPtrField<NDqProto::TDqTask>& tasks,
+TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
: LogFunc(logFunc)
, Alloc(execCtx.Alloc)
@@ -88,13 +88,14 @@ TKqpTasksRunner::TKqpTasksRunner(const google::protobuf::RepeatedPtrField<NDqPro
auto guard = execCtx.TypeEnv->BindAllocator();
try {
- for (auto& task : tasks) {
+ for (auto&& task : tasks) {
+ ui64 taskId = task.GetId();
auto runner = CreateKqpTaskRunner(execCtx, settings, logFunc);
if (auto* stats = runner->GetStats()) {
- Stats.emplace(task.GetId(), stats);
+ Stats.emplace(taskId, stats);
}
- TaskRunners.emplace(task.GetId(), std::move(runner));
- Tasks.emplace(task.GetId(), &task);
+ TaskRunners.emplace(taskId, std::move(runner));
+ Tasks.emplace(taskId, std::move(task));
}
} catch (const TMemoryLimitExceededException&) {
TaskRunners.clear();
@@ -119,7 +120,9 @@ void TKqpTasksRunner::Prepare(const TDqTaskRunnerMemoryLimits& memoryLimits, con
for (auto& [taskId, taskRunner] : TaskRunners) {
ComputeCtx->SetCurrentTaskId(taskId);
- taskRunner->Prepare(*Tasks[taskId], memoryLimits, execCtx);
+ auto it = Tasks.find(taskId);
+ Y_VERIFY(it != Tasks.end());
+ taskRunner->Prepare(it->second, memoryLimits, execCtx);
}
ComputeCtx->SetCurrentTaskId(std::numeric_limits<ui64>::max());
@@ -225,8 +228,8 @@ const IDqTaskRunner& TKqpTasksRunner::GetTaskRunner(ui64 taskId) const {
return **task;
}
-const NYql::NDqProto::TDqTask& TKqpTasksRunner::GetTask(ui64 taskId) const {
- return *Tasks.at(taskId);
+const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const {
+ return Tasks.at(taskId);
}
TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) {
@@ -236,10 +239,10 @@ TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memor
return TGuard(*Alloc);
}
-TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(const google::protobuf::RepeatedPtrField<NDqProto::TDqTask>& tasks,
+TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
{
- return new TKqpTasksRunner(tasks, execCtx, settings, logFunc);
+ return new TKqpTasksRunner(std::move(tasks), execCtx, settings, logFunc);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h
index b0e52743b6..6d3788a6a4 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.h
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h
@@ -17,7 +17,7 @@ TIntrusivePtr<NYql::NDq::IDqTaskRunner> CreateKqpTaskRunner(const NYql::NDq::TDq
class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable {
public:
- TKqpTasksRunner(const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks,
+ TKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);
@@ -34,7 +34,11 @@ public:
NYql::NDq::IDqTaskRunner& GetTaskRunner(ui64 taskId);
const NYql::NDq::IDqTaskRunner& GetTaskRunner(ui64 taskId) const;
- const NYql::NDqProto::TDqTask& GetTask(ui64 taskId) const;
+ const TMap<ui64, NYql::NDq::TDqTaskSettings>& GetTasks() const {
+ return Tasks;
+ }
+
+ const NYql::NDq::TDqTaskSettings& GetTask(ui64 taskId) const;
NYql::NDq::IDqInputChannel::TPtr GetInputChannel(ui64 taskId, ui64 channelId) {
return GetTaskRunner(taskId).GetInputChannel(channelId);
@@ -54,7 +58,7 @@ public:
private:
TMap<ui64, TIntrusivePtr<NYql::NDq::IDqTaskRunner>> TaskRunners;
- TMap<ui64, const NYql::NDqProto::TDqTask*> Tasks;
+ TMap<ui64, NYql::NDq::TDqTaskSettings> Tasks;
TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> Stats;
NYql::NDq::TLogFunc LogFunc;
NMiniKQL::TScopedAlloc* Alloc;
@@ -70,7 +74,7 @@ private:
};
-TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& tasks,
+TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);
diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
index 585109ef45..62af3247a3 100644
--- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
@@ -76,7 +76,8 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
}
try {
- const auto& kqpTx = dataTx->GetKqpTransaction();
+ bool useGenericReadSets = dataTx->GetUseGenericReadSets();
+ const auto& kqpLocks = dataTx->GetKqpLocks();
auto& tasksRunner = dataTx->GetKqpTasksRunner();
auto allocGuard = tasksRunner.BindAllocator(txc.GetMemoryLimit() - dataTx->GetTxSize());
@@ -84,7 +85,8 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
NKqp::NRm::TKqpResourcesRequest req;
req.MemoryPool = NKqp::NRm::EKqpMemoryPool::DataQuery;
req.Memory = txc.GetMemoryLimit();
- ui64 taskId = kqpTx.GetTasks().empty() ? std::numeric_limits<ui64>::max() : kqpTx.GetTasks()[0].GetId();
+ ui64 taskId = dataTx->GetFirstKqpTaskId();
+
NKqp::GetKqpResourceManager()->NotifyExternalResourcesAllocated(tx->GetTxId(), taskId, req);
Y_DEFER {
@@ -97,7 +99,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
dataTx->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion);
if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) {
- auto result = KqpRunTransaction(ctx, op->GetTxId(), kqpTx, tasksRunner);
+ auto result = KqpRunTransaction(ctx, op->GetTxId(), kqpLocks, useGenericReadSets, tasksRunner);
Y_VERIFY_S(!dataTx->GetKqpComputeCtx().HadInconsistentReads(),
"Unexpected inconsistent reads in operation " << *op << " when preparing persistent channels");
@@ -108,7 +110,8 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
}
}
- KqpFillOutReadSets(op->OutReadSets(), kqpTx, tasksRunner, DataShard.SysLocksTable(), tabletId);
+ KqpFillOutReadSets(op->OutReadSets(), kqpLocks,
+ dataTx->HasKqpLocks(), useGenericReadSets, tasksRunner, DataShard.SysLocksTable(), tabletId);
} catch (const TMemoryLimitExceededException&) {
LOG_T("Operation " << *op << " at " << tabletId
<< " exceeded memory limit " << txc.GetMemoryLimit()
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 469935e245..470fe4f82c 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -1273,7 +1273,7 @@ void TEngineBay::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) {
}
}
-NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(const NKikimrTxDataShard::TKqpTransaction& tx) {
+NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(NKikimrTxDataShard::TKqpTransaction& tx) {
if (!KqpTasksRunner) {
NYql::NDq::TDqTaskRunnerSettings settings;
@@ -1291,7 +1291,7 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(const NKikimrTxDataShard::T
settings.TerminateOnError = false;
KqpAlloc->SetLimit(10_MB);
- KqpTasksRunner = NKqp::CreateKqpTasksRunner(tx.GetTasks(), KqpExecCtx, settings, KqpLogFunc);
+ KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), KqpExecCtx, settings, KqpLogFunc);
}
return *KqpTasksRunner;
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 0c83a56186..f49e944284 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -120,7 +120,7 @@ public:
void ResetCounters() { EngineHostCounters = TEngineHostCounters(); }
const TEngineHostCounters& GetCounters() const { return EngineHostCounters; }
- NKqp::TKqpTasksRunner& GetKqpTasksRunner(const NKikimrTxDataShard::TKqpTransaction& tx);
+ NKqp::TKqpTasksRunner& GetKqpTasksRunner(NKikimrTxDataShard::TKqpTransaction& tx);
NMiniKQL::TKqpDatashardComputeContext& GetKqpComputeCtx();
private:
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 6f72f6d367..1076c7d904 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -27,6 +27,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
, TxSize(0)
, TxCacheUsage(0)
, IsReleased(false)
+ , BuiltTaskRunner(false)
, IsReadOnly(true)
, AllowCancelROwithReadsets(self->AllowCancelROwithReadsets())
, Cancelled(false)
@@ -82,7 +83,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
try {
bool hasPersistentChannels = false;
- if (!KqpValidateTransaction(GetKqpTransaction(), Immediate(), StepTxId_.TxId, ctx, hasPersistentChannels)) {
+ if (!KqpValidateTransaction(GetTasks(), Immediate(), StepTxId_.TxId, ctx, hasPersistentChannels)) {
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed, datashard: "
<< TabletId() << ", txid: " << StepTxId_.TxId);
ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR;
@@ -91,7 +92,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
}
computeCtx.SetHasPersistentChannels(hasPersistentChannels);
- for (auto& task : GetKqpTransaction().GetTasks()) {
+ for (auto& task : GetTasks()) {
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
if (!task.GetMeta().UnpackTo(&meta)) {
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed"
@@ -154,7 +155,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
IsReadOnly = IsReadOnly && Tx.GetReadOnly();
- KqpSetTxLocksKeys(GetKqpTransaction().GetLocks(), self->SysLocksTable(), EngineBay);
+ KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay);
EngineBay.MarkTxLoaded();
auto& tasksRunner = GetKqpTasksRunner(); // create tasks runner, can throw TMemoryLimitExceededException
@@ -203,11 +204,6 @@ TValidatedDataTx::~TValidatedDataTx() {
NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Sub(TxSize);
}
-const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& TValidatedDataTx::GetKqpTasks() const {
- Y_VERIFY(IsKqpTx());
- return Tx.GetKqpTransaction().GetTasks();
-}
-
ui32 TValidatedDataTx::ExtractKeys(bool allowErrors)
{
using EResult = NMiniKQL::IEngineFlat::EResult;
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index b419ee1ecd..b0f927437e 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -210,9 +210,58 @@ public:
return IsKqpTx() && Tx.GetKqpTransaction().GetType() == NKikimrTxDataShard::KQP_TX_TYPE_SCAN;
}
- const NKikimrTxDataShard::TKqpTransaction &GetKqpTransaction() const { return Tx.GetKqpTransaction(); }
- const google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>& GetKqpTasks() const;
- NKqp::TKqpTasksRunner& GetKqpTasksRunner() { Y_VERIFY(IsKqpDataTx()); return EngineBay.GetKqpTasksRunner(Tx.GetKqpTransaction()); }
+ bool GetUseGenericReadSets() const {
+ Y_VERIFY(IsKqpDataTx());
+ return Tx.GetKqpTransaction().GetUseGenericReadSets();
+ }
+
+ inline const ::NKikimrTxDataShard::TKqpLocks& GetKqpLocks() const {
+ Y_VERIFY(IsKqpDataTx());
+ return Tx.GetKqpTransaction().GetLocks();
+ }
+
+ inline bool HasKqpLocks() const {
+ Y_VERIFY(IsKqpDataTx());
+ return Tx.GetKqpTransaction().HasLocks();
+ }
+
+ inline bool HasKqpSnapshot() const {
+ Y_VERIFY(IsKqpDataTx());
+ return Tx.GetKqpTransaction().HasSnapshot();
+ }
+
+ inline const ::NKikimrKqp::TKqpSnapshot& GetKqpSnapshot() const {
+ Y_VERIFY(IsKqpDataTx());
+ return Tx.GetKqpTransaction().GetSnapshot();
+ }
+
+ inline const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TDqTask>& GetTasks() const {
+ Y_VERIFY(IsKqpDataTx());
+ // ensure that GetTasks is not called after task runner is built
+ Y_VERIFY(!BuiltTaskRunner);
+ return Tx.GetKqpTransaction().GetTasks();
+ }
+
+ inline ui64 GetFirstKqpTaskId() {
+ ui64 taskId = std::numeric_limits<ui64>::max();
+ const auto& tasks = GetKqpTasksRunner().GetTasks();
+ if (!tasks.empty()) {
+ taskId = tasks.begin()->second.GetId();
+ }
+ return taskId;
+ }
+
+ NKqp::TKqpTasksRunner& GetKqpTasksRunner() {
+ Y_VERIFY(IsKqpDataTx());
+ BuiltTaskRunner = true;
+ return EngineBay.GetKqpTasksRunner(*Tx.MutableKqpTransaction());
+ }
+
+ ::NYql::NDqProto::EDqStatsMode GetKqpStatsMode() const {
+ Y_VERIFY(IsKqpDataTx());
+ return Tx.GetKqpTransaction().GetRuntimeSettings().GetStatsMode();
+ }
+
NMiniKQL::TKqpDatashardComputeContext& GetKqpComputeCtx() { Y_VERIFY(IsKqpDataTx()); return EngineBay.GetKqpComputeCtx(); }
bool HasStreamResponse() const { return Tx.GetStreamResponse(); }
@@ -252,6 +301,7 @@ private:
ui64 TxSize;
ui64 TxCacheUsage;
bool IsReleased;
+ bool BuiltTaskRunner;
TMaybe<ui64> PerShardKeysSizeLimitBytes_;
bool IsReadOnly;
bool AllowCancelROwithReadsets;
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 29399bce45..1f531c43bd 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -98,11 +98,12 @@ NUdf::EFetchStatus FetchOutput(NDq::IDqOutputChannel* channel, NDqProto::TData&
}
NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
- const TInputOpData::TInReadSets* inReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx,
+ const TInputOpData::TInReadSets* inReadSets, const NKikimrTxDataShard::TKqpLocks&,
+ bool useGenericReadSets,
NKqp::TKqpTasksRunner& tasksRunner, bool applyEffects)
{
THashMap<ui64, std::pair<ui64, ui32>> inputChannelsMap; // channelId -> (taskId, input index)
- for (auto& task : kqpTx.GetTasks()) {
+ for (auto& [taskId, task] : tasksRunner.GetTasks()) {
for (ui32 i = 0; i < task.InputsSize(); ++i) {
auto& input = task.GetInputs(i);
for (auto& channel : input.GetChannels()) {
@@ -125,7 +126,7 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
for (auto& data : dataList) {
NKikimrTxDataShard::TKqpReadset kqpReadset;
- if (kqpTx.GetUseGenericReadSets()) {
+ if (useGenericReadSets) {
NKikimrTx::TReadSetData genericData;
bool ok = genericData.ParseFromString(data.Body);
Y_VERIFY(ok, "Failed to parse generic readset data from %" PRIu64 " to %" PRIu64 " origin %" PRIu64,
@@ -175,7 +176,8 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
MKQL_ENSURE_S(runStatus == NDq::ERunStatus::PendingInput);
hasInputChanges = false;
- for (auto& task : kqpTx.GetTasks()) {
+ for (auto& taskIt : tasksRunner.GetTasks()) {
+ const auto& task = taskIt.second;
for (ui32 i = 0; i < task.OutputsSize(); ++i) {
for (auto& channel : task.GetOutputs(i).GetChannels()) {
if (auto* inputInfo = inputChannelsMap.FindPtr(channel.GetId())) {
@@ -303,10 +305,10 @@ bool ReceiveLocks(const NKikimrTxDataShard::TKqpLocks& locks, ui64 shardId) {
} // namespace
-bool KqpValidateTransaction(const NKikimrTxDataShard::TKqpTransaction& tx, bool isImmediate, ui64 txId,
+bool KqpValidateTransaction(const ::google::protobuf::RepeatedPtrField< ::NYql::NDqProto::TDqTask>& tasks, bool isImmediate, ui64 txId,
const TActorContext& ctx, bool& hasPersistentChannels)
{
- for (const auto& task : tx.GetTasks()) {
+ for (const auto& task : tasks) {
if (!KqpValidateTask(task, isImmediate, txId, ctx, hasPersistentChannels)) {
return false;
}
@@ -498,17 +500,17 @@ void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLoc
}
NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
- const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner)
+ const NKikimrTxDataShard::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner)
{
- return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, kqpTx, tasksRunner, /* applyEffects */ false);
+ return RunKqpTransactionInternal(ctx, txId, /* inReadSets */ nullptr, kqpLocks, useGenericReadSets, tasksRunner, /* applyEffects */ false);
}
THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx,
ui64 origin, ui64 txId, const TInputOpData::TInReadSets* inReadSets,
- const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner,
+ const NKikimrTxDataShard::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner,
const NMiniKQL::TKqpDatashardComputeContext& computeCtx)
{
- auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, kqpTx, tasksRunner, /* applyEffects */ true);
+ auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, kqpLocks, useGenericReadSets, tasksRunner, /* applyEffects */ true);
if (computeCtx.HadInconsistentReads()) {
return nullptr;
@@ -527,7 +529,7 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA,
origin, txId, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
- for (auto& task : kqpTx.GetTasks()) {
+ for (auto& [taskId, task] : tasksRunner.GetTasks()) {
auto& taskRunner = tasksRunner.GetTaskRunner(task.GetId());
for (ui32 i = 0; i < task.OutputsSize(); ++i) {
@@ -573,12 +575,13 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
return result;
}
-void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx,
+void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpLocks& kqpLocks,
+ bool hasKqpLocks, bool useGenericReadSets,
NKqp::TKqpTasksRunner& tasksRunner, TSysLocks& sysLocks, ui64 tabletId)
{
TMap<std::pair<ui64, ui64>, NKikimrTxDataShard::TKqpReadset> readsetData;
- for (auto& task : kqpTx.GetTasks()) {
+ for (auto& [taskId, task] : tasksRunner.GetTasks()) {
auto& taskRunner = tasksRunner.GetTaskRunner(task.GetId());
for (ui32 i = 0; i < task.OutputsSize(); ++i) {
@@ -606,12 +609,12 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT
NKikimrTx::TReadSetData::EDecision decision = NKikimrTx::TReadSetData::DECISION_COMMIT;
TMap<std::pair<ui64, ui64>, NKikimrTx::TReadSetData> genericData;
- if (kqpTx.HasLocks() && NeedValidateLocks(kqpTx.GetLocks().GetOp())) {
- bool sendLocks = SendLocks(kqpTx.GetLocks(), tabletId);
- YQL_ENSURE(sendLocks == !kqpTx.GetLocks().GetLocks().empty());
+ if (hasKqpLocks && NeedValidateLocks(kqpLocks.GetOp())) {
+ bool sendLocks = SendLocks(kqpLocks, tabletId);
+ YQL_ENSURE(sendLocks == !kqpLocks.GetLocks().empty());
- if (sendLocks && !kqpTx.GetLocks().GetReceivingShards().empty()) {
- auto brokenLocks = ValidateLocks(kqpTx.GetLocks(), sysLocks, tabletId);
+ if (sendLocks && !kqpLocks.GetReceivingShards().empty()) {
+ auto brokenLocks = ValidateLocks(kqpLocks, sysLocks, tabletId);
NKikimrTxDataShard::TKqpValidateLocksResult validateLocksResult;
validateLocksResult.SetSuccess(brokenLocks.empty());
@@ -619,14 +622,14 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT
for (auto& lock : brokenLocks) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Found broken lock: " << lock.ShortDebugString());
- if (kqpTx.GetUseGenericReadSets()) {
+ if (useGenericReadSets) {
decision = NKikimrTx::TReadSetData::DECISION_ABORT;
} else {
validateLocksResult.AddBrokenLocks()->Swap(&lock);
}
}
- for (auto& dstTabletId : kqpTx.GetLocks().GetReceivingShards()) {
+ for (auto& dstTabletId : kqpLocks.GetReceivingShards()) {
if (tabletId == dstTabletId) {
continue;
}
@@ -635,7 +638,7 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT
<< tabletId << " to " << dstTabletId << ", locks: " << validateLocksResult.ShortDebugString());
auto key = std::make_pair(tabletId, dstTabletId);
- if (kqpTx.GetUseGenericReadSets()) {
+ if (useGenericReadSets) {
genericData[key].SetDecision(decision);
} else {
readsetData[key].MutableValidateLocksResult()->CopyFrom(validateLocksResult);
@@ -644,7 +647,7 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT
}
}
- if (kqpTx.GetUseGenericReadSets()) {
+ if (useGenericReadSets) {
for (const auto& [key, data] : readsetData) {
bool ok = genericData[key].MutableData()->PackFrom(data);
Y_VERIFY(ok, "Failed to pack readset data from %" PRIu64 " to %" PRIu64, key.first, key.second);
@@ -672,17 +675,18 @@ void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrT
}
bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
- auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
+ auto& kqpLocks = tx->GetDataTx()->GetKqpLocks();
+ bool hasKqpLocks = tx->GetDataTx()->HasKqpLocks();
- if (!kqpTx.HasLocks() || !NeedValidateLocks(kqpTx.GetLocks().GetOp())) {
+ if (!hasKqpLocks || !NeedValidateLocks(kqpLocks.GetOp())) {
return true;
}
- bool sendLocks = SendLocks(kqpTx.GetLocks(), origin);
- YQL_ENSURE(sendLocks == !kqpTx.GetLocks().GetLocks().empty());
+ bool sendLocks = SendLocks(kqpLocks, origin);
+ YQL_ENSURE(sendLocks == !kqpLocks.GetLocks().empty());
if (sendLocks) {
- auto brokenLocks = ValidateLocks(kqpTx.GetLocks(), sysLocks, origin);
+ auto brokenLocks = ValidateLocks(kqpLocks, sysLocks, origin);
if (!brokenLocks.empty()) {
tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
@@ -702,7 +706,7 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks)
for (auto& readSet : tx->InReadSets()) {
for (auto& data : readSet.second) {
- if (kqpTx.GetUseGenericReadSets()) {
+ if (tx->GetDataTx()->GetUseGenericReadSets()) {
NKikimrTx::TReadSetData genericData;
bool ok = genericData.ParseFromString(data.Body);
Y_VERIFY(ok, "Failed to parse generic readset from %" PRIu64 " to %" PRIu64 " origin %" PRIu64,
@@ -745,14 +749,14 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks)
}
bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
- auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
+ auto& kqpLocks = tx->GetDataTx()->GetKqpLocks();
- if (!kqpTx.HasLocks() || !NeedValidateLocks(kqpTx.GetLocks().GetOp())) {
+ if (!tx->GetDataTx()->HasKqpLocks() || !NeedValidateLocks(kqpLocks.GetOp())) {
return true;
}
// Volatile transactions cannot work with non-generic readsets
- YQL_ENSURE(kqpTx.GetUseGenericReadSets());
+ YQL_ENSURE(tx->GetDataTx()->GetUseGenericReadSets());
// We may have some stale data since before the restart
// We expect all stale data to be cleared on restarts
@@ -761,10 +765,10 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo
// Note: usually all shards send locks, since they either have side effects or need to validate locks
// However it is technically possible to have pure-read shards, that don't contribute to the final decision
- bool sendLocks = SendLocks(kqpTx.GetLocks(), origin);
+ bool sendLocks = SendLocks(kqpLocks, origin);
if (sendLocks) {
// Note: it is possible to have no locks
- auto brokenLocks = ValidateLocks(kqpTx.GetLocks(), sysLocks, origin);
+ auto brokenLocks = ValidateLocks(kqpLocks, sysLocks, origin);
if (!brokenLocks.empty()) {
tx->Result() = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
@@ -782,7 +786,7 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo
}
// We need to form decision readsets for all other participants
- for (ui64 dstTabletId : kqpTx.GetLocks().GetReceivingShards()) {
+ for (ui64 dstTabletId : kqpLocks.GetReceivingShards()) {
if (dstTabletId == origin) {
// Don't send readsets to ourselves
continue;
@@ -803,11 +807,11 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo
}
}
- bool receiveLocks = ReceiveLocks(kqpTx.GetLocks(), origin);
+ bool receiveLocks = ReceiveLocks(kqpLocks, origin);
if (receiveLocks) {
// Note: usually only shards with side-effects receive locks, since they
// need the final outcome to decide whether to commit or abort.
- for (ui64 srcTabletId : kqpTx.GetLocks().GetSendingShards()) {
+ for (ui64 srcTabletId : kqpLocks.GetSendingShards()) {
if (srcTabletId == origin) {
// Don't await decision from ourselves
continue;
@@ -877,13 +881,13 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo
}
void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
- auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
+ auto& kqpLocks = tx->GetDataTx()->GetKqpLocks();
- if (!kqpTx.HasLocks() || !NeedEraseLocks(kqpTx.GetLocks().GetOp())) {
+ if (!tx->GetDataTx()->HasKqpLocks() || !NeedEraseLocks(kqpLocks.GetOp())) {
return;
}
- for (auto& lockProto : kqpTx.GetLocks().GetLocks()) {
+ for (auto& lockProto : kqpLocks.GetLocks()) {
if (lockProto.GetDataShard() != origin) {
continue;
}
@@ -896,17 +900,17 @@ void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
}
void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writeVersion, TDataShard& dataShard) {
- auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
- if (!kqpTx.HasLocks()) {
+ if (!tx->GetDataTx()->HasKqpLocks()) {
return;
}
+ auto& kqpLocks = tx->GetDataTx()->GetKqpLocks();
TSysLocks& sysLocks = dataShard.SysLocksTable();
- if (NeedCommitLocks(kqpTx.GetLocks().GetOp())) {
+ if (NeedCommitLocks(kqpLocks.GetOp())) {
// We assume locks have been validated earlier
- for (auto& lockProto : kqpTx.GetLocks().GetLocks()) {
+ for (auto& lockProto : kqpLocks.GetLocks()) {
if (lockProto.GetDataShard() != origin) {
continue;
}
@@ -927,9 +931,9 @@ void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writ
}
void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
- const NKikimrTxDataShard::TKqpTransaction& kqpTx, ui64 tabletId)
+ const NKikimrTxDataShard::TKqpLocks& kqpLocks, const NKqp::TKqpTasksRunner& tasksRunner, ui64 tabletId)
{
- for (auto& task : kqpTx.GetTasks()) {
+ for (auto& [taskId, task] : tasksRunner.GetTasks()) {
for (ui32 i = 0; i < task.InputsSize(); ++i) {
for (auto& channel : task.GetInputs(i).GetChannels()) {
if (channel.GetIsPersistent()) {
@@ -945,8 +949,8 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
}
}
- if (ReceiveLocks(kqpTx.GetLocks(), tabletId)) {
- for (ui64 shardId : kqpTx.GetLocks().GetSendingShards()) {
+ if (ReceiveLocks(kqpLocks, tabletId)) {
+ for (ui64 shardId : kqpLocks.GetSendingShards()) {
if (shardId == tabletId) {
continue;
}
diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h
index b696c19dbc..0a330218cb 100644
--- a/ydb/core/tx/datashard/datashard_kqp.h
+++ b/ydb/core/tx/datashard/datashard_kqp.h
@@ -11,7 +11,7 @@
namespace NKikimr {
namespace NDataShard {
-bool KqpValidateTransaction(const NKikimrTxDataShard::TKqpTransaction& tx, bool isImmediate,
+bool KqpValidateTransaction(const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TDqTask> & tasks, bool isImmediate,
ui64 txId, const TActorContext& ctx, bool& hasPersistentChannels);
void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo,
@@ -21,18 +21,19 @@ void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo,
void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLocks& sysLocks, TEngineBay& engineBay);
NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId,
- const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner);
+ const NKikimrTxDataShard::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner);
THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const TActorContext& ctx,
ui64 origin, ui64 txId, const TInputOpData::TInReadSets* inReadSets,
- const NKikimrTxDataShard::TKqpTransaction& kqpTx, NKqp::TKqpTasksRunner& tasksRunner,
+ const NKikimrTxDataShard::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner,
const NMiniKQL::TKqpDatashardComputeContext& computeCtx);
-void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx,
+void KqpFillOutReadSets(TOutputOpData::TOutReadSets& outReadSets, const NKikimrTxDataShard::TKqpLocks& kqpLocks,
+ bool hasKqpLocks, bool useGenericReadSets,
NKqp::TKqpTasksRunner& tasksRunner, TSysLocks& sysLocks, ui64 tabletId);
void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
- const NKikimrTxDataShard::TKqpTransaction& kqpTx, ui64 tabletId);
+ const NKikimrTxDataShard::TKqpLocks& kqpLocks, const NKqp::TKqpTasksRunner& tasksRunner, ui64 tabletId);
bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index a38867a4dc..94e4646ed1 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1395,7 +1395,7 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
} else if (tx->IsReadTable() && dataTx->GetReadTableTransaction().HasSnapshotStep() && dataTx->GetReadTableTransaction().HasSnapshotTxId()) {
badRequest("Ambiguous snapshot info. Cannot use both MVCC and read table snapshots in one transaction");
return tx;
- } else if (tx->IsKqpScanTransaction() && dataTx->GetKqpTransaction().HasSnapshot()) {
+ } else if (tx->IsKqpScanTransaction() && dataTx->HasKqpSnapshot()) {
badRequest("Ambiguous snapshot info. Cannot use both MVCC and kqp scan snapshots in one transaction");
return tx;
}
@@ -1423,9 +1423,9 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
if (!tx->IsImmediate() || !Self->IsMvccEnabled()) {
// No op
- } else if (tx->IsKqpScanTransaction() && dataTx->GetKqpTransaction().HasSnapshot()) {
+ } else if (tx->IsKqpScanTransaction() && dataTx->HasKqpSnapshot()) {
// to be consistent while dependencies calculation
- auto snapshot = dataTx->GetKqpTransaction().GetSnapshot();
+ auto snapshot = dataTx->GetKqpSnapshot();
tx->SetMvccSnapshot(TRowVersion(snapshot.GetStep(), snapshot.GetTxId()));
}
}
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 58d5898a23..137ec95246 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -125,7 +125,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}
try {
- auto& kqpTx = dataTx->GetKqpTransaction();
+ auto& kqpLocks = dataTx->GetKqpLocks();
+ bool useGenericReadSets = dataTx->GetUseGenericReadSets();
auto& tasksRunner = dataTx->GetKqpTasksRunner();
ui64 consumedMemory = dataTx->GetTxSize() + tasksRunner.GetAllocatedMemory();
@@ -200,7 +201,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
NKqp::NRm::TKqpResourcesRequest req;
req.MemoryPool = NKqp::NRm::EKqpMemoryPool::DataQuery;
req.Memory = txc.GetMemoryLimit();
- ui64 taskId = kqpTx.GetTasks().empty() ? std::numeric_limits<ui64>::max() : kqpTx.GetTasks()[0].GetId();
+ ui64 taskId = dataTx->GetFirstKqpTaskId();
NKqp::GetKqpResourceManager()->NotifyExternalResourcesAllocated(tx->GetTxId(), taskId, req);
Y_DEFER {
@@ -228,7 +229,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx();
auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(),
- op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), kqpTx, tasksRunner, computeCtx);
+ op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), kqpLocks, useGenericReadSets, tasksRunner, computeCtx);
if (!result && computeCtx.HadInconsistentReads()) {
LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId
@@ -357,7 +358,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}
KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters());
- auto statsMode = kqpTx.GetRuntimeSettings().GetStatsMode();
+ auto statsMode = dataTx->GetKqpStatsMode();
KqpFillStats(DataShard, tasksRunner, computeCtx, statsMode, *op->Result());
} catch (const TMemoryLimitExceededException&) {
dataTx->ResetCollectedChanges();
diff --git a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
index 05ab71fc36..7d7ba2f897 100644
--- a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
+++ b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
@@ -56,7 +56,8 @@ EExecutionStatus TPrepareKqpDataTxInRSUnit::Execute(TOperation::TPtr op, TTransa
}
try {
- KqpPrepareInReadsets(op->InReadSets(), tx->GetDataTx()->GetKqpTransaction(), DataShard.TabletID());
+ KqpPrepareInReadsets(op->InReadSets(), tx->GetDataTx()->GetKqpLocks(),
+ tx->GetDataTx()->GetKqpTasksRunner(), DataShard.TabletID());
} catch (const yexception& e) {
LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, "Exception while preparing in-readsets for KQP transaction "
<< *op << " at " << DataShard.TabletID() << ": " << CurrentExceptionMessage());
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 80d424d77c..3e64051a14 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -59,7 +59,7 @@ public:
TLogFunc logger;
if (IsDebugLogEnabled(actorSystem)) {
- logger = [actorSystem, txId = GetTxId(), taskId = GetTask().GetId()] (const TString& message) {
+ logger = [actorSystem, txId = GetTxId(), taskId = Task.GetId()] (const TString& message) {
LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TxId: " << txId
<< ", task: " << taskId << ": " << message);
};
@@ -73,7 +73,7 @@ public:
}
}
std::tie(TaskRunnerActor, actor) = TaskRunnerActorFactory->Create(
- this, GetTxId(), GetTask().GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota());
+ this, GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota());
TaskRunnerActorId = RegisterWithSameMailbox(actor);
TDqTaskRunnerMemoryLimits limits;
@@ -87,7 +87,7 @@ public:
Send(TaskRunnerActorId,
new NTaskRunnerActor::TEvTaskRunnerCreate(
- GetTask(), limits, execCtx));
+ Task.GetSerializedTask(), limits, execCtx));
CA_LOG_D("Use CPU quota: " << UseCpuQuota() << ". Rate limiter resource: { \"" << Task.GetRateLimiter() << "\", \"" << Task.GetRateLimiterResource() << "\" }");
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index 4c99e02f9e..2a73a3c72e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -53,7 +53,7 @@ public:
};
}
- auto taskRunner = TaskRunnerFactory(GetTask(), logger);
+ auto taskRunner = TaskRunnerFactory(Task, logger);
SetTaskRunner(taskRunner);
PrepareTaskRunner();
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 5da25141ad..cc6d1b672b 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -309,7 +309,7 @@ struct TComputeMemoryLimits {
};
using TTaskRunnerFactory = std::function<
- TIntrusivePtr<IDqTaskRunner>(const NDqProto::TDqTask& task, const TLogFunc& logFunc)
+ TIntrusivePtr<IDqTaskRunner>(const TDqTaskSettings& task, const TLogFunc& logFunc)
>;
void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats,
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
index bb6b613b0c..289f0fa0aa 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
@@ -38,7 +38,7 @@ TString InFlightMessagesStr(const TCollection& inFlight) {
} // anonymous namespace
-TDqComputeActorChannels::TDqComputeActorChannels(TActorId owner, const TTxId& txId, const NDqProto::TDqTask& task,
+TDqComputeActorChannels::TDqComputeActorChannels(TActorId owner, const TTxId& txId, const TDqTaskSettings& task,
bool retryOnUndelivery, NDqProto::EDqStatsMode statsMode, ui64 channelBufferSize, ICallbacks* cbs, ui32 actorActivityType)
: TActor(&TDqComputeActorChannels::WorkState, actorActivityType)
, Owner(owner)
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
index f2e893bc24..6a2330f21e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
@@ -39,7 +39,7 @@ public:
};
public:
- TDqComputeActorChannels(NActors::TActorId owner, const TTxId& txId, const NYql::NDqProto::TDqTask& task, bool retryOnUndelivery,
+ TDqComputeActorChannels(NActors::TActorId owner, const TTxId& txId, const TDqTaskSettings& task, bool retryOnUndelivery,
NDqProto::EDqStatsMode statsMode, ui64 channelBufferSize, ICallbacks* cbs, ui32 actorActivityType);
private:
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index c7a7ee8fe5..dce6669ae2 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -52,7 +52,7 @@ TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) {
return TStringBuilder() << checkpoint.GetGeneration() << "." << checkpoint.GetId();
}
-bool IsIngressTask(const NDqProto::TDqTask& task) {
+bool IsIngressTask(const TDqTaskSettings& task) {
for (const auto& input : task.GetInputs()) {
if (!input.HasSource()) {
return false;
@@ -125,7 +125,7 @@ NDqProto::TComputeActorState CombineForeignState(
} // namespace
-TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor)
+TDqComputeActorCheckpoints::TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, TDqTaskSettings task, ICallbacks* computeActor)
: TActor(&TDqComputeActorCheckpoints::StateFunc)
, Owner(owner)
, TxId(txId)
@@ -543,7 +543,7 @@ void TDqComputeActorCheckpoints::TPendingCheckpoint::Clear() {
ComputeActorState.Clear();
}
-size_t TDqComputeActorCheckpoints::TPendingCheckpoint::GetSinksCount(const NDqProto::TDqTask& task) {
+size_t TDqComputeActorCheckpoints::TPendingCheckpoint::GetSinksCount(const TDqTaskSettings& task) {
size_t sinksCount = 0;
for (int outputIndex = 0, outputsCount = task.OutputsSize(); outputIndex < outputsCount; ++outputIndex) {
if (task.GetOutputs(outputIndex).HasSink()) {
@@ -562,7 +562,7 @@ static bool IsInfiniteSourceType(const TString& sourceType) {
return sourceType == "PqSource";
}
-NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const NDqProto::TDqTask& task) {
+NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const TDqTaskSettings& task) {
for (const auto& input : task.GetInputs()) {
if (const TString& srcType = input.GetSource().GetType(); srcType && IsInfiniteSourceType(srcType)) {
return NDqProto::CHECKPOINTING_MODE_DEFAULT;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
index bbfb4a327f..66e458ec72 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
@@ -20,7 +20,7 @@ enum ECheckpointingMode : int;
namespace NYql::NDq {
-NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const NDqProto::TDqTask& task);
+NDqProto::ECheckpointingMode GetTaskCheckpointingMode(const TDqTaskSettings& task);
class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpoints>
{
@@ -35,7 +35,7 @@ class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpo
};
struct TPendingCheckpoint {
- TPendingCheckpoint(const NDqProto::TDqTask& task)
+ TPendingCheckpoint(const TDqTaskSettings& task)
: SinksCount(GetSinksCount(task))
{
}
@@ -54,7 +54,7 @@ class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpo
return SavedComputeActorState && SinksCount == SavedSinkStatesCount;
}
- static size_t GetSinksCount(const NDqProto::TDqTask& task);
+ static size_t GetSinksCount(const TDqTaskSettings& task);
const size_t SinksCount;
TMaybe<NDqProto::TCheckpoint> Checkpoint;
@@ -89,7 +89,7 @@ public:
ComputeActorCurrentStateVersion = 2,
};
- TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, NDqProto::TDqTask task, ICallbacks* computeActor);
+ TDqComputeActorCheckpoints(const NActors::TActorId& owner, const TTxId& txId, TDqTaskSettings task, ICallbacks* computeActor);
void Init(NActors::TActorId computeActorId, NActors::TActorId checkpointsId);
[[nodiscard]]
bool HasPendingCheckpoint() const;
@@ -140,7 +140,7 @@ private:
private:
const NActors::TActorId Owner;
const TTxId TxId;
- const NDqProto::TDqTask Task;
+ const TDqTaskSettings Task;
const bool IngressTask;
const NActors::TActorId CheckpointStorage;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index afc078e1e7..1d1e406d74 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -108,6 +108,7 @@ struct TComputeActorStateFuncHelper<void (T::*)(STFUNC_SIG)> {
} // namespace NDetails
+
template<typename TDerived>
class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
, public TDqComputeActorChannels::ICallbacks
@@ -1458,7 +1459,11 @@ protected:
return TxId;
}
- const NDqProto::TDqTask& GetTask() const {
+ const TDqTaskSettings& GetTask() const {
+ return Task;
+ }
+
+ TDqTaskSettings& GetTaskRef() {
return Task;
}
@@ -1475,8 +1480,7 @@ protected:
TaskRunner = taskRunner;
}
- void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(),
- const TDqTaskRunnerParameterProvider& parameterProvider = {}) {
+ void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) {
YQL_ENSURE(TaskRunner);
auto guard = TaskRunner->BindAllocator(MemoryQuota->GetMkqlMemoryLimit());
@@ -1488,7 +1492,7 @@ protected:
limits.ChannelBufferSize = MemoryLimits.ChannelBufferSize;
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
- TaskRunner->Prepare(Task, limits, execCtx, parameterProvider);
+ TaskRunner->Prepare(Task, limits, execCtx);
FillIoMaps(
TaskRunner->GetHolderFactory(),
@@ -2099,7 +2103,7 @@ protected:
protected:
const NActors::TActorId ExecuterId;
const TTxId TxId;
- const NDqProto::TDqTask Task;
+ TDqTaskSettings Task;
TString LogPrefix;
const TComputeRuntimeSettings RuntimeSettings;
TComputeMemoryLimits MemoryLimits;
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index 79bb8ed258..e803c548ec 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -158,18 +158,15 @@ struct TEvTaskRunnerCreate
TEvTaskRunnerCreate(
const NDqProto::TDqTask& task,
const TDqTaskRunnerMemoryLimits& memoryLimits,
- const std::shared_ptr<IDqTaskRunnerExecutionContext>& execCtx = std::shared_ptr<IDqTaskRunnerExecutionContext>(new TDqTaskRunnerExecutionContext()),
- const TDqTaskRunnerParameterProvider& parameterProvider = {})
+ const std::shared_ptr<IDqTaskRunnerExecutionContext>& execCtx = std::shared_ptr<IDqTaskRunnerExecutionContext>(new TDqTaskRunnerExecutionContext()))
: Task(task)
, MemoryLimits(memoryLimits)
, ExecCtx(execCtx)
- , ParameterProvider(parameterProvider)
{ }
NDqProto::TDqTask Task;
TDqTaskRunnerMemoryLimits MemoryLimits;
std::shared_ptr<IDqTaskRunnerExecutionContext> ExecCtx;
- TDqTaskRunnerParameterProvider ParameterProvider;
};
struct TEvTaskRunnerCreateFinished
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 41f26a7858..45715caef0 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -408,11 +408,12 @@ private:
void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) {
ParentId = ev->Sender;
- TaskRunner = Factory(ev->Get()->Task, [this](const TString& message) {
+ auto settings = NDq::TDqTaskSettings(std::move(ev->Get()->Task));
+ TaskRunner = Factory(settings, [this](const TString& message) {
LOG_D(message);
});
- auto& inputs = ev->Get()->Task.GetInputs();
+ auto& inputs = settings.GetInputs();
for (auto inputId = 0; inputId < inputs.size(); inputId++) {
auto& input = inputs[inputId];
if (input.HasSource()) {
@@ -429,7 +430,7 @@ private:
MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex());
}
- TaskRunner->Prepare(ev->Get()->Task, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx, ev->Get()->ParameterProvider);
+ TaskRunner->Prepare(settings, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx);
auto event = MakeHolder<TEvTaskRunnerCreateFinished>(
TaskRunner->GetSecureParams(),
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 278429f9dc..19ffc4651b 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -309,7 +309,7 @@ public:
return opts;
}
- std::shared_ptr<TPatternCacheEntry> CreateComputationPattern(const NDqProto::TDqTask& task, const TString& rawProgram, bool forCache, bool& canBeCached) {
+ std::shared_ptr<TPatternCacheEntry> CreateComputationPattern(const TDqTaskSettings& task, const TString& rawProgram, bool forCache, bool& canBeCached) {
canBeCached = true;
auto entry = TComputationPatternLRUCache::CreateCacheEntry(UseSeparatePatternAlloc());
auto& patternAlloc = UseSeparatePatternAlloc() ? entry->Alloc : Alloc();
@@ -417,7 +417,7 @@ public:
return entry;
}
- std::shared_ptr<TPatternCacheEntry> BuildTask(const NDqProto::TDqTask& task, const TDqTaskRunnerParameterProvider& parameterProvider) {
+ std::shared_ptr<TPatternCacheEntry> BuildTask(const TDqTaskSettings& task) {
LOG(TStringBuilder() << "Build task: " << TaskId);
auto startTime = TInstant::Now();
@@ -469,17 +469,7 @@ public:
std::string_view name = entry->ParamsStruct->GetMemberName(i);
TType* type = entry->ParamsStruct->GetMemberType(i);
- if (parameterProvider && parameterProvider(name, type, TypeEnv(), graphHolderFactory, structMembers[i])) {
-#ifndef NDEBUG
- YQL_ENSURE(!task.GetParameters().contains(name), "param: " << name);
-#endif
- } else {
- auto it = task.GetParameters().find(name);
- YQL_ENSURE(it != task.GetParameters().end());
-
- auto guard = TypeEnv().BindAllocator();
- TDqDataSerializer::DeserializeParam(it->second, type, graphHolderFactory, structMembers[i]);
- }
+ task.GetParameterValue(name, type, TypeEnv(), graphHolderFactory, structMembers[i]);
{
auto guard = TypeEnv().BindAllocator();
@@ -504,11 +494,11 @@ public:
return entry;
}
- void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
- const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider& parameterProvider) override
+ void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
+ const IDqTaskRunnerExecutionContext& execCtx) override
{
TaskId = task.GetId();
- auto entry = BuildTask(task, parameterProvider);
+ auto entry = BuildTask(task);
LOG(TStringBuilder() << "Prepare task: " << TaskId);
auto startTime = TInstant::Now();
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index c400b0de79..dcc6c7e49e 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -282,15 +282,127 @@ using TDqTaskRunnerParameterProvider = std::function<
const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::TUnboxedValue& value)
>;
+class TDqTaskSettings {
+public:
+ explicit TDqTaskSettings(NDqProto::TDqTask&& task)
+ : Task_(std::move(task))
+ {}
+
+ explicit TDqTaskSettings(const NDqProto::TDqTask& task)
+ : Task_(task)
+ {}
+
+ ui64 GetId() const {
+ return Task_.GetId();
+ }
+
+ bool GetCreateSuspended() const {
+ return Task_.GetCreateSuspended();
+ }
+
+ const NDqProto::TDqTask& GetSerializedTask() const {
+ Y_VERIFY(!ParamProvider, "GetSerialized isn't supported if external ParamProvider callback is specified!");
+ return Task_;
+ }
+
+ const ::NYql::NDqProto::TTaskInput& GetInputs(size_t index) const {
+ return Task_.GetInputs(index);
+ }
+
+ const ::NYql::NDqProto::TTaskOutput& GetOutputs(size_t index) const {
+ return Task_.GetOutputs(index);
+ }
+
+ const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TTaskInput> GetInputs() const {
+ return Task_.GetInputs();
+ }
+
+ size_t InputsSize() const {
+ return Task_.InputsSize();
+ }
+
+ size_t OutputsSize() const {
+ return Task_.OutputsSize();
+ }
+
+ void SetParamsProvider(TDqTaskRunnerParameterProvider&& provider) {
+ ParamProvider = std::move(provider);
+ }
+
+ void GetParameterValue(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::TUnboxedValue& value) const
+ {
+ if (ParamProvider && ParamProvider(name, type, typeEnv, holderFactory, value)) {
+#ifndef NDEBUG
+ YQL_ENSURE(!Task_.GetParameters().contains(name), "param: " << name);
+#endif
+ } else {
+ auto it = Task_.GetParameters().find(name);
+ YQL_ENSURE(it != Task_.GetParameters().end());
+
+ auto guard = typeEnv.BindAllocator();
+ TDqDataSerializer::DeserializeParam(it->second, type, holderFactory, value);
+ }
+ }
+
+ ui64 GetStageId() const {
+ return Task_.GetStageId();
+ }
+
+ const ::NYql::NDqProto::TProgram& GetProgram() const {
+ return Task_.GetProgram();
+ }
+
+ const TProtoStringType & GetRateLimiterResource() const {
+ return Task_.GetRateLimiterResource();
+ }
+
+ const TProtoStringType& GetRateLimiter() const {
+ return Task_.GetRateLimiter();
+ }
+
+ const ::google::protobuf::Map<TProtoStringType, ::NYql::NDqProto::TData>& GetParameters() const {
+ return Task_.GetParameters();
+ }
+
+ const ::google::protobuf::Map<TProtoStringType, TProtoStringType>& GetTaskParams() const {
+ return Task_.GetTaskParams();
+ }
+
+ const ::google::protobuf::Map<TProtoStringType, TProtoStringType>& GetSecureParams() const {
+ return Task_.GetSecureParams();
+ }
+
+ const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TTaskOutput>& GetOutputs() const {
+ return Task_.GetOutputs();
+ }
+
+ const ::google::protobuf::Any& GetMeta() const {
+ return Task_.GetMeta();
+ }
+
+ bool GetUseLlvm() const {
+ return Task_.GetUseLlvm();
+ }
+
+ bool HasUseLlvm() const {
+ return Task_.HasUseLlvm();
+ }
+
+private:
+ // external callback to retrieve parameter value.
+ TDqTaskRunnerParameterProvider ParamProvider;
+ NDqProto::TDqTask Task_;
+};
+
class IDqTaskRunner : public TSimpleRefCount<IDqTaskRunner>, private TNonCopyable {
public:
virtual ~IDqTaskRunner() = default;
virtual ui64 GetTaskId() const = 0;
- virtual void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
- const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext(),
- const TDqTaskRunnerParameterProvider& parameterProvider = {}) = 0;
+ virtual void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
+ const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) = 0;
virtual ERunStatus Run() = 0;
virtual bool HasEffects() const = 0;
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index 0c8e03e285..dadd69897e 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -54,7 +54,7 @@ IActor* CreateComputeActor(
}
}
- auto taskRunnerFactory = [=](const NDqProto::TDqTask& task, const NDq::TLogFunc& logger) {
+ auto taskRunnerFactory = [=](const NDq::TDqTaskSettings& task, const NDq::TLogFunc& logger) {
Y_UNUSED(logger);
return options.Factory->Get(task, {});
};
diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
index 61376bfcb3..bb1458a7bc 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h
+++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
@@ -516,7 +516,7 @@ public:
for (int i = 0; i < static_cast<int>(tasks.size()); ++i) {
auto actorId = ActorIdFromProto(actorIds[i]);
auto& task = tasks[i];
- Tasks.emplace_back(task, actorId);
+ Tasks.emplace_back(NDq::TDqTaskSettings(std::move(task)), actorId);
ActorIds.emplace(task.GetId(), actorId);
TaskIds.emplace(actorId, task.GetId());
Yql::DqsProto::TTaskMeta taskMeta;
@@ -536,6 +536,10 @@ public:
}
}
+ const NDq::TDqTaskSettings GetTask(size_t idx) const {
+ return Tasks.at(idx).first;
+ }
+
private:
void MaybeUpdateChannels() {
if (Tasks.empty() || ChannelsUpdated || Tasks.size() != Executing.size()) {
@@ -634,7 +638,7 @@ private:
bool ChannelsUpdated = false;
- TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks;
+ TVector<std::pair<NDq::TDqTaskSettings, TActorId>> Tasks;
THashSet<ui64> FinishedTasks;
THashMap<ui64, TInstant> Executing;
THashMap<ui64, TActorId> ActorIds;
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 886f619c23..7c7abd1eca 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -140,10 +140,11 @@ public:
settings.CollectBasicStats = true;
settings.CollectProfileStats = true;
auto runner = NDq::MakeDqTaskRunner(executionContext, settings, {});
+ auto runnerSettings = NDq::TDqTaskSettings(std::move(task));
{
auto guard = runner->BindAllocator(State->Settings->MemoryLimit.Get().GetOrElse(0));
- runner->Prepare(task, limits);
+ runner->Prepare(runnerSettings, limits);
}
TVector<NDqProto::TData> rows;
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 5a7be9b85d..655e31c44f 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -515,9 +515,12 @@ public:
request.Load(&input);
});
- request.GetTask().GetMeta().UnpackTo(&taskMeta);
+ NDqProto::TDqTask task;
+ request.MutableTask()->Swap(&task);
+ task.GetMeta().UnpackTo(&taskMeta);
+ NDq::TDqTaskSettings settings(std::move(task));
try {
- Prepare(request.GetTask(), taskMeta, output);
+ Prepare(settings, taskMeta, output);
} catch (const NKikimr::TMemoryLimitExceededException& ex) {
throw yexception() << "DQ computation exceeds the memory limit " << DqConfiguration->MemoryLimit.Get().GetOrElse(0) << ". Try to increase the limit using PRAGMA dq.MemoryLimit";
}
@@ -699,7 +702,7 @@ public:
}
template<typename T>
- void Prepare(const NDqProto::TDqTask& task, const T& taskMeta, TPipedOutput& output) {
+ void Prepare(const NDq::TDqTaskSettings& task, const T& taskMeta, TPipedOutput& output) {
NYql::NDqProto::TPrepareResponse result;
result.SetResult(true); // COMPAT(aozeritsky) YQL-14268
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
index 8c5e80cbbe..9c221241b2 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
@@ -113,7 +113,7 @@ private:
class TLocalTaskRunner: public ITaskRunner {
public:
- TLocalTaskRunner(const NDqProto::TDqTask& task, TIntrusivePtr<IDqTaskRunner> runner)
+ TLocalTaskRunner(const NDq::TDqTaskSettings& task, TIntrusivePtr<IDqTaskRunner> runner)
: Task(task)
, Runner(runner)
{ }
@@ -196,7 +196,7 @@ private:
QueryStat.AddTaskRunnerStats(*Runner->GetStats(), Stats, Task.GetId());
}
- NDqProto::TDqTask Task;
+ NDq::TDqTaskSettings Task;
TIntrusivePtr<IDqTaskRunner> Runner;
TCounters QueryStat;
TDqTaskRunnerStats Stats;
@@ -252,11 +252,11 @@ public:
, TerminateOnError(terminateOnError)
{ }
- ITaskRunner::TPtr GetOld(const NDqProto::TDqTask& task, const TString& traceId) override {
+ ITaskRunner::TPtr GetOld(const TDqTaskSettings& task, const TString& traceId) override {
return new TLocalTaskRunner(task, Get(task, traceId));
}
- TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDqProto::TDqTask& task, const TString& traceId) override {
+ TIntrusivePtr<NDq::IDqTaskRunner> Get(const TDqTaskSettings& task, const TString& traceId) override {
Y_UNUSED(traceId);
NDq::TDqTaskRunnerSettings settings;
settings.TerminateOnError = TerminateOnError;
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index e85962746f..722ce32ddb 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1477,8 +1477,8 @@ public:
return Task.GetId();
}
- void Prepare(const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
- const IDqTaskRunnerExecutionContext& execCtx, const TDqTaskRunnerParameterProvider&) override
+ void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
+ const IDqTaskRunnerExecutionContext& execCtx) override
{
Y_UNUSED(memoryLimits);
Y_UNUSED(execCtx);
@@ -1729,7 +1729,7 @@ public:
TaskScheduler.Start();
}
- ITaskRunner::TPtr GetOld(const NDqProto::TDqTask& tmp, const TString& traceId) override {
+ ITaskRunner::TPtr GetOld(const NDq::TDqTaskSettings& tmp, const TString& traceId) override {
Yql::DqsProto::TTaskMeta taskMeta;
tmp.GetMeta().UnpackTo(&taskMeta);
ui64 stageId = taskMeta.GetStageId();
@@ -1738,7 +1738,7 @@ public:
return new TTaskRunner(task, std::move(result), stageId, traceId);
}
- TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDqProto::TDqTask& tmp, const TString& traceId) override
+ TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& tmp, const TString& traceId) override
{
Yql::DqsProto::TTaskMeta taskMeta;
tmp.GetMeta().UnpackTo(&taskMeta);
@@ -1780,9 +1780,9 @@ private:
return exePath + "," + settings.ToString();
}
- NDqProto::TDqTask PrepareTask(const NDqProto::TDqTask& tmp, TChildProcess* result) {
+ NDqProto::TDqTask PrepareTask(const NDq::TDqTaskSettings& tmp, TChildProcess* result) {
// get files from fileCache
- NDqProto::TDqTask task = tmp;
+ auto task = tmp.GetSerializedTask();
Yql::DqsProto::TTaskMeta taskMeta;
task.GetMeta().UnpackTo(&taskMeta);
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
index 06aac8ca3f..20785ed4d1 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
@@ -89,9 +89,9 @@ class IProxyFactory: public TThrRefBase, private TNonCopyable {
public:
using TPtr = TIntrusivePtr<IProxyFactory>;
- virtual ITaskRunner::TPtr GetOld(const NDqProto::TDqTask& task, const TString& traceId = "") = 0;
+ virtual ITaskRunner::TPtr GetOld(const NDq::TDqTaskSettings& task, const TString& traceId = "") = 0;
- virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDqProto::TDqTask& task, const TString& traceId = "TODO") = 0;
+ virtual TIntrusivePtr<NDq::IDqTaskRunner> Get(const NDq::TDqTaskSettings& task, const TString& traceId = "TODO") = 0;
};
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index b2d41827aa..1cac519e70 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -449,7 +449,8 @@ private:
ParentId = ev->Sender;
try {
- TaskRunner = Factory->GetOld(ev->Get()->Task, TraceId);
+ NDq::TDqTaskSettings settings(std::move(ev->Get()->Task));
+ TaskRunner = Factory->GetOld(settings, TraceId);
} catch (...) {
TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage();
Send(replyTo, MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message), 0, cookie);