aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorMaxim Yurchuk <maxim-yurchuk@ydb.tech>2024-12-12 15:00:43 +0000
committerGitHub <noreply@github.com>2024-12-12 15:00:43 +0000
commit42701242eaf5be980cb935631586d0e90b82641c (patch)
tree6dbf5fcd37d3c16591e196c4a69d166e3ab3a398 /yt
parent7f5a9f394dbd9ac290cabbb7977538656b3a541e (diff)
parentf7c04b5876af3d16849ab5e3079c0eabbd4e3a00 (diff)
downloadydb-42701242eaf5be980cb935631586d0e90b82641c.tar.gz
Merge pull request #12554 from vitalyisaev2/YQ-3839.with_rightlib.3
Import from Arcadia + YDB FQ: turning gateways_config.proto into a file without external dependencies
Diffstat (limited to 'yt')
-rw-r--r--yt/yql/providers/yt/common/yql_yt_settings.cpp1
-rw-r--r--yt/yql/providers/yt/common/yql_yt_settings.h1
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp1
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h2
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp342
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_datasource.cpp5
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_logical_optimize.cpp3
-rw-r--r--yt/yt/core/misc/async_slru_cache-inl.h61
-rw-r--r--yt/yt/core/misc/async_slru_cache.h10
-rw-r--r--yt/yt/core/misc/ref_counted_tracker_profiler.cpp4
-rw-r--r--yt/yt/core/misc/unittests/async_slru_cache_ut.cpp99
-rw-r--r--yt/yt/library/profiling/example/main.cpp10
-rw-r--r--yt/yt/library/profiling/perf/counters_other.cpp27
-rw-r--r--yt/yt/library/profiling/perf/event_counter.h (renamed from yt/yt/library/profiling/perf/counters.h)19
-rw-r--r--yt/yt/library/profiling/perf/event_counter_dummy.cpp24
-rw-r--r--yt/yt/library/profiling/perf/event_counter_linux.cpp (renamed from yt/yt/library/profiling/perf/counters.cpp)166
-rw-r--r--yt/yt/library/profiling/perf/event_counter_profiler.cpp69
-rw-r--r--yt/yt/library/profiling/perf/event_counter_profiler.h11
-rw-r--r--yt/yt/library/profiling/perf/unittests/perf_event_counter_ut.cpp (renamed from yt/yt/library/profiling/unittests/perf_counter_ut.cpp)37
-rw-r--r--yt/yt/library/profiling/perf/unittests/ya.make17
-rw-r--r--yt/yt/library/profiling/perf/ya.make8
-rw-r--r--yt/yt/library/profiling/unittests/ya.make1
-rw-r--r--yt/yt/library/profiling/ya.make1
-rw-r--r--yt/yt/library/program/helpers.cpp4
24 files changed, 627 insertions, 296 deletions
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp
index d74a0e6044..c6ea9bc3a3 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.cpp
+++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp
@@ -510,6 +510,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount);
REGISTER_SETTING(*this, JobBlockInput);
REGISTER_SETTING(*this, _EnableYtDqProcessWriteConstraints);
+ REGISTER_SETTING(*this, CompactForDistinct);
}
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings) {
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h
index 724f9fb7f3..27378b3ddb 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.h
+++ b/yt/yql/providers/yt/common/yql_yt_settings.h
@@ -287,6 +287,7 @@ struct TYtSettings {
NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount;
NCommon::TConfSetting<bool, false> JobBlockInput;
NCommon::TConfSetting<bool, false> _EnableYtDqProcessWriteConstraints;
+ NCommon::TConfSetting<bool, false> CompactForDistinct;
};
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings);
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
index 51c9d75136..1107e0210d 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
@@ -78,6 +78,7 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
AddHandler(1, &TYtMerge::Match, HNDL(PushMergeLimitToInput));
if (!State_->Configuration->DisableFuseOperations.Get().GetOrElse(DEFAULT_DISABLE_FUSE_OPERATIONS)) {
AddHandler(1, &TYtReduce::Match, HNDL(FuseReduce));
+ AddHandler(1, &TYtReduce::Match, HNDL(FuseReduceWithTrivialMap));
}
AddHandler(2, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin));
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
index 95f9435922..67e68ea979 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
@@ -76,6 +76,8 @@ private:
NNodes::TMaybeNode<NNodes::TExprBase> FuseReduce(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
+ NNodes::TMaybeNode<NNodes::TExprBase> FuseReduceWithTrivialMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
+
NNodes::TMaybeNode<NNodes::TExprBase> FuseInnerMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
NNodes::TMaybeNode<NNodes::TExprBase> FuseOuterMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp
index 96fe6dab33..7753c648f2 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp
@@ -264,6 +264,348 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseReduce(TExprBase no
.Done();
}
+TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseReduceWithTrivialMap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
+ const EYtSettingTypes acceptedReduceSettings =
+ EYtSettingType::ReduceBy
+ | EYtSettingType::Limit
+ | EYtSettingType::SortLimitBy
+ | EYtSettingType::SortBy
+ // | EYtSettingType::JoinReduce
+ // | EYtSettingType::FirstAsPrimary
+ | EYtSettingType::Flow
+ | EYtSettingType::KeepSorted
+ | EYtSettingType::KeySwitch
+ // | EYtSettingType::ReduceInputType
+ | EYtSettingType::NoDq;
+
+ const EYtSettingTypes acceptedMapSettings =
+ EYtSettingType::Ordered
+ //| EYtSettingType::Limit
+ //| EYtSettingType::SortLimitBy
+ //| EYtSettingType::WeakFields
+ //| EYtSettingType::Sharded
+ //| EYtSettingType::JobCount
+ | EYtSettingType::Flow
+ | EYtSettingType::KeepSorted
+ | EYtSettingType::NoDq
+ //| EYtSettingType::BlockInputReady
+ //| EYtSettingType::BlockInputApplied
+ ;
+
+ auto outerReduce = node.Cast<TYtReduce>();
+ if (NYql::HasSettingsExcept(outerReduce.Settings().Ref(), acceptedReduceSettings)) {
+ return node;
+ }
+
+ const bool hasKeySwitch = NYql::HasSetting(outerReduce.Settings().Ref(), EYtSettingType::KeySwitch);
+ const bool isFlow = NYql::HasSetting(outerReduce.Settings().Ref(), EYtSettingType::Flow);
+
+ const auto sortBy = NYql::GetSettingAsColumnList(outerReduce.Settings().Ref(), EYtSettingType::SortBy);
+ const auto reduceBy = NYql::GetSettingAsColumnList(outerReduce.Settings().Ref(), EYtSettingType::ReduceBy);
+
+ THashSet<TString> sortOrKeyColumns(sortBy.begin(), sortBy.end());
+ sortOrKeyColumns.insert(reduceBy.begin(), reduceBy.end());
+
+ struct TFused {
+ TYtPath Path;
+ TCoLambda MapLambda;
+ TCoLambda ReduceLambda;
+ TExprBase ReducePlaceholder;
+ size_t InputIndex;
+ size_t OrigInputIndex;
+ TYtMap OrigMap;
+ };
+
+ TExprNode::TPtr origVariantType;
+ if (outerReduce.Input().Size() > 1) {
+ auto itemType = GetSequenceItemType(outerReduce.Reducer().Args().Arg(0), true);
+ YQL_ENSURE(itemType);
+ origVariantType = ExpandType(outerReduce.Pos(), *itemType->Cast<TVariantExprType>(), ctx);
+ }
+
+ TMaybe<TFused> fusedMap;
+ TVector<TYtSection> newInput;
+ const size_t origReduceInputs = outerReduce.Input().Size();
+ for (size_t i = 0; i < origReduceInputs; ++i) {
+ const auto& section = outerReduce.Input().Item(i);
+ if (fusedMap.Defined() || section.Settings().Size() != 0) {
+ newInput.push_back(section);
+ continue;
+ }
+
+ TVector<TYtPath> newPaths;
+ newPaths.reserve(section.Paths().Size());
+ for (const auto& path : section.Paths()) {
+ if (fusedMap.Defined() || !path.Ranges().Maybe<TCoVoid>()) {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ auto maybeInnerMap = path.Table().Maybe<TYtOutput>().Operation().Maybe<TYtMap>();
+ if (!maybeInnerMap) {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ TYtMap innerMap = maybeInnerMap.Cast();
+ if (innerMap.Ref().StartsExecution() ||
+ innerMap.Ref().HasResult() ||
+ outerReduce.DataSink().Cluster().Value() != innerMap.DataSink().Cluster().Value() ||
+ innerMap.Output().Size() > 1 ||
+ innerMap.Input().Size() > 1 ||
+ innerMap.Input().Item(0).Paths().Size() > 1 ||
+ !NYql::HasSetting(innerMap.Settings().Ref(), EYtSettingType::Ordered) ||
+ isFlow != NYql::HasSetting(innerMap.Settings().Ref(), EYtSettingType::Flow) ||
+ NYql::HasSettingsExcept(innerMap.Settings().Ref(), acceptedMapSettings))
+ {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ const TParentsMap* parents = getParents();
+ if (IsOutputUsedMultipleTimes(path.Table().Cast<TYtOutput>().Ref(), *parents)) {
+ // Inner reduce output is used more than once
+ newPaths.push_back(path);
+ continue;
+ }
+
+ // Check world dependencies
+ auto parentsIt = parents->find(innerMap.Raw());
+ YQL_ENSURE(parentsIt != parents->cend());
+ if (!AllOf(parentsIt->second, [](const TExprNode* dep) { return TYtOutput::Match(dep); })) {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ const TCoLambda mapLambda = innerMap.Mapper();
+ auto maybeFlatMap = GetFlatMapOverInputStream(mapLambda, *parents);
+ TMaybe<THashSet<TStringBuf>> passthrough;
+ if (!maybeFlatMap.Maybe<TCoOrderedFlatMap>() ||
+ !IsJustOrSingleAsList(maybeFlatMap.Cast().Lambda().Body().Ref()) ||
+ !IsPassthroughFlatMap(maybeFlatMap.Cast(), &passthrough) ||
+ !passthrough ||
+ !AllOf(sortOrKeyColumns, [&](const TString& col) { return passthrough->contains(col); }))
+ {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ auto fuseRes = CanFuseLambdas(mapLambda, outerReduce.Reducer(), ctx);
+ if (!fuseRes) {
+ // Some error
+ return {};
+ }
+ if (!*fuseRes) {
+ // Cannot fuse
+ newPaths.push_back(path);
+ continue;
+ }
+
+ auto [placeHolder, lambdaWithPlaceholder] = ReplaceDependsOn(outerReduce.Reducer().Ptr(), ctx, State_->Types);
+ if (!placeHolder) {
+ return {};
+ }
+
+ TYtPath newPath = innerMap.Input().Item(0).Paths().Item(0);
+ YQL_ENSURE(newInput.size() == i);
+ if (!newPaths.empty()) {
+ newInput.push_back(
+ Build<TYtSection>(ctx, section.Pos())
+ .InitFrom(section)
+ .Paths()
+ .Add(newPaths)
+ .Build()
+ .Done());
+ newPaths.clear();
+ }
+ size_t inputIndex = newInput.size();
+ newInput.push_back(
+ Build<TYtSection>(ctx, section.Pos())
+ .InitFrom(section)
+ .Paths()
+ .Add(newPath)
+ .Build()
+ .Done());
+ fusedMap = {
+ .Path = newPath,
+ .MapLambda = mapLambda,
+ .ReduceLambda = TCoLambda(lambdaWithPlaceholder),
+ .ReducePlaceholder = TExprBase(placeHolder),
+ .InputIndex = inputIndex,
+ .OrigInputIndex = i,
+ .OrigMap = innerMap,
+ };
+ }
+ if (!newPaths.empty()) {
+ newInput.push_back(
+ Build<TYtSection>(ctx, section.Pos())
+ .InitFrom(section)
+ .Paths()
+ .Add(newPaths)
+ .Build()
+ .Done());
+ }
+ }
+
+ if (!fusedMap) {
+ return node;
+ }
+
+ YQL_ENSURE(newInput.size() >= origReduceInputs);
+ YQL_ENSURE(newInput.size() - origReduceInputs <= 1);
+
+ TExprNode::TPtr remapLambda = ctx.Builder(fusedMap->MapLambda.Pos())
+ .Lambda()
+ .Param("item")
+ .Apply(fusedMap->MapLambda.Ptr())
+ .With(0)
+ .Callable("AsList")
+ .Arg(0, "item")
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal()
+ .Build();
+ if (hasKeySwitch) {
+ remapLambda = ctx.Builder(fusedMap->MapLambda.Pos())
+ .Lambda()
+ .Param("item")
+ .Callable(0, "OrderedMap")
+ .Apply(0, remapLambda)
+ .With(0)
+ .Callable("RemoveMember")
+ .Arg(0, "item")
+ .Atom(1, "_yql_sys_tablekeyswitch")
+ .Seal()
+ .Done()
+ .Seal()
+ .Lambda(1)
+ .Param("remappedItem")
+ .Callable(0, "AddMember")
+ .Arg(0, "remappedItem")
+ .Atom(1, "_yql_sys_tablekeyswitch")
+ .Callable(2, "Member")
+ .Arg(0, "item")
+ .Atom(1, "_yql_sys_tablekeyswitch")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ TExprNode::TPtr flatMapLambda;
+ if (newInput.size() == 1) {
+ flatMapLambda = remapLambda;
+ } else {
+ flatMapLambda = ctx.Builder(outerReduce.Pos())
+ .Lambda()
+ .Param("item")
+ .Callable("Visit")
+ .Arg(0, "item")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ const bool inserted = newInput.size() > origReduceInputs;
+ for (size_t i = 0; i < newInput.size(); ++i) {
+ TString paramName = TStringBuilder() << "alt" << i;
+ TString remappedName = TStringBuilder() << "remapped" << i;
+ if (i != fusedMap->InputIndex) {
+ parent
+ .Atom(2 * i + 1, i)
+ .Lambda(2 * i + 2)
+ .Param(paramName)
+ .Callable("AsList")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ if (origVariantType) {
+ parent
+ .Callable(0, "Variant")
+ .Arg(0, paramName)
+ .Atom(1, (i > fusedMap->InputIndex && inserted) ? i - 1 : i)
+ .Add(2, origVariantType)
+ .Seal();
+ } else {
+ parent
+ .Arg(0, paramName);
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal();
+ } else {
+ parent
+ .Atom(2 * i + 1, i)
+ .Lambda(2 * i + 2)
+ .Param(paramName)
+ .Callable("OrderedMap")
+ .Apply(0, remapLambda)
+ .With(0, paramName)
+ .Seal()
+ .Lambda(1)
+ .Param(remappedName)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ if (origVariantType) {
+ parent
+ .Callable("Variant")
+ .Arg(0, remappedName)
+ .Atom(1, fusedMap->OrigInputIndex)
+ .Add(2, origVariantType)
+ .Seal();
+ } else {
+ parent
+ .Arg(remappedName);
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal();
+ }
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ TExprNode::TPtr newReduceLambda = ctx.Builder(outerReduce.Pos())
+ .Lambda()
+ .Param("inputStream")
+ .Apply(0, fusedMap->ReduceLambda.Ptr())
+ .With(0)
+ .Callable("OrderedFlatMap")
+ .Arg(0, "inputStream")
+ .Add(1, flatMapLambda)
+ .Seal()
+ .Done()
+ .WithNode(fusedMap->ReducePlaceholder.Ref(), "inputStream")
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto newSettings = outerReduce.Settings().Ptr();
+ if (!NYql::HasSetting(outerReduce.Settings().Ref(), EYtSettingType::NoDq) &&
+ NYql::HasSetting(fusedMap->OrigMap.Settings().Ref(), EYtSettingType::NoDq))
+ {
+ newSettings = NYql::AddSetting(*newSettings, EYtSettingType::NoDq, {}, ctx);
+ }
+
+ return Build<TYtReduce>(ctx, node.Pos())
+ .InitFrom(outerReduce)
+ .World<TCoSync>()
+ .Add(fusedMap->OrigMap.World())
+ .Add(outerReduce.World())
+ .Build()
+ .Input()
+ .Add(newInput)
+ .Build()
+ .Reducer(newReduceLambda)
+ .Settings(newSettings)
+ .Done();
+
+ return node;
+}
+
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseInnerMap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
auto outerMap = node.Cast<TYtMap>();
if (outerMap.Input().Size() != 1 || outerMap.Input().Item(0).Paths().Size() != 1) {
diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource.cpp
index 7e1ea43a21..21ff1f9385 100644
--- a/yt/yql/providers/yt/provider/yql_yt_datasource.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_datasource.cpp
@@ -476,8 +476,9 @@ public:
writer.OnKeyedItem("Table");
if (auto table = TMaybeNode<TYtTable>(&node)) {
writer.OnStringScalar(table.Cast().Name().Value());
- }
- else {
+ } else if (auto table = TMaybeNode<TYtPath>(&node).Table().Maybe<TYtTable>()) {
+ writer.OnStringScalar(table.Cast().Name().Value());
+ } else {
writer.OnStringScalar("(tmp)");
}
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_logical_optimize.cpp b/yt/yql/providers/yt/provider/yql_yt_logical_optimize.cpp
index 8577696de2..55c3956a7c 100644
--- a/yt/yql/providers/yt/provider/yql_yt_logical_optimize.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_logical_optimize.cpp
@@ -354,7 +354,8 @@ protected:
auto usePhases = State_->Configuration->UseAggPhases.Get().GetOrElse(false);
auto usePartitionsByKeys = State_->Configuration->UsePartitionsByKeysForFinalAgg.Get().GetOrElse(true);
- TAggregateExpander aggExpander(usePartitionsByKeys, false, node.Ptr(), ctx, *State_->Types, false, false,
+ auto compactForDistinct = State_->Configuration->CompactForDistinct.Get().GetOrElse(false);
+ TAggregateExpander aggExpander(usePartitionsByKeys, false, node.Ptr(), ctx, *State_->Types, false, compactForDistinct,
usePhases, State_->Types->UseBlocks || State_->Types->BlockEngineMode == EBlockEngineMode::Force);
return aggExpander.ExpandAggregate();
}
diff --git a/yt/yt/core/misc/async_slru_cache-inl.h b/yt/yt/core/misc/async_slru_cache-inl.h
index d1ce44a3a6..ed28ce6942 100644
--- a/yt/yt/core/misc/async_slru_cache-inl.h
+++ b/yt/yt/core/misc/async_slru_cache-inl.h
@@ -341,7 +341,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::Reconfigure(const TSlruCacheDynam
auto writerGuard = WriterGuard(shard.SpinLock);
shard.Reconfigure(shardCapacity, youngerSizeFraction);
shard.DrainTouchBuffer();
- TrimWithNotify(&shard, writerGuard, nullptr);
+ NotifyOnTrim(shard.Trim(writerGuard), nullptr);
}
}
@@ -544,7 +544,7 @@ TAsyncSlruCacheBase<TKey, TValue, THash>::DoLookup(TShard* shard, const TKey& ke
Counters_.SyncHitCounter.Increment();
// NB: Releases the lock.
- TrimWithNotify(shard, writerGuard, value);
+ NotifyOnTrim(shard->Trim(writerGuard), value);
if (GhostCachesEnabled_.load()) {
shard->SmallGhost.Resurrect(value, weight);
@@ -649,7 +649,7 @@ auto TAsyncSlruCacheBase<TKey, TValue, THash>::BeginInsert(const TKey& key, i64
shard->UpdateCookie(item, /*countDelta*/ 1, cookieWeight);
if (cookieWeight > 0) {
// NB: Releases the lock.
- TrimWithNotify(shard, guard, nullptr, cookieWeight);
+ NotifyOnTrim(shard->Trim(guard), nullptr, cookieWeight);
}
guard.Release();
@@ -681,7 +681,7 @@ auto TAsyncSlruCacheBase<TKey, TValue, THash>::BeginInsert(const TKey& key, i64
Counters_.SyncHitCounter.Increment();
// NB: Releases the lock.
- TrimWithNotify(shard, guard, value);
+ NotifyOnTrim(shard->Trim(guard), value);
guard.Release();
@@ -729,7 +729,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::UpdateCookieWeight(const TInsertC
shard->DrainTouchBuffer();
// NB: Releases the lock.
- TrimWithNotify(shard, guard, nullptr, weightDelta);
+ NotifyOnTrim(shard->Trim(guard), nullptr, weightDelta);
} else {
guard.Release();
OnWeightUpdated(weightDelta);
@@ -774,7 +774,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::EndInsert(const TInsertCookie& in
Counters_.AsyncHitWeightCounter.Increment(weight * item->AsyncHitCount.load());
// NB: Releases the lock.
- TrimWithNotify(shard, guard, value, -cookieWeight);
+ NotifyOnTrim(shard->Trim(guard), value, -cookieWeight);
// We do not want to break the ghost cache invariants, according to which either EndInsert
// or CancelInsert must be called for each item in Inserting state. So we end the insertion
@@ -950,7 +950,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::UpdateWeight(const TKey& key)
Counters_.MissedWeightCounter.Increment(weightDelta);
}
- TrimWithNotify(shard, guard, nullptr, weightDelta);
+ NotifyOnTrim(shard->Trim(guard), nullptr, weightDelta);
if (GhostCachesEnabled_.load()) {
shard->SmallGhost.UpdateWeight(key, newWeight);
@@ -1315,8 +1315,10 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::TGhostShard::Trim(NThreading::TWr
template <class TKey, class TValue, class THash>
std::vector<typename TAsyncSlruCacheBase<TKey, TValue, THash>::TValuePtr>
-TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::Trim(const TIntrusiveListWithAutoDelete<TItem, TDelete>& evictedItems)
+TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::Trim(NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard)
{
+ auto evictedItems = this->TrimNoDelete();
+
Parent->Size_ -= static_cast<int>(evictedItems.Size());
std::vector<TValuePtr> evictedValues;
@@ -1336,32 +1338,6 @@ TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::Trim(const TIntrusiveListWithA
evictedValues.push_back(std::move(value));
}
- return evictedValues;
-}
-
-template <class TKey, class TValue, class THash>
-std::vector<typename TAsyncSlruCacheBase<TKey, TValue, THash>::TValuePtr>
-TAsyncSlruCacheBase<TKey, TValue, THash>::TrimWithNotify(
- TShard* shard,
- NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard,
- const TValuePtr& insertedValue,
- i64 weightDelta)
-{
- VERIFY_SPINLOCK_AFFINITY(shard->SpinLock);
-
- auto evictedItems = shard->TrimNoDelete();
- auto evictedValues = shard->Trim(evictedItems);
-
- if (weightDelta != 0) {
- OnWeightUpdated(weightDelta);
- }
- if (insertedValue) {
- OnAdded(insertedValue);
- }
- for (const auto& value : evictedValues) {
- OnRemoved(value);
- }
-
// NB. Evicted items must die outside of critical section.
guard.Release();
@@ -1389,6 +1365,23 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::OnCookieUpdated(i64 delta
Parent->CookieWeightCounter_ += deltaWeight;
}
+template <class TKey, class TValue, class THash>
+void TAsyncSlruCacheBase<TKey, TValue, THash>::NotifyOnTrim(
+ const std::vector<TValuePtr>& evictedValues,
+ const TValuePtr& insertedValue,
+ i64 weightDelta)
+{
+ if (weightDelta != 0) {
+ OnWeightUpdated(weightDelta);
+ }
+ if (insertedValue) {
+ OnAdded(insertedValue);
+ }
+ for (const auto& value : evictedValues) {
+ OnRemoved(value);
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
template <class TKey, class TValue, class THash>
diff --git a/yt/yt/core/misc/async_slru_cache.h b/yt/yt/core/misc/async_slru_cache.h
index e4395185bb..b6ce9f3b98 100644
--- a/yt/yt/core/misc/async_slru_cache.h
+++ b/yt/yt/core/misc/async_slru_cache.h
@@ -410,8 +410,8 @@ private:
TGhostShard SmallGhost;
TGhostShard LargeGhost;
- //! Returns the list of evicted items.
- std::vector<TValuePtr> Trim(const TIntrusiveListWithAutoDelete<TItem, TDelete>& evictedItems);
+ //! Trims the lists and releases the guard. Returns the list of evicted items.
+ std::vector<TValuePtr> Trim(NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard);
protected:
void OnYoungerUpdated(i64 deltaCount, i64 deltaWeight);
@@ -451,11 +451,7 @@ private:
//! Calls OnAdded on OnRemoved for the values evicted with Trim(). If the trim was caused by insertion, then
//! insertedValue must be the value, insertion of which caused trim. Otherwise, insertedValue must be nullptr.
//! If the trim was causes by weight update or weighted cookie, then weightDelta represents weight changes.
- std::vector<TValuePtr> TrimWithNotify(
- TShard* shard,
- NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard,
- const TValuePtr& insertedValue,
- i64 weightDelta = 0);
+ void NotifyOnTrim(const std::vector<TValuePtr>& evictedValues, const TValuePtr& insertedValue, i64 weightDelta = 0);
void UpdateCookieWeight(const TInsertCookie& insertCookie, i64 newWeight);
void EndInsert(const TInsertCookie& insertCookie, TValuePtr value);
diff --git a/yt/yt/core/misc/ref_counted_tracker_profiler.cpp b/yt/yt/core/misc/ref_counted_tracker_profiler.cpp
index cee7477cdf..1acf759748 100644
--- a/yt/yt/core/misc/ref_counted_tracker_profiler.cpp
+++ b/yt/yt/core/misc/ref_counted_tracker_profiler.cpp
@@ -35,9 +35,11 @@ public:
}
};
+////////////////////////////////////////////////////////////////////////////////
+
void EnableRefCountedTrackerProfiling()
{
- LeakyRefCountedSingleton<TRefCountedTrackerProfiler>();
+ Y_UNUSED(LeakyRefCountedSingleton<TRefCountedTrackerProfiler>());
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp b/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp
index b2d72822db..64c166be19 100644
--- a/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp
+++ b/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp
@@ -1,9 +1,5 @@
#include <yt/yt/core/test_framework/framework.h>
-#include <yt/yt/core/concurrency/public.h>
-#include <yt/yt/core/concurrency/fair_share_action_queue.h>
-#include <yt/yt/core/concurrency/thread_pool.h>
-
#include <yt/yt/core/misc/async_slru_cache.h>
#include <yt/yt/core/misc/property.h>
@@ -18,10 +14,6 @@ using namespace NProfiling;
////////////////////////////////////////////////////////////////////////////////
-const NLogging::TLogger Logger("Main");
-
-////////////////////////////////////////////////////////////////////////////////
-
DECLARE_REFCOUNTED_CLASS(TSimpleCachedValue)
class TSimpleCachedValue
@@ -117,23 +109,9 @@ public:
: TAsyncSlruCacheBase(std::move(config)), EnableResurrection_(enableResurrection)
{ }
- int GetItemCount() const
- {
- auto guard = Guard(Lock_);
- return Keys_.size();
- }
-
- int GetTotalAdded() const
- {
- auto guard = Guard(Lock_);
- return TotalAdded_;
- }
-
- int GetTotalRemoved() const
- {
- auto guard = Guard(Lock_);
- return TotalRemoved_;
- }
+ DEFINE_BYVAL_RO_PROPERTY(int, ItemCount, 0);
+ DEFINE_BYVAL_RO_PROPERTY(int, TotalAdded, 0);
+ DEFINE_BYVAL_RO_PROPERTY(int, TotalRemoved, 0);
protected:
i64 GetWeight(const TSimpleCachedValuePtr& value) const override
@@ -141,42 +119,24 @@ protected:
return value->Weight;
}
- void OnAdded(const TSimpleCachedValuePtr& value) override
+ void OnAdded(const TSimpleCachedValuePtr& /*value*/) override
{
- YT_LOG_DEBUG("Item add (Item: %v)", value->GetKey());
- auto guard = Guard(Lock_);
-
- if (!Keys_.find(value->GetKey()).IsEnd()) {
- YT_LOG_ALERT("Item already exist (Item: %v)", value->GetKey());
- }
-
- EmplaceOrCrash(Keys_, value->GetKey());
+ ++ItemCount_;
++TotalAdded_;
}
- void OnRemoved(const TSimpleCachedValuePtr& value) override
+ void OnRemoved(const TSimpleCachedValuePtr& /*value*/) override
{
- YT_LOG_DEBUG("Item remove (Item: %v)", value->GetKey());
- auto guard = Guard(Lock_);
-
- if (Keys_.find(value->GetKey()).IsEnd()) {
- YT_LOG_ALERT("Item not found (Item: %v)", value->GetKey());
- }
-
- EraseOrCrash(Keys_, value->GetKey());
+ --ItemCount_;
++TotalRemoved_;
+ EXPECT_GE(ItemCount_, 0);
}
-
bool IsResurrectionSupported() const override
{
return EnableResurrection_;
}
private:
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_);
- THashSet<int> Keys_;
- int TotalAdded_ = 0;
- int TotalRemoved_ = 0;
bool EnableResurrection_;
};
@@ -504,49 +464,6 @@ TEST(TAsyncSlruCacheTest, AddRemoveWithResurrection)
}
}
-TEST(TAsyncSlruCacheTest, AddRemoveStressTest)
-{
- auto threadPool = NConcurrency::CreateThreadPool(2, "AddRemoveStressTest");
-
- constexpr int cacheSize = 5;
- constexpr int valueCount = 20;
- auto config = CreateCacheConfig(cacheSize);
- auto cache = New<TCountingSlruCache>(std::move(config));
-
- std::vector<TSimpleCachedValuePtr> values;
-
- for (int i = 0; i < valueCount; ++i) {
- values.push_back(New<TSimpleCachedValue>(i, i));
- }
-
- auto callback = BIND([&] {
- std::vector<TCountingSlruCache::TInsertCookie> cookies;
-
- for (int i = 0; i < valueCount; ++i) {
- auto cookie = cache->BeginInsert(i);
- cookies.emplace_back(std::move(cookie));
- }
-
- for (int i = 0; i < valueCount; ++i) {
- cookies.back().EndInsert(values[i]);
- cookies.pop_back();
- }
- });
-
- for (int i = 0; i < 100; i++) {
- std::vector<TFuture<void>> futures;
- futures.reserve(2);
-
- for (int j = 0; j < 2; j++) {
- futures.push_back(callback.AsyncVia(threadPool->GetInvoker()).Run());
- }
-
- NConcurrency::WaitFor(AllSucceeded(futures)).ThrowOnError();
- }
-
- threadPool->Shutdown();
-}
-
TEST(TAsyncSlruCacheTest, AddThenImmediatelyRemove)
{
constexpr int cacheSize = 1;
diff --git a/yt/yt/library/profiling/example/main.cpp b/yt/yt/library/profiling/example/main.cpp
index cb70a0775b..06447792fe 100644
--- a/yt/yt/library/profiling/example/main.cpp
+++ b/yt/yt/library/profiling/example/main.cpp
@@ -13,16 +13,22 @@
#include <yt/yt/core/misc/ref_counted_tracker_profiler.h>
#include <yt/yt/library/profiling/sensor.h>
+
#include <yt/yt/library/profiling/solomon/config.h>
#include <yt/yt/library/profiling/solomon/exporter.h>
#include <yt/yt/library/profiling/solomon/registry.h>
+
#include <yt/yt/library/profiling/tcmalloc/profiler.h>
-#include <yt/yt/library/profiling/perf/counters.h>
+
+#include <yt/yt/library/profiling/perf/event_counter_profiler.h>
#include <util/stream/output.h>
#include <util/system/compiler.h>
+
#include <util/generic/yexception.h>
+
#include <util/string/cast.h>
+
#include <util/system/madvise.h>
using namespace NYT;
@@ -33,7 +39,7 @@ using namespace NYT::NProfiling;
int main(int argc, char* argv[])
{
EnableTCMallocProfiler();
- EnablePerfCounters();
+ EnablePerfEventCounterProfiling();
try {
if (argc != 2 && argc != 3) {
diff --git a/yt/yt/library/profiling/perf/counters_other.cpp b/yt/yt/library/profiling/perf/counters_other.cpp
deleted file mode 100644
index 190bcc0c6f..0000000000
--- a/yt/yt/library/profiling/perf/counters_other.cpp
+++ /dev/null
@@ -1,27 +0,0 @@
-#include "counters.h"
-
-namespace NYT::NProfiling {
-
-////////////////////////////////////////////////////////////////////////////////
-
-TPerfEventCounter::TPerfEventCounter(EPerfEventType /* type */)
-{
- Y_UNUSED(FD_);
-}
-
-TPerfEventCounter::~TPerfEventCounter()
-{ }
-
-ui64 TPerfEventCounter::Read()
-{
- return 0;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-void EnablePerfCounters()
-{ }
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NProfiling
diff --git a/yt/yt/library/profiling/perf/counters.h b/yt/yt/library/profiling/perf/event_counter.h
index 6c9f050d9b..59de2d7268 100644
--- a/yt/yt/library/profiling/perf/counters.h
+++ b/yt/yt/library/profiling/perf/event_counter.h
@@ -38,24 +38,15 @@ DEFINE_ENUM(EPerfEventType,
////////////////////////////////////////////////////////////////////////////////
-class TPerfEventCounter final
+struct IPerfEventCounter
{
-public:
- explicit TPerfEventCounter(EPerfEventType type);
- ~TPerfEventCounter();
+ virtual ~IPerfEventCounter() = default;
- TPerfEventCounter(const TPerfEventCounter&) = delete;
-
- ui64 Read();
-
-private:
- int FD_ = -1;
+ //! Returns the counter increment since the last read.
+ virtual i64 Read() = 0;
};
-////////////////////////////////////////////////////////////////////////////////
-
-//! EnablePerfCounters creates selected set of perf counters.
-void EnablePerfCounters();
+std::unique_ptr<IPerfEventCounter> CreatePerfEventCounter(EPerfEventType type);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/profiling/perf/event_counter_dummy.cpp b/yt/yt/library/profiling/perf/event_counter_dummy.cpp
new file mode 100644
index 0000000000..c11cff901d
--- /dev/null
+++ b/yt/yt/library/profiling/perf/event_counter_dummy.cpp
@@ -0,0 +1,24 @@
+#include "event_counter.h"
+
+namespace NYT::NProfiling {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TPerfEventCounter
+ : public IPerfEventCounter
+{
+public:
+ virtual i64 Read() final
+ {
+ return 0;
+ }
+};
+
+std::unique_ptr<IPerfEventCounter> CreatePerfEventCounter(EPerfEventType /*type*/)
+{
+ return std::make_unique<TPerfEventCounter>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NProfiling
diff --git a/yt/yt/library/profiling/perf/counters.cpp b/yt/yt/library/profiling/perf/event_counter_linux.cpp
index e5e90d4ee8..65bdc3c51a 100644
--- a/yt/yt/library/profiling/perf/counters.cpp
+++ b/yt/yt/library/profiling/perf/event_counter_linux.cpp
@@ -1,4 +1,4 @@
-#include "counters.h"
+#include "event_counter.h"
#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/misc/error.h>
@@ -9,8 +9,6 @@
#include <library/cpp/yt/system/handle_eintr.h>
-#include <library/cpp/yt/memory/leaky_ref_counted_singleton.h>
-
#include <linux/perf_event.h>
#include <sys/syscall.h>
@@ -21,18 +19,18 @@ namespace NYT::NProfiling {
namespace {
-struct TPerfEventDescription final
+struct TPerfEventDescription
{
- int EventType;
- int EventConfig;
+ ui32 EventType;
+ ui64 EventConfig;
};
-constexpr TPerfEventDescription SoftwareEvent(int perfName) noexcept
+constexpr TPerfEventDescription SoftwareEvent(ui64 perfName) noexcept
{
return {PERF_TYPE_SOFTWARE, perfName};
}
-constexpr TPerfEventDescription HardwareEvent(int perfName) noexcept
+constexpr TPerfEventDescription HardwareEvent(ui64 perfName) noexcept
{
return {PERF_TYPE_HARDWARE, perfName};
}
@@ -60,27 +58,28 @@ TPerfEventDescription CacheEvent(
int cacheActionTypeForConfig = [&] {
switch (eventType) {
- case ECacheActionType::Load:
- return PERF_COUNT_HW_CACHE_OP_READ;
- case ECacheActionType::Store:
- return PERF_COUNT_HW_CACHE_OP_WRITE;
- default:
- YT_ABORT();
+ case ECacheActionType::Load:
+ return PERF_COUNT_HW_CACHE_OP_READ;
+ case ECacheActionType::Store:
+ return PERF_COUNT_HW_CACHE_OP_WRITE;
+ default:
+ YT_ABORT();
}
}();
int eventTypeForConfig = [&] {
switch (eventResultType) {
- case ECacheEventType::Access:
- return PERF_COUNT_HW_CACHE_RESULT_ACCESS;
- case ECacheEventType::Miss:
- return PERF_COUNT_HW_CACHE_RESULT_MISS;
- default:
- YT_ABORT();
+ case ECacheEventType::Access:
+ return PERF_COUNT_HW_CACHE_RESULT_ACCESS;
+ case ECacheEventType::Miss:
+ return PERF_COUNT_HW_CACHE_RESULT_MISS;
+ default:
+ YT_ABORT();
}
}();
- int eventConfig = (perfName << kEventNameShift) |
+ ui64 eventConfig =
+ (perfName << kEventNameShift) |
(cacheActionTypeForConfig << kCacheActionTypeShift) |
(eventTypeForConfig << kEventTypeShift);
@@ -157,94 +156,73 @@ TPerfEventDescription GetDescription(EPerfEventType type)
}
}
-int OpenPerfEvent(int tid, int eventType, int eventConfig)
-{
- perf_event_attr attr{};
-
- attr.type = eventType;
- attr.size = sizeof(attr);
- attr.config = eventConfig;
- attr.inherit = 1;
-
- int fd = HandleEintr(syscall, SYS_perf_event_open, &attr, tid, -1, -1, PERF_FLAG_FD_CLOEXEC);
- if (fd == -1) {
- THROW_ERROR_EXCEPTION("Failed to open perf event descriptor")
- << TError::FromSystem()
- << TErrorAttribute("type", eventType)
- << TErrorAttribute("config", eventConfig);
- }
- return fd;
-}
-
-ui64 FetchPerfCounter(int fd)
-{
- ui64 num{};
- if (HandleEintr(read, fd, &num, sizeof(num)) != sizeof(num)) {
- THROW_ERROR_EXCEPTION("Failed to read perf event counter")
- << TError::FromSystem();
- }
- return num;
-}
-
} // namespace
////////////////////////////////////////////////////////////////////////////////
-TPerfEventCounter::TPerfEventCounter(EPerfEventType type)
- : FD_(OpenPerfEvent(
- 0,
- GetDescription(type).EventType,
- GetDescription(type).EventConfig))
-{ }
-
-TPerfEventCounter::~TPerfEventCounter()
+class TPerfEventCounter
+ : public IPerfEventCounter
{
- if (FD_ != -1) {
- SafeClose(FD_, false);
+public:
+ explicit TPerfEventCounter(EPerfEventType type)
+ : Type_(type)
+ {
+ const auto& description = GetDescription(type);
+
+ perf_event_attr attr{};
+ attr.type = description.EventType;
+ attr.size = sizeof(attr);
+ attr.config = description.EventConfig;
+ attr.inherit = true;
+
+ FD_ = HandleEintr(
+ syscall,
+ SYS_perf_event_open,
+ &attr,
+ /*pid*/ 0,
+ /*cpu*/ -1,
+ /*group_fd*/ -1,
+ PERF_FLAG_FD_CLOEXEC);
+
+ if (FD_ == -1) {
+ THROW_ERROR_EXCEPTION("Failed to open %Qlv perf event descriptor",
+ type)
+ << TError::FromSystem();
+ }
}
-}
-
-ui64 TPerfEventCounter::Read()
-{
- return FetchPerfCounter(FD_);
-}
-////////////////////////////////////////////////////////////////////////////////
+ ~TPerfEventCounter()
+ {
+ if (FD_ != -1) {
+ SafeClose(FD_, /*ignoreBadFD*/ false);
+ }
+ }
-DECLARE_REFCOUNTED_STRUCT(TCounterOwner)
+ i64 Read() final
+ {
+ i64 result = 0;
+ if (HandleEintr(read, FD_, &result, sizeof(result)) != sizeof(result)) {
+ THROW_ERROR_EXCEPTION("Failed to read perf event counter %Qlv",
+ Type_)
+ << TError::FromSystem();
+ }
+ return result;
+ }
-struct TCounterOwner
- : public TRefCounted
-{
- std::vector<std::unique_ptr<TPerfEventCounter>> Counters;
+private:
+ const EPerfEventType Type_;
+ int FD_ = -1;
};
-DEFINE_REFCOUNTED_TYPE(TCounterOwner)
+////////////////////////////////////////////////////////////////////////////////
-void EnablePerfCounters()
+std::unique_ptr<IPerfEventCounter> CreatePerfEventCounter(EPerfEventType type)
{
- auto owner = LeakyRefCountedSingleton<TCounterOwner>();
-
- auto exportCounter = [&] (const std::string& category, const std::string& name, EPerfEventType type) {
- try {
- owner->Counters.emplace_back(new TPerfEventCounter{type});
- TProfiler{category}.AddFuncCounter(name, owner, [counter=owner->Counters.back().get()] {
- return counter->Read();
- });
- } catch (const std::exception&) {
- }
- };
+ return std::make_unique<TPerfEventCounter>(type);
+}
- exportCounter("/cpu/perf", "/cycles", EPerfEventType::CpuCycles);
- exportCounter("/cpu/perf", "/instructions", EPerfEventType::Instructions);
- exportCounter("/cpu/perf", "/branch_instructions", EPerfEventType::BranchInstructions);
- exportCounter("/cpu/perf", "/branch_misses", EPerfEventType::BranchMisses);
- exportCounter("/cpu/perf", "/context_switches", EPerfEventType::ContextSwitches);
+////////////////////////////////////////////////////////////////////////////////
- exportCounter("/memory/perf", "/page_faults", EPerfEventType::PageFaults);
- exportCounter("/memory/perf", "/minor_page_faults", EPerfEventType::MinorPageFaults);
- exportCounter("/memory/perf", "/major_page_faults", EPerfEventType::MajorPageFaults);
-}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/profiling/perf/event_counter_profiler.cpp b/yt/yt/library/profiling/perf/event_counter_profiler.cpp
new file mode 100644
index 0000000000..4db34e3fe3
--- /dev/null
+++ b/yt/yt/library/profiling/perf/event_counter_profiler.cpp
@@ -0,0 +1,69 @@
+#include "event_counter_profiler.h"
+
+#include "event_counter.h"
+
+#include <yt/yt/library/profiling/producer.h>
+
+#include <library/cpp/yt/memory/leaky_ref_counted_singleton.h>
+
+#include <library/cpp/yt/containers/enum_indexed_array.h>
+
+#include <library/cpp/yt/threading/atomic_object.h>
+
+namespace NYT::NProfiling {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TPerfEventCounterProfilerImpl
+ : public ISensorProducer
+{
+public:
+ TPerfEventCounterProfilerImpl()
+ {
+ TProfiler("").AddProducer("/perf", this);
+ }
+
+private:
+ TEnumIndexedArray<EPerfEventType, NThreading::TAtomicObject<std::unique_ptr<IPerfEventCounter>>> TypeToCounter_;
+
+ IPerfEventCounter* GetCounter(EPerfEventType type)
+ {
+ return TypeToCounter_[type].Transform([&] (auto& counter) {
+ if (!counter) {
+ counter = CreatePerfEventCounter(type);
+ }
+ return counter.get();
+ });
+ }
+
+ void CollectSensors(ISensorWriter* writer) final
+ {
+ auto writeCounter = [&] (const std::string& name, EPerfEventType type) {
+ try {
+ auto value = GetCounter(type)->Read();
+ writer->AddCounter(name, value);
+ } catch (...) {
+ }
+ };
+
+ writeCounter("/cpu/cycles", EPerfEventType::CpuCycles);
+ writeCounter("/cpu/instructions", EPerfEventType::Instructions);
+ writeCounter("/cpu/branch_instructions", EPerfEventType::BranchInstructions);
+ writeCounter("/cpu/branch_misses", EPerfEventType::BranchMisses);
+ writeCounter("/cpu/context_switches", EPerfEventType::ContextSwitches);
+ writeCounter("/memory/page_faults", EPerfEventType::PageFaults);
+ writeCounter("/memory/minor_page_faults", EPerfEventType::MinorPageFaults);
+ writeCounter("/memory/major_page_faults", EPerfEventType::MajorPageFaults);
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+void EnablePerfEventCounterProfiling()
+{
+ Y_UNUSED(LeakyRefCountedSingleton<TPerfEventCounterProfilerImpl>());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NProfiling
diff --git a/yt/yt/library/profiling/perf/event_counter_profiler.h b/yt/yt/library/profiling/perf/event_counter_profiler.h
new file mode 100644
index 0000000000..bc5ec03ba4
--- /dev/null
+++ b/yt/yt/library/profiling/perf/event_counter_profiler.h
@@ -0,0 +1,11 @@
+#pragma once
+
+namespace NYT::NProfiling {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void EnablePerfEventCounterProfiling();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NProfiling
diff --git a/yt/yt/library/profiling/unittests/perf_counter_ut.cpp b/yt/yt/library/profiling/perf/unittests/perf_event_counter_ut.cpp
index 1482f97012..93557040ca 100644
--- a/yt/yt/library/profiling/unittests/perf_counter_ut.cpp
+++ b/yt/yt/library/profiling/perf/unittests/perf_event_counter_ut.cpp
@@ -1,11 +1,12 @@
-#include "util/system/yield.h"
#include <gtest/gtest.h>
-#include <yt/yt/library/profiling/perf/counters.h>
+#include <yt/yt/library/profiling/perf/event_counter.h>
#include <yt/yt/core/misc/error.h>
#include <yt/yt/core/misc/proc.h>
+#include <util/system/yield.h>
+
namespace NYT::NProfiling {
namespace {
@@ -21,40 +22,40 @@ void IgnorePermissionError(const TFn& fn)
if (ex.Error().FindMatching(PermissionErrorCode)) {
return;
}
-
throw;
}
}
-TEST(TPerfCounters, Cycles)
+////////////////////////////////////////////////////////////////////////////////
+
+i64 ReadPerfEventCounter(EPerfEventType type)
+{
+ auto counter = CreatePerfEventCounter(type);
+ return counter->Read();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TPerfEventCounterTest, CpuCycles)
{
IgnorePermissionError([&] {
- TPerfEventCounter counter(EPerfEventType::CpuCycles);
- ASSERT_GE(counter.Read(), 0u);
+ ASSERT_GE(ReadPerfEventCounter(EPerfEventType::CpuCycles), 0);
});
}
-TEST(TPerfCounters, ContextSwitches)
+TEST(TPerfEventCounterTest, ContextSwitches)
{
IgnorePermissionError([&] {
- TPerfEventCounter counter(EPerfEventType::ContextSwitches);
-
for (int i = 0; i < 10; i++) {
SchedYield();
}
-
- ASSERT_GE(counter.Read(), 0u);
+ ASSERT_GE(ReadPerfEventCounter(EPerfEventType::ContextSwitches), 0);
});
}
-TEST(TPerfCounters, CounterError)
+TEST(TPerfEventCounterTest, CounterError)
{
- auto createCounter = [] {
- TPerfEventCounter counter{EPerfEventType::StalledCyclesBackend};
- return 0;
- };
-
- ASSERT_THROW(createCounter(), TErrorException);
+ ASSERT_THROW(ReadPerfEventCounter(EPerfEventType::StalledCyclesBackend), TErrorException);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/profiling/perf/unittests/ya.make b/yt/yt/library/profiling/perf/unittests/ya.make
new file mode 100644
index 0000000000..fbdaa9d1cc
--- /dev/null
+++ b/yt/yt/library/profiling/perf/unittests/ya.make
@@ -0,0 +1,17 @@
+GTEST()
+
+INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+
+SRCS(
+ perf_event_counter_ut.cpp
+)
+
+INCLUDE(${ARCADIA_ROOT}/yt/opensource.inc)
+
+PEERDIR(
+ yt/yt/library/profiling/perf
+)
+
+SIZE(SMALL)
+
+END()
diff --git a/yt/yt/library/profiling/perf/ya.make b/yt/yt/library/profiling/perf/ya.make
index 0bf244fdeb..c4efab21fd 100644
--- a/yt/yt/library/profiling/perf/ya.make
+++ b/yt/yt/library/profiling/perf/ya.make
@@ -2,10 +2,12 @@ LIBRARY()
INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+SRCS(event_counter_profiler.cpp)
+
IF (OS_LINUX)
- SRCS(counters.cpp)
+ SRCS(event_counter_linux.cpp)
ELSE()
- SRCS(counters_other.cpp)
+ SRCS(event_counter_dummy.cpp)
ENDIF()
PEERDIR(
@@ -14,3 +16,5 @@ PEERDIR(
)
END()
+
+RECURSE_FOR_TESTS(unittests)
diff --git a/yt/yt/library/profiling/unittests/ya.make b/yt/yt/library/profiling/unittests/ya.make
index 0fa5d9e6ee..8346254671 100644
--- a/yt/yt/library/profiling/unittests/ya.make
+++ b/yt/yt/library/profiling/unittests/ya.make
@@ -15,7 +15,6 @@ SRCS(
tag_ut.cpp
cube_ut.cpp
exporter_ut.cpp
- perf_counter_ut.cpp
)
INCLUDE(${ARCADIA_ROOT}/yt/opensource.inc)
diff --git a/yt/yt/library/profiling/ya.make b/yt/yt/library/profiling/ya.make
index c59ab7d643..eb902850e2 100644
--- a/yt/yt/library/profiling/ya.make
+++ b/yt/yt/library/profiling/ya.make
@@ -22,6 +22,7 @@ PEERDIR(
END()
RECURSE(
+ perf
sensors_owner
solomon
unittests
diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp
index 191a2f672b..b46bdd9786 100644
--- a/yt/yt/library/program/helpers.cpp
+++ b/yt/yt/library/program/helpers.cpp
@@ -12,7 +12,7 @@
#include <yt/yt/library/tracing/jaeger/tracer.h>
-#include <yt/yt/library/profiling/perf/counters.h>
+#include <yt/yt/library/profiling/perf/event_counter_profiler.h>
#include <yt/yt/library/profiling/resource_tracker/resource_tracker.h>
@@ -76,7 +76,7 @@ void ConfigureSingletons(const TSingletonsConfigPtr& config)
NTracing::SetGlobalTracer(New<NTracing::TJaegerTracer>(config->Jaeger));
- NProfiling::EnablePerfCounters();
+ NProfiling::EnablePerfEventCounterProfiling();
NTCMalloc::TTCMallocManager::Configure(config->TCMalloc);