diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-20 14:38:56 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-12-20 14:38:56 +0300 |
commit | db4782a4b022421868467dbc9218dac235d9d615 (patch) | |
tree | 9969c198d80211eaafc1190185f2e07a619e6486 | |
parent | 0b3d10fb0817ad9b03497ef999ffc235ddac29fe (diff) | |
download | ydb-db4782a4b022421868467dbc9218dac235d9d615.tar.gz |
Make KqpUpsertRows and KqpDeleteRows suitable for cache
-rw-r--r-- | ydb/core/kqp/runtime/kqp_tasks_runner.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_tasks_runner.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp | 25 |
6 files changed, 33 insertions, 23 deletions
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp index 223feb970a..677c1a59f2 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp @@ -81,6 +81,8 @@ TKqpTasksRunner::TKqpTasksRunner(const google::protobuf::RepeatedPtrField<NDqPro YQL_ENSURE(execCtx.Alloc); YQL_ENSURE(execCtx.TypeEnv); + ApplyCtx = dynamic_cast<NMiniKQL::TKqpDatashardApplyContext *>(execCtx.ApplyCtx); + YQL_ENSURE(ApplyCtx); ComputeCtx = dynamic_cast<NMiniKQL::TKqpComputeContextBase*>(execCtx.ComputeCtx); YQL_ENSURE(ComputeCtx); @@ -132,6 +134,8 @@ ERunStatus TKqpTasksRunner::Run(bool applyEffects) { bool hasPendingInputTasks = false; bool hasPendingOutputTasks = false; + // for per-task statistics in KqpUpsertRows and KqpDeleteRows computation nodes + auto dsCtx = dynamic_cast<TKqpDatashardComputeContext*>(ComputeCtx); for (auto& [taskId, task] : TaskRunners) { if (Y_UNLIKELY(LogFunc)) { LogFunc(TStringBuilder() << "running task: " << taskId); @@ -142,6 +146,9 @@ ERunStatus TKqpTasksRunner::Run(bool applyEffects) { continue; } + if (dsCtx) { + ApplyCtx->TaskTableStats = &dsCtx->GetTaskCounters(taskId); + } auto status = task->Run(); switch (status) { diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h index 57c4763b25..b0e52743b6 100644 --- a/ydb/core/kqp/runtime/kqp_tasks_runner.h +++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h @@ -1,6 +1,7 @@ #pragma once #include "kqp_compute.h" +#include "ydb/core/tx/datashard/datashard_kqp_compute.h" #include <ydb/library/yql/dq/runtime/dq_tasks_runner.h> namespace NKikimr { @@ -58,6 +59,7 @@ private: NYql::NDq::TLogFunc LogFunc; NMiniKQL::TScopedAlloc* Alloc; NMiniKQL::TKqpComputeContextBase* ComputeCtx; + NMiniKQL::TKqpDatashardApplyContext* ApplyCtx; enum EState { Initial = 0, diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index eebeb026f8..df8a320058 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -858,6 +858,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor auto kqpApplyCtx = MakeHolder<TKqpDatashardApplyContext>(); kqpApplyCtx->Host = EngineHost.Get(); + kqpApplyCtx->ShardTableStats = &ComputeCtx->GetDatashardCounters(); KqpApplyCtx.Reset(kqpApplyCtx.Release()); diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h index de6d81372a..0287e896af 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.h +++ b/ydb/core/tx/datashard/datashard_kqp_compute.h @@ -122,6 +122,8 @@ private: class TKqpDatashardApplyContext : public NUdf::IApplyContext { public: IEngineFlatHost* Host = nullptr; + TKqpTableStats* ShardTableStats = nullptr; + TKqpTableStats* TaskTableStats = nullptr; }; IComputationNode* WrapKqpWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx, diff --git a/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp b/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp index f73be2d14a..6bd478dcd6 100644 --- a/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp @@ -47,13 +47,15 @@ public: if (!engineCtx.Host->IsMyKey(Owner.TableId, keyTuple)) { return; } + Y_VERIFY(engineCtx.ShardTableStats); + Y_VERIFY(engineCtx.TaskTableStats); - ui64 nEraseRow = Owner.ShardTableStats.NEraseRow; + ui64 nEraseRow = engineCtx.ShardTableStats->NEraseRow; engineCtx.Host->EraseRow(Owner.TableId, keyTuple); - if (i64 delta = Owner.ShardTableStats.NEraseRow - nEraseRow; delta > 0) { - Owner.TaskTableStats.NEraseRow += delta; + if (i64 delta = engineCtx.ShardTableStats->NEraseRow - nEraseRow; delta > 0) { + engineCtx.TaskTableStats->NEraseRow += delta; } }; @@ -93,16 +95,15 @@ public: } public: - TKqpDeleteRowsWrapper(TComputationMutables& mutables, TKqpDatashardComputeContext& computeCtx, - const TTableId& tableId, IComputationNode* rowsNode, TVector<NScheme::TTypeInfo> rowTypes, TVector<ui32> keyIndices, const TTypeEnvironment& env) + TKqpDeleteRowsWrapper(TComputationMutables& mutables, const TTableId& tableId, IComputationNode* rowsNode, + TVector<NScheme::TTypeInfo> rowTypes, TVector<ui32> keyIndices, const TTypeEnvironment& env) : TBase(mutables) , TableId(tableId) , RowsNode(rowsNode) , RowTypes(std::move(rowTypes)) , KeyIndices(std::move(keyIndices)) , Env(env) - , ShardTableStats(computeCtx.GetDatashardCounters()) - , TaskTableStats(computeCtx.GetTaskCounters(computeCtx.GetCurrentTaskId())) {} + {} private: void RegisterDependencies() const final { @@ -115,8 +116,6 @@ private: const TVector<NScheme::TTypeInfo> RowTypes; const TVector<ui32> KeyIndices; const TTypeEnvironment& Env; - TKqpTableStats& ShardTableStats; - TKqpTableStats& TaskTableStats; }; } // namespace @@ -167,7 +166,7 @@ IComputationNode* WrapKqpDeleteRows(TCallable& callable, const TComputationNodeF keyIndices[i] = it->second; } - return new TKqpDeleteRowsWrapper(ctx.Mutables, computeCtx, tableId, + return new TKqpDeleteRowsWrapper(ctx.Mutables, tableId, LocateNode(ctx.NodeLocator, *rowsNode.GetNode()), std::move(rowTypes), std::move(keyIndices), ctx.Env); } diff --git a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp index 5abccb769c..d6a95c1eb4 100644 --- a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp @@ -80,15 +80,17 @@ public: commands.emplace_back(std::move(command)); } + Y_VERIFY(dsApplyCtx.ShardTableStats); + Y_VERIFY(dsApplyCtx.TaskTableStats); - ui64 nUpdateRow = Owner.ShardTableStats.NUpdateRow; - ui64 updateRowBytes = Owner.ShardTableStats.UpdateRowBytes; + ui64 nUpdateRow = dsApplyCtx.ShardTableStats->NUpdateRow; + ui64 updateRowBytes = dsApplyCtx.ShardTableStats->UpdateRowBytes; dsApplyCtx.Host->UpdateRow(Owner.TableId, keyTuple, commands); - if (i64 delta = Owner.ShardTableStats.NUpdateRow - nUpdateRow; delta > 0) { - Owner.TaskTableStats.NUpdateRow += delta; - Owner.TaskTableStats.UpdateRowBytes += Owner.ShardTableStats.UpdateRowBytes - updateRowBytes; + if (i64 delta = dsApplyCtx.ShardTableStats->NUpdateRow - nUpdateRow; delta > 0) { + dsApplyCtx.TaskTableStats->NUpdateRow += delta; + dsApplyCtx.TaskTableStats->UpdateRowBytes += dsApplyCtx.ShardTableStats->UpdateRowBytes - updateRowBytes; } }; @@ -128,9 +130,9 @@ public: } public: - TKqpUpsertRowsWrapper(TComputationMutables& mutables, TKqpDatashardComputeContext& computeCtx, - const TTableId& tableId, IComputationNode* rowsNode, TVector<NScheme::TTypeInfo>&& rowTypes, - TVector<ui32>&& keyIndices, TVector<TUpsertColumn>&& upsertColumns, const TTypeEnvironment& env) + TKqpUpsertRowsWrapper(TComputationMutables& mutables, const TTableId& tableId, IComputationNode* rowsNode, + TVector<NScheme::TTypeInfo>&& rowTypes, TVector<ui32>&& keyIndices, + TVector<TUpsertColumn>&& upsertColumns, const TTypeEnvironment& env) : TBase(mutables) , TableId(tableId) , RowsNode(rowsNode) @@ -138,8 +140,7 @@ public: , KeyIndices(std::move(keyIndices)) , UpsertColumns(std::move(upsertColumns)) , Env(env) - , ShardTableStats(computeCtx.GetDatashardCounters()) - , TaskTableStats(computeCtx.GetTaskCounters(computeCtx.GetCurrentTaskId())) {} + {} private: void RegisterDependencies() const final { @@ -153,8 +154,6 @@ private: TVector<ui32> KeyIndices; TVector<TUpsertColumn> UpsertColumns; const TTypeEnvironment& Env; - TKqpTableStats& ShardTableStats; - TKqpTableStats& TaskTableStats; }; } // namespace @@ -226,7 +225,7 @@ IComputationNode* WrapKqpUpsertRows(TCallable& callable, const TComputationNodeF "upsert column type missmatch, column: " << tableColumn->Name); } - return new TKqpUpsertRowsWrapper(ctx.Mutables, computeCtx, tableId, + return new TKqpUpsertRowsWrapper(ctx.Mutables, tableId, LocateNode(ctx.NodeLocator, *rowsNode.GetNode()), std::move(rowTypes), std::move(keyIndices), std::move(upsertColumns), ctx.Env); } |