aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2022-12-20 14:38:56 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2022-12-20 14:38:56 +0300
commitdb4782a4b022421868467dbc9218dac235d9d615 (patch)
tree9969c198d80211eaafc1190185f2e07a619e6486
parent0b3d10fb0817ad9b03497ef999ffc235ddac29fe (diff)
downloadydb-db4782a4b022421868467dbc9218dac235d9d615.tar.gz
Make KqpUpsertRows and KqpDeleteRows suitable for cache
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.cpp7
-rw-r--r--ydb/core/kqp/runtime/kqp_tasks_runner.h2
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.h2
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp19
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp25
6 files changed, 33 insertions, 23 deletions
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
index 223feb970a..677c1a59f2 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
@@ -81,6 +81,8 @@ TKqpTasksRunner::TKqpTasksRunner(const google::protobuf::RepeatedPtrField<NDqPro
YQL_ENSURE(execCtx.Alloc);
YQL_ENSURE(execCtx.TypeEnv);
+ ApplyCtx = dynamic_cast<NMiniKQL::TKqpDatashardApplyContext *>(execCtx.ApplyCtx);
+ YQL_ENSURE(ApplyCtx);
ComputeCtx = dynamic_cast<NMiniKQL::TKqpComputeContextBase*>(execCtx.ComputeCtx);
YQL_ENSURE(ComputeCtx);
@@ -132,6 +134,8 @@ ERunStatus TKqpTasksRunner::Run(bool applyEffects) {
bool hasPendingInputTasks = false;
bool hasPendingOutputTasks = false;
+ // for per-task statistics in KqpUpsertRows and KqpDeleteRows computation nodes
+ auto dsCtx = dynamic_cast<TKqpDatashardComputeContext*>(ComputeCtx);
for (auto& [taskId, task] : TaskRunners) {
if (Y_UNLIKELY(LogFunc)) {
LogFunc(TStringBuilder() << "running task: " << taskId);
@@ -142,6 +146,9 @@ ERunStatus TKqpTasksRunner::Run(bool applyEffects) {
continue;
}
+ if (dsCtx) {
+ ApplyCtx->TaskTableStats = &dsCtx->GetTaskCounters(taskId);
+ }
auto status = task->Run();
switch (status) {
diff --git a/ydb/core/kqp/runtime/kqp_tasks_runner.h b/ydb/core/kqp/runtime/kqp_tasks_runner.h
index 57c4763b25..b0e52743b6 100644
--- a/ydb/core/kqp/runtime/kqp_tasks_runner.h
+++ b/ydb/core/kqp/runtime/kqp_tasks_runner.h
@@ -1,6 +1,7 @@
#pragma once
#include "kqp_compute.h"
+#include "ydb/core/tx/datashard/datashard_kqp_compute.h"
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
namespace NKikimr {
@@ -58,6 +59,7 @@ private:
NYql::NDq::TLogFunc LogFunc;
NMiniKQL::TScopedAlloc* Alloc;
NMiniKQL::TKqpComputeContextBase* ComputeCtx;
+ NMiniKQL::TKqpDatashardApplyContext* ApplyCtx;
enum EState {
Initial = 0,
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index eebeb026f8..df8a320058 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -858,6 +858,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor
auto kqpApplyCtx = MakeHolder<TKqpDatashardApplyContext>();
kqpApplyCtx->Host = EngineHost.Get();
+ kqpApplyCtx->ShardTableStats = &ComputeCtx->GetDatashardCounters();
KqpApplyCtx.Reset(kqpApplyCtx.Release());
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h
index de6d81372a..0287e896af 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.h
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.h
@@ -122,6 +122,8 @@ private:
class TKqpDatashardApplyContext : public NUdf::IApplyContext {
public:
IEngineFlatHost* Host = nullptr;
+ TKqpTableStats* ShardTableStats = nullptr;
+ TKqpTableStats* TaskTableStats = nullptr;
};
IComputationNode* WrapKqpWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx,
diff --git a/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp b/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp
index f73be2d14a..6bd478dcd6 100644
--- a/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_delete_rows.cpp
@@ -47,13 +47,15 @@ public:
if (!engineCtx.Host->IsMyKey(Owner.TableId, keyTuple)) {
return;
}
+ Y_VERIFY(engineCtx.ShardTableStats);
+ Y_VERIFY(engineCtx.TaskTableStats);
- ui64 nEraseRow = Owner.ShardTableStats.NEraseRow;
+ ui64 nEraseRow = engineCtx.ShardTableStats->NEraseRow;
engineCtx.Host->EraseRow(Owner.TableId, keyTuple);
- if (i64 delta = Owner.ShardTableStats.NEraseRow - nEraseRow; delta > 0) {
- Owner.TaskTableStats.NEraseRow += delta;
+ if (i64 delta = engineCtx.ShardTableStats->NEraseRow - nEraseRow; delta > 0) {
+ engineCtx.TaskTableStats->NEraseRow += delta;
}
};
@@ -93,16 +95,15 @@ public:
}
public:
- TKqpDeleteRowsWrapper(TComputationMutables& mutables, TKqpDatashardComputeContext& computeCtx,
- const TTableId& tableId, IComputationNode* rowsNode, TVector<NScheme::TTypeInfo> rowTypes, TVector<ui32> keyIndices, const TTypeEnvironment& env)
+ TKqpDeleteRowsWrapper(TComputationMutables& mutables, const TTableId& tableId, IComputationNode* rowsNode,
+ TVector<NScheme::TTypeInfo> rowTypes, TVector<ui32> keyIndices, const TTypeEnvironment& env)
: TBase(mutables)
, TableId(tableId)
, RowsNode(rowsNode)
, RowTypes(std::move(rowTypes))
, KeyIndices(std::move(keyIndices))
, Env(env)
- , ShardTableStats(computeCtx.GetDatashardCounters())
- , TaskTableStats(computeCtx.GetTaskCounters(computeCtx.GetCurrentTaskId())) {}
+ {}
private:
void RegisterDependencies() const final {
@@ -115,8 +116,6 @@ private:
const TVector<NScheme::TTypeInfo> RowTypes;
const TVector<ui32> KeyIndices;
const TTypeEnvironment& Env;
- TKqpTableStats& ShardTableStats;
- TKqpTableStats& TaskTableStats;
};
} // namespace
@@ -167,7 +166,7 @@ IComputationNode* WrapKqpDeleteRows(TCallable& callable, const TComputationNodeF
keyIndices[i] = it->second;
}
- return new TKqpDeleteRowsWrapper(ctx.Mutables, computeCtx, tableId,
+ return new TKqpDeleteRowsWrapper(ctx.Mutables, tableId,
LocateNode(ctx.NodeLocator, *rowsNode.GetNode()), std::move(rowTypes), std::move(keyIndices), ctx.Env);
}
diff --git a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp
index 5abccb769c..d6a95c1eb4 100644
--- a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp
@@ -80,15 +80,17 @@ public:
commands.emplace_back(std::move(command));
}
+ Y_VERIFY(dsApplyCtx.ShardTableStats);
+ Y_VERIFY(dsApplyCtx.TaskTableStats);
- ui64 nUpdateRow = Owner.ShardTableStats.NUpdateRow;
- ui64 updateRowBytes = Owner.ShardTableStats.UpdateRowBytes;
+ ui64 nUpdateRow = dsApplyCtx.ShardTableStats->NUpdateRow;
+ ui64 updateRowBytes = dsApplyCtx.ShardTableStats->UpdateRowBytes;
dsApplyCtx.Host->UpdateRow(Owner.TableId, keyTuple, commands);
- if (i64 delta = Owner.ShardTableStats.NUpdateRow - nUpdateRow; delta > 0) {
- Owner.TaskTableStats.NUpdateRow += delta;
- Owner.TaskTableStats.UpdateRowBytes += Owner.ShardTableStats.UpdateRowBytes - updateRowBytes;
+ if (i64 delta = dsApplyCtx.ShardTableStats->NUpdateRow - nUpdateRow; delta > 0) {
+ dsApplyCtx.TaskTableStats->NUpdateRow += delta;
+ dsApplyCtx.TaskTableStats->UpdateRowBytes += dsApplyCtx.ShardTableStats->UpdateRowBytes - updateRowBytes;
}
};
@@ -128,9 +130,9 @@ public:
}
public:
- TKqpUpsertRowsWrapper(TComputationMutables& mutables, TKqpDatashardComputeContext& computeCtx,
- const TTableId& tableId, IComputationNode* rowsNode, TVector<NScheme::TTypeInfo>&& rowTypes,
- TVector<ui32>&& keyIndices, TVector<TUpsertColumn>&& upsertColumns, const TTypeEnvironment& env)
+ TKqpUpsertRowsWrapper(TComputationMutables& mutables, const TTableId& tableId, IComputationNode* rowsNode,
+ TVector<NScheme::TTypeInfo>&& rowTypes, TVector<ui32>&& keyIndices,
+ TVector<TUpsertColumn>&& upsertColumns, const TTypeEnvironment& env)
: TBase(mutables)
, TableId(tableId)
, RowsNode(rowsNode)
@@ -138,8 +140,7 @@ public:
, KeyIndices(std::move(keyIndices))
, UpsertColumns(std::move(upsertColumns))
, Env(env)
- , ShardTableStats(computeCtx.GetDatashardCounters())
- , TaskTableStats(computeCtx.GetTaskCounters(computeCtx.GetCurrentTaskId())) {}
+ {}
private:
void RegisterDependencies() const final {
@@ -153,8 +154,6 @@ private:
TVector<ui32> KeyIndices;
TVector<TUpsertColumn> UpsertColumns;
const TTypeEnvironment& Env;
- TKqpTableStats& ShardTableStats;
- TKqpTableStats& TaskTableStats;
};
} // namespace
@@ -226,7 +225,7 @@ IComputationNode* WrapKqpUpsertRows(TCallable& callable, const TComputationNodeF
"upsert column type missmatch, column: " << tableColumn->Name);
}
- return new TKqpUpsertRowsWrapper(ctx.Mutables, computeCtx, tableId,
+ return new TKqpUpsertRowsWrapper(ctx.Mutables, tableId,
LocateNode(ctx.NodeLocator, *rowsNode.GetNode()), std::move(rowTypes), std::move(keyIndices),
std::move(upsertColumns), ctx.Env);
}