diff options
author | ssmike <ssmike@ydb.tech> | 2023-03-01 12:28:29 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-03-01 12:28:29 +0300 |
commit | 1b6baf206fce75465d8130c2a0aa15f30cf52625 (patch) | |
tree | 7e6c2deceb1dc3dd920ca484c9e76266f05af1a2 | |
parent | c16c4f061fb29203701994213c06150d0b3c5b27 (diff) | |
download | ydb-1b6baf206fce75465d8130c2a0aa15f30cf52625.tar.gz |
kqp counters for iterator reads
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 12 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 26 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 26 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_factory.h | 3 |
12 files changed, 84 insertions, 22 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index d3213fe8d4..776403a499 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -50,10 +50,10 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput namespace NKqp { -NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory() { +NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters) { auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); - RegisterStreamLookupActorFactory(*factory); - RegisterKqpReadActor(*factory); + RegisterStreamLookupActorFactory(*factory, counters); + RegisterKqpReadActor(*factory, counters); return factory; } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index dc68e2ab93..2e99e5bfe6 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -52,7 +52,7 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId); -NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(); +NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index d55e3d64b1..3ed8577d70 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -764,6 +764,18 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co ScanQueryRateLimitLatency = KqpGroup->GetHistogram( "ScanQuery/RateLimitLatency", NMonitoring::ExponentialHistogram(20, 2, 1)); + /* iterator reads */ + IteratorsShardResolve = KqpGroup->GetCounter("IteratorReads/ShardResolves", true); + IteratorsReadSplits = KqpGroup->GetCounter("IteratorReads/ReadSplits", true); + SentIteratorAcks = KqpGroup->GetCounter("IteratorReads/SentAcks", true); + SentIteratorCancels = KqpGroup->GetCounter("IteratorReads/SentCancels", true); + CreatedIterators = KqpGroup->GetCounter("IteratorReads/Created", true); + ReadActorsCount = KqpGroup->GetCounter("IteratorReads/ReadActorCount", false); + StreamLookupActorsCount = KqpGroup->GetCounter("IteratorReads/StreamLookupActorCount", false); + ReadActorRetries = KqpGroup->GetCounter("IteratorReads/Retries", false); + DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true); + DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true); + LiteralTxTotalTimeHistogram = KqpGroup->GetHistogram( "PhyTx/LiteralTxTotalTimeMs", NMonitoring::ExponentialHistogram(10, 2, 1)); DataTxTotalTimeHistogram = KqpGroup->GetHistogram( diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index cdac762964..e0dfa9b8ac 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -367,6 +367,18 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr ScanQueryShardResolve; NMonitoring::THistogramPtr ScanQueryRateLimitLatency; + // Iterator reads counters + ::NMonitoring::TDynamicCounters::TCounterPtr IteratorsShardResolve; + ::NMonitoring::TDynamicCounters::TCounterPtr IteratorsReadSplits; + ::NMonitoring::TDynamicCounters::TCounterPtr SentIteratorAcks; + ::NMonitoring::TDynamicCounters::TCounterPtr SentIteratorCancels; + ::NMonitoring::TDynamicCounters::TCounterPtr CreatedIterators; + ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorsCount; + ::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupActorsCount; + ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries; + ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails; + ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages; + // Physical tx duration NMonitoring::THistogramPtr LiteralTxTotalTimeHistogram; NMonitoring::THistogramPtr DataTxTotalTimeHistogram; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index f92e5aa72c..ac47bc41ff 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1608,7 +1608,7 @@ private: return false; }; - auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(), + auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), CreateKqpAsyncIoFactory(Counters->Counters), AppData()->FunctionRegistry, settings, limits); auto computeActorId = Register(computeActor); task.ComputeActorId = computeActorId; diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 7c4bf8787f..c699ac001b 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -305,12 +305,12 @@ private: IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask), - CreateKqpAsyncIoFactory(), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy, + CreateKqpAsyncIoFactory(Counters), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { if (Y_LIKELY(!CaFactory)) { - computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(), + computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateKqpAsyncIoFactory(Counters), AppData()->FunctionRegistry, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId)); taskCtx.ComputeActorId = Register(computeActor); } else { diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index c6880b9158..76d1c7210a 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -318,7 +318,8 @@ public: public: TKqpReadActor( NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, - const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args) + const NYql::NDq::TDqAsyncIoFactory::TSourceArguments& args, + TIntrusivePtr<TKqpCounters> counters) : Settings(std::move(settings)) , LogPrefix(TStringBuilder() << "TxId: " << args.TxId << ", task: " << args.TaskId << ", CA Id " << args.ComputeActorId << ". ") , ComputeActorId(args.ComputeActorId) @@ -326,6 +327,7 @@ public: , TypeEnv(args.TypeEnv) , HolderFactory(args.HolderFactory) , Alloc(args.Alloc) + , Counters(counters) { TableId = TTableId( Settings.GetTable().GetTableId().GetOwnerId(), @@ -345,6 +347,7 @@ public: Settings.GetKeyColumnTypeInfos(i).GetPgTypeId() ) : nullptr)); } + Counters->ReadActorsCount->Inc(); } virtual ~TKqpReadActor() { @@ -370,6 +373,7 @@ public: } void Bootstrap() { + Counters->ReadActorsCount->Inc(); LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix; Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId()); THolder<TShardState> stateHolder = MakeHolder<TShardState>(Settings.GetShardIdHint()); @@ -437,6 +441,7 @@ public: return; } + Counters->IteratorsShardResolve->Inc(); state->ResolveAttempt++; auto range = state->GetBounds(Settings.GetReverse()); @@ -588,6 +593,7 @@ public: } } + Counters->IteratorsReadSplits->Add(newShards.size() - 1); YQL_ENSURE(!newShards.empty()); if (Settings.GetReverse()) { for (size_t i = 0; i < newShards.size(); ++i) { @@ -650,6 +656,7 @@ public: } } + Counters->ReadActorRetries->Inc(); StartRead(state); } @@ -717,6 +724,7 @@ public: << " snapshot = (txid=" << Settings.GetSnapshot().GetTxId() << ",step=" << Settings.GetSnapshot().GetStep() << ")" << " lockTxId = " << Settings.GetLockTxId()); + Counters->CreatedIterators->Inc(); ReadIdByTabletId[state->TabletId].push_back(id); Send(::PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), IEventHandle::FlagTrackDelivery); @@ -730,6 +738,11 @@ public: return; } + Counters->DataShardIteratorMessages->Inc(); + if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { + Counters->DataShardIteratorFails->Inc(); + } + for (auto& issue : record.GetStatus().GetIssues()) { CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); Reads[id].Shard->Issues.push_back(issue); @@ -982,6 +995,7 @@ public: if (limit) { request->Record.SetMaxRows(*limit); } + Counters->SentIteratorAcks->Inc(); CA_LOG_D("sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo()); Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true), IEventHandle::FlagTrackDelivery); @@ -1058,6 +1072,7 @@ public: if (!Reads[id]) { return; } + Counters->SentIteratorCancels->Inc(); auto* state = Reads[id].Shard; auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); cancel->Record.SetReadId(id); @@ -1065,6 +1080,7 @@ public: } void PassAway() override { + Counters->ReadActorsCount->Dec(); { auto guard = BindAllocator(); Results.clear(); @@ -1148,14 +1164,16 @@ private: const NMiniKQL::TTypeEnvironment& TypeEnv; const NMiniKQL::THolderFactory& HolderFactory; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; + + TIntrusivePtr<TKqpCounters> Counters; }; -void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory) { +void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) { factory.RegisterSource<NKikimrTxDataShard::TKqpReadRangesSourceSettings>( TString(NYql::KqpReadRangesSourceName), - [] (NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSourceArguments&& args) { - auto* actor = new TKqpReadActor(std::move(settings), args); + [counters] (NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSourceArguments&& args) { + auto* actor = new TKqpReadActor(std::move(settings), args, counters); return std::make_pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>(actor, actor); }); } diff --git a/ydb/core/kqp/runtime/kqp_read_actor.h b/ydb/core/kqp/runtime/kqp_read_actor.h index ecc0bcd746..22c4e05d5c 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.h +++ b/ydb/core/kqp/runtime/kqp_read_actor.h @@ -1,5 +1,7 @@ #pragma once +#include <ydb/core/kqp/counters/kqp_counters.h> + #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> namespace NKikimrTxDataShard { @@ -10,7 +12,7 @@ class TEvReadAck; namespace NKikimr { namespace NKqp { -void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory); +void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory&, TIntrusivePtr<TKqpCounters>); void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead&); void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck&); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 39cbd9c3b4..36d6c1629c 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -27,14 +27,16 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku public: TKqpStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, - std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings) + std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings, + TIntrusivePtr<TKqpCounters> counters) : InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv) , HolderFactory(holderFactory), Alloc(alloc), TablePath(settings.GetTable().GetPath()) , TableId(MakeTableId(settings.GetTable())) , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>()) - , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) { - + , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) + , Counters(counters) + { KeyColumns.reserve(settings.GetKeyColumns().size()); i32 keyOrder = 0; for (const auto& keyColumn : settings.GetKeyColumns()) { @@ -75,6 +77,7 @@ public: } void Bootstrap() { + Counters->StreamLookupActorsCount->Inc(); ResolveTableShards(); Become(&TKqpStreamLookupActor::StateFunc); } @@ -179,10 +182,12 @@ private: } void PassAway() final { + Counters->StreamLookupActorsCount->Dec(); { auto alloc = BindAllocator(); Input.Clear(); for (auto& [id, state] : Reads) { + Counters->SentIteratorCancels->Inc(); auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); cancel->Record.SetReadId(id); Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId)); @@ -277,6 +282,11 @@ private: Locks.push_back(lock); } + Counters->DataShardIteratorMessages->Inc(); + if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { + Counters->DataShardIteratorFails->Inc(); + } + switch (record.GetStatus().GetCode()) { case Ydb::StatusIds::SUCCESS: break; @@ -298,6 +308,7 @@ private: if (record.GetFinished()) { read.SetFinished(); } else { + Counters->SentIteratorAcks->Inc(); THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck()); request->Record.SetReadId(record.GetReadId()); request->Record.SetSeqNo(record.GetSeqNo()); @@ -488,6 +499,7 @@ private: const auto readId = GetNextReadId(); TReadState read(readId, shardId, std::move(keys)); + Counters->CreatedIterators->Inc(); THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); auto& record = request->Record; @@ -564,6 +576,7 @@ private: request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read, keyColumnTypes, TVector<TKeyDesc::TColumnOp>{})); + Counters->IteratorsShardResolve->Inc(); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); @@ -628,6 +641,8 @@ private: // stats ui64 ReadRowsCount = 0; ui64 ReadBytesCount = 0; + + TIntrusivePtr<TKqpCounters> Counters; }; } // namespace @@ -635,9 +650,10 @@ private: std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, - NKikimrKqp::TKqpStreamLookupSettings&& settings) { + NKikimrKqp::TKqpStreamLookupSettings&& settings, + TIntrusivePtr<TKqpCounters> counters) { auto actor = new TKqpStreamLookupActor(inputIndex, input, computeActorId, typeEnv, holderFactory, alloc, - std::move(settings)); + std::move(settings), counters); return {actor, actor}; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h index 51205c88ba..29fe5d70d8 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/core/protos/kqp.pb.h> @@ -9,7 +10,7 @@ namespace NKqp { std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, - NKikimrKqp::TKqpStreamLookupSettings&& settings); + NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters>); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp index df6b8d5cfb..59ee432d95 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp @@ -4,11 +4,11 @@ namespace NKikimr { namespace NKqp { -void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory) { - factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [](NKikimrKqp::TKqpStreamLookupSettings&& settings, +void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) { + factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) { return CreateStreamLookupActor(args.InputIndex, args.TransformInput, args.ComputeActorId, args.TypeEnv, - args.HolderFactory, args.Alloc, std::move(settings)); + args.HolderFactory, args.Alloc, std::move(settings), counters); }); } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h index 1eded0576b..db49e005fa 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h @@ -1,11 +1,12 @@ #pragma once +#include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> namespace NKikimr { namespace NKqp { -void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory); +void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<NKqp::TKqpCounters>); } // namespace NKqp } // namespace NKikimr |