aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-03-01 12:28:29 +0300
committerssmike <ssmike@ydb.tech>2023-03-01 12:28:29 +0300
commit1b6baf206fce75465d8130c2a0aa15f30cf52625 (patch)
tree7e6c2deceb1dc3dd920ca484c9e76266f05af1a2
parentc16c4f061fb29203701994213c06150d0b3c5b27 (diff)
downloadydb-1b6baf206fce75465d8130c2a0aa15f30cf52625.tar.gz
kqp counters for iterator reads
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.cpp6
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h2
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp12
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h12
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp2
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp26
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.h4
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp26
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.h3
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp6
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_factory.h3
12 files changed, 84 insertions, 22 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index d3213fe8d4..776403a499 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -50,10 +50,10 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
-NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory() {
+NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters) {
auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
- RegisterStreamLookupActorFactory(*factory);
- RegisterKqpReadActor(*factory);
+ RegisterStreamLookupActorFactory(*factory, counters);
+ RegisterKqpReadActor(*factory, counters);
return factory;
}
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index dc68e2ab93..2e99e5bfe6 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -52,7 +52,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
-NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory();
+NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index d55e3d64b1..3ed8577d70 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -764,6 +764,18 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
ScanQueryRateLimitLatency = KqpGroup->GetHistogram(
"ScanQuery/RateLimitLatency", NMonitoring::ExponentialHistogram(20, 2, 1));
+ /* iterator reads */
+ IteratorsShardResolve = KqpGroup->GetCounter("IteratorReads/ShardResolves", true);
+ IteratorsReadSplits = KqpGroup->GetCounter("IteratorReads/ReadSplits", true);
+ SentIteratorAcks = KqpGroup->GetCounter("IteratorReads/SentAcks", true);
+ SentIteratorCancels = KqpGroup->GetCounter("IteratorReads/SentCancels", true);
+ CreatedIterators = KqpGroup->GetCounter("IteratorReads/Created", true);
+ ReadActorsCount = KqpGroup->GetCounter("IteratorReads/ReadActorCount", false);
+ StreamLookupActorsCount = KqpGroup->GetCounter("IteratorReads/StreamLookupActorCount", false);
+ ReadActorRetries = KqpGroup->GetCounter("IteratorReads/Retries", false);
+ DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true);
+ DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true);
+
LiteralTxTotalTimeHistogram = KqpGroup->GetHistogram(
"PhyTx/LiteralTxTotalTimeMs", NMonitoring::ExponentialHistogram(10, 2, 1));
DataTxTotalTimeHistogram = KqpGroup->GetHistogram(
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index cdac762964..e0dfa9b8ac 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -367,6 +367,18 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr ScanQueryShardResolve;
NMonitoring::THistogramPtr ScanQueryRateLimitLatency;
+ // Iterator reads counters
+ ::NMonitoring::TDynamicCounters::TCounterPtr IteratorsShardResolve;
+ ::NMonitoring::TDynamicCounters::TCounterPtr IteratorsReadSplits;
+ ::NMonitoring::TDynamicCounters::TCounterPtr SentIteratorAcks;
+ ::NMonitoring::TDynamicCounters::TCounterPtr SentIteratorCancels;
+ ::NMonitoring::TDynamicCounters::TCounterPtr CreatedIterators;
+ ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorsCount;
+ ::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupActorsCount;
+ ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries;
+ ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails;
+ ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
+
// Physical tx duration
NMonitoring::THistogramPtr LiteralTxTotalTimeHistogram;
NMonitoring::THistogramPtr DataTxTotalTimeHistogram;
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index f92e5aa72c..ac47bc41ff 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1608,7 +1608,7 @@ private:
return false;
};
- auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(),
+ auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(Counters->Counters),
AppData()->FunctionRegistry, settings, limits);
auto computeActorId = Register(computeActor);
task.ComputeActorId = computeActorId;
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index 7c4bf8787f..c699ac001b 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -305,12 +305,12 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- CreateKqpAsyncIoFactory(), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy,
+ CreateKqpAsyncIoFactory(Counters), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy,
Counters, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
- computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(),
+ computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(Counters),
AppData()->FunctionRegistry, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId));
taskCtx.ComputeActorId = Register(computeActor);
} else {
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index c6880b9158..76d1c7210a 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -318,7 +318,8 @@ public:
public:
TKqpReadActor(
NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings,
- const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args)
+ const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args,
+ TIntrusivePtr<TKqpCounters> counters)
: Settings(std::move(settings))
, LogPrefix(TStringBuilder() << "TxId: " << args.TxId << ", task: " << args.TaskId << ", CA Id " << args.ComputeActorId << ". ")
, ComputeActorId(args.ComputeActorId)
@@ -326,6 +327,7 @@ public:
, TypeEnv(args.TypeEnv)
, HolderFactory(args.HolderFactory)
, Alloc(args.Alloc)
+ , Counters(counters)
{
TableId = TTableId(
Settings.GetTable().GetTableId().GetOwnerId(),
@@ -345,6 +347,7 @@ public:
Settings.GetKeyColumnTypeInfos(i).GetPgTypeId()
) : nullptr));
}
+ Counters->ReadActorsCount->Inc();
}
virtual ~TKqpReadActor() {
@@ -370,6 +373,7 @@ public:
}
void Bootstrap() {
+ Counters->ReadActorsCount->Inc();
LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix;
Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId());
THolder<TShardState> stateHolder = MakeHolder<TShardState>(Settings.GetShardIdHint());
@@ -437,6 +441,7 @@ public:
return;
}
+ Counters->IteratorsShardResolve->Inc();
state->ResolveAttempt++;
auto range = state->GetBounds(Settings.GetReverse());
@@ -588,6 +593,7 @@ public:
}
}
+ Counters->IteratorsReadSplits->Add(newShards.size() - 1);
YQL_ENSURE(!newShards.empty());
if (Settings.GetReverse()) {
for (size_t i = 0; i < newShards.size(); ++i) {
@@ -650,6 +656,7 @@ public:
}
}
+ Counters->ReadActorRetries->Inc();
StartRead(state);
}
@@ -717,6 +724,7 @@ public:
<< " snapshot = (txid=" << Settings.GetSnapshot().GetTxId() << ",step=" << Settings.GetSnapshot().GetStep() << ")"
<< " lockTxId = " << Settings.GetLockTxId());
+ Counters->CreatedIterators->Inc();
ReadIdByTabletId[state->TabletId].push_back(id);
Send(::PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
@@ -730,6 +738,11 @@ public:
return;
}
+ Counters->DataShardIteratorMessages->Inc();
+ if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
+ Counters->DataShardIteratorFails->Inc();
+ }
+
for (auto& issue : record.GetStatus().GetIssues()) {
CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
Reads[id].Shard->Issues.push_back(issue);
@@ -982,6 +995,7 @@ public:
if (limit) {
request->Record.SetMaxRows(*limit);
}
+ Counters->SentIteratorAcks->Inc();
CA_LOG_D("sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo());
Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
@@ -1058,6 +1072,7 @@ public:
if (!Reads[id]) {
return;
}
+ Counters->SentIteratorCancels->Inc();
auto* state = Reads[id].Shard;
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
cancel->Record.SetReadId(id);
@@ -1065,6 +1080,7 @@ public:
}
void PassAway() override {
+ Counters->ReadActorsCount->Dec();
{
auto guard = BindAllocator();
Results.clear();
@@ -1148,14 +1164,16 @@ private:
const NMiniKQL::TTypeEnvironment& TypeEnv;
const NMiniKQL::THolderFactory& HolderFactory;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+
+ TIntrusivePtr<TKqpCounters> Counters;
};
-void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory) {
+void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) {
factory.RegisterSource<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(
TString(NYql::KqpReadRangesSourceName),
- [] (NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSourceArguments&& args) {
- auto* actor = new TKqpReadActor(std::move(settings), args);
+ [counters] (NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSourceArguments&& args) {
+ auto* actor = new TKqpReadActor(std::move(settings), args, counters);
return std::make_pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>(actor, actor);
});
}
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.h b/ydb/core/kqp/runtime/kqp_read_actor.h
index ecc0bcd746..22c4e05d5c 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.h
+++ b/ydb/core/kqp/runtime/kqp_read_actor.h
@@ -1,5 +1,7 @@
#pragma once
+#include <ydb/core/kqp/counters/kqp_counters.h>
+
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
namespace NKikimrTxDataShard {
@@ -10,7 +12,7 @@ class TEvReadAck;
namespace NKikimr {
namespace NKqp {
-void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory);
+void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory&, TIntrusivePtr<TKqpCounters>);
void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead&);
void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck&);
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 39cbd9c3b4..36d6c1629c 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -27,14 +27,16 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
public:
TKqpStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
- std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings)
+ std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
+ TIntrusivePtr<TKqpCounters> counters)
: InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
, HolderFactory(holderFactory), Alloc(alloc), TablePath(settings.GetTable().GetPath())
, TableId(MakeTableId(settings.GetTable()))
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
- , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) {
-
+ , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
+ , Counters(counters)
+ {
KeyColumns.reserve(settings.GetKeyColumns().size());
i32 keyOrder = 0;
for (const auto& keyColumn : settings.GetKeyColumns()) {
@@ -75,6 +77,7 @@ public:
}
void Bootstrap() {
+ Counters->StreamLookupActorsCount->Inc();
ResolveTableShards();
Become(&TKqpStreamLookupActor::StateFunc);
}
@@ -179,10 +182,12 @@ private:
}
void PassAway() final {
+ Counters->StreamLookupActorsCount->Dec();
{
auto alloc = BindAllocator();
Input.Clear();
for (auto& [id, state] : Reads) {
+ Counters->SentIteratorCancels->Inc();
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
cancel->Record.SetReadId(id);
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId));
@@ -277,6 +282,11 @@ private:
Locks.push_back(lock);
}
+ Counters->DataShardIteratorMessages->Inc();
+ if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
+ Counters->DataShardIteratorFails->Inc();
+ }
+
switch (record.GetStatus().GetCode()) {
case Ydb::StatusIds::SUCCESS:
break;
@@ -298,6 +308,7 @@ private:
if (record.GetFinished()) {
read.SetFinished();
} else {
+ Counters->SentIteratorAcks->Inc();
THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
request->Record.SetReadId(record.GetReadId());
request->Record.SetSeqNo(record.GetSeqNo());
@@ -488,6 +499,7 @@ private:
const auto readId = GetNextReadId();
TReadState read(readId, shardId, std::move(keys));
+ Counters->CreatedIterators->Inc();
THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
auto& record = request->Record;
@@ -564,6 +576,7 @@ private:
request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read,
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
+ Counters->IteratorsShardResolve->Inc();
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
@@ -628,6 +641,8 @@ private:
// stats
ui64 ReadRowsCount = 0;
ui64 ReadBytesCount = 0;
+
+ TIntrusivePtr<TKqpCounters> Counters;
};
} // namespace
@@ -635,9 +650,10 @@ private:
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
- NKikimrKqp::TKqpStreamLookupSettings&& settings) {
+ NKikimrKqp::TKqpStreamLookupSettings&& settings,
+ TIntrusivePtr<TKqpCounters> counters) {
auto actor = new TKqpStreamLookupActor(inputIndex, input, computeActorId, typeEnv, holderFactory, alloc,
- std::move(settings));
+ std::move(settings), counters);
return {actor, actor};
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
index 51205c88ba..29fe5d70d8 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/core/protos/kqp.pb.h>
@@ -9,7 +10,7 @@ namespace NKqp {
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
- NKikimrKqp::TKqpStreamLookupSettings&& settings);
+ NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters>);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
index df6b8d5cfb..59ee432d95 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
@@ -4,11 +4,11 @@
namespace NKikimr {
namespace NKqp {
-void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory) {
- factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [](NKikimrKqp::TKqpStreamLookupSettings&& settings,
+void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) {
+ factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings,
NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) {
return CreateStreamLookupActor(args.InputIndex, args.TransformInput, args.ComputeActorId, args.TypeEnv,
- args.HolderFactory, args.Alloc, std::move(settings));
+ args.HolderFactory, args.Alloc, std::move(settings), counters);
});
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h
index 1eded0576b..db49e005fa 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h
@@ -1,11 +1,12 @@
#pragma once
+#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
namespace NKikimr {
namespace NKqp {
-void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory);
+void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<NKqp::TKqpCounters>);
} // namespace NKqp
} // namespace NKikimr