diff options
author | AlexSm <alex@ydb.tech> | 2023-12-27 23:31:58 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-27 23:31:58 +0100 |
commit | d67bfb4b4b7549081543e87a31bc6cb5c46ac973 (patch) | |
tree | 8674f2f1570877cb653e7ddcff37ba00288de15a /yt | |
parent | 1f6bef05ed441c3aa2d565ac792b26cded704ac7 (diff) | |
download | ydb-d67bfb4b4b7549081543e87a31bc6cb5c46ac973.tar.gz |
Import libs 4 (#758)
Diffstat (limited to 'yt')
64 files changed, 904 insertions, 294 deletions
diff --git a/yt/yt/client/api/admin_client.h b/yt/yt/client/api/admin_client.h index 2a4b08c0e3..500033b8ea 100644 --- a/yt/yt/client/api/admin_client.h +++ b/yt/yt/client/api/admin_client.h @@ -289,6 +289,7 @@ struct IAdminClient virtual TFuture<TDestroyChunkLocationsResult> DestroyChunkLocations( const TString& nodeAddress, + bool recoverUnlinkedDisks, const std::vector<TGuid>& locationUuids, const TDestroyChunkLocationsOptions& options = {}) = 0; diff --git a/yt/yt/client/api/delegating_client.cpp b/yt/yt/client/api/delegating_client.cpp index c878d878ad..ac0bec3f0a 100644 --- a/yt/yt/client/api/delegating_client.cpp +++ b/yt/yt/client/api/delegating_client.cpp @@ -885,10 +885,11 @@ TFuture<TDisableChunkLocationsResult> TDelegatingClient::DisableChunkLocations( TFuture<TDestroyChunkLocationsResult> TDelegatingClient::DestroyChunkLocations( const TString& nodeAddress, + bool recoverUnlinkedDisks, const std::vector<TGuid>& locationUuids, const TDestroyChunkLocationsOptions& options) { - return Underlying_->DestroyChunkLocations(nodeAddress, locationUuids, options); + return Underlying_->DestroyChunkLocations(nodeAddress, recoverUnlinkedDisks, locationUuids, options); } TFuture<TResurrectChunkLocationsResult> TDelegatingClient::ResurrectChunkLocations( diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index 15abcbedec..a8df97651a 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -553,6 +553,7 @@ public: TFuture<TDestroyChunkLocationsResult> DestroyChunkLocations( const TString& nodeAddress, + bool recoverUnlinkedDisks, const std::vector<TGuid>& locationUuids, const TDestroyChunkLocationsOptions& options = {}) override; diff --git a/yt/yt/client/api/journal_client.h b/yt/yt/client/api/journal_client.h index 0feb298e1d..f1d68f24e7 100644 --- a/yt/yt/client/api/journal_client.h +++ b/yt/yt/client/api/journal_client.h @@ -17,6 +17,19 @@ struct TJournalReaderOptions TJournalReaderConfigPtr Config; }; +//////////////////////////////////////////////////////////////////////////////// + +struct IJournalWritesObserver + : public TRefCounted +{ + virtual void RegisterPayloadWrite(int payloadBytes) = 0; + virtual void RegisterJournalWrite(int journalBytes, int mediumBytes) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IJournalWritesObserver) + +//////////////////////////////////////////////////////////////////////////////// + struct TJournalWriterPerformanceCounters { TJournalWriterPerformanceCounters() = default; @@ -40,6 +53,8 @@ struct TJournalWriterPerformanceCounters NProfiling::TCounter MediumWrittenBytes; NProfiling::TCounter IORequestCount; NProfiling::TCounter JournalWrittenBytes; + + IJournalWritesObserverPtr JournalWritesObserver; }; struct TJournalWriterOptions diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h index c671dea611..7e6c250481 100644 --- a/yt/yt/client/api/public.h +++ b/yt/yt/client/api/public.h @@ -110,6 +110,8 @@ DECLARE_REFCOUNTED_TYPE(ITypeErasedRowset) DECLARE_REFCOUNTED_STRUCT(IPersistentQueueRowset) DECLARE_REFCOUNTED_STRUCT(TSkynetSharePartsLocations) +DECLARE_REFCOUNTED_STRUCT(IJournalWritesObserver) + struct TConnectionOptions; using TClientOptions = NAuth::TAuthenticationOptions; diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index 8d0a7530e2..e62188c044 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -1953,12 +1953,14 @@ TFuture<TDisableChunkLocationsResult> TClient::DisableChunkLocations( TFuture<TDestroyChunkLocationsResult> TClient::DestroyChunkLocations( const TString& nodeAddress, + bool recoverUnlinkedDisks, const std::vector<TGuid>& locationUuids, const TDestroyChunkLocationsOptions& /*options*/) { auto proxy = CreateApiServiceProxy(); auto req = proxy.DestroyChunkLocations(); + req->set_recover_unlinked_disks(recoverUnlinkedDisks); ToProto(req->mutable_node_address(), nodeAddress); ToProto(req->mutable_location_uuids(), locationUuids); diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index 77c1b429f8..28cd077ce9 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -414,6 +414,7 @@ public: TFuture<TDestroyChunkLocationsResult> DestroyChunkLocations( const TString& nodeAddress, + bool recoverUnlinkedDisks, const std::vector<TGuid>& locationUuids, const TDestroyChunkLocationsOptions& options) override; diff --git a/yt/yt/client/chaos_client/replication_card.cpp b/yt/yt/client/chaos_client/replication_card.cpp index 6e32041b2e..e94771bba5 100644 --- a/yt/yt/client/chaos_client/replication_card.cpp +++ b/yt/yt/client/chaos_client/replication_card.cpp @@ -15,6 +15,53 @@ using namespace NTableClient; using namespace NTabletClient; using namespace NTransactionClient; +namespace NDetail { + +void FormatProgressWithProjection( + TStringBuilderBase* builder, + const TReplicationProgress& replicationProgress, + TReplicationProgressProjection replicationProgressProjection) +{ + const auto& segments = replicationProgress.Segments; + if (segments.empty()) { + builder->AppendString("[]"); + return; + } + + auto it = std::upper_bound( + segments.begin(), + segments.end(), + replicationProgressProjection.From, + [] (const auto& lhs, const auto& rhs) { + return CompareRows(lhs, rhs.LowerKey) <= 0; + }); + + bool comma = false; + builder->AppendChar('['); + + if (it != segments.begin()) { + builder->AppendFormat("<%v, %x>", segments[0].LowerKey, segments[0].Timestamp); + if (it != std::next(segments.begin())) { + builder->AppendString(", ..."); + } + comma = true; + } + + for (; it != segments.end() && it->LowerKey <= replicationProgressProjection.To; ++it) { + builder->AppendFormat("%v<%v, %x>", comma ? ", " : "", it->LowerKey, it->Timestamp); + comma = true; + } + + if (it != segments.end()) { + builder->AppendString(", ..."); + } + + builder->AppendChar(']'); +} + +} // namespace NDetail + + //////////////////////////////////////////////////////////////////////////////// TReplicationCardFetchOptions::operator size_t() const @@ -40,13 +87,24 @@ TString ToString(const TReplicationCardFetchOptions& options) //////////////////////////////////////////////////////////////////////////////// -void FormatValue(TStringBuilderBase* builder, const TReplicationProgress& replicationProgress, TStringBuf /*spec*/) +void FormatValue( + TStringBuilderBase* builder, + const TReplicationProgress& replicationProgress, + TStringBuf /*spec*/, + std::optional<TReplicationProgressProjection> replicationProgressProjection) { - builder->AppendFormat("{Segments: %v, UpperKey: %v}", - MakeFormattableView(replicationProgress.Segments, [] (auto* builder, const auto& segment) { + builder->AppendString("{Segments: "); + const auto& segments = replicationProgress.Segments; + if (!replicationProgressProjection) { + builder->AppendFormat("%v", MakeFormattableView(segments, [] (auto* builder, const auto& segment) { builder->AppendFormat("<%v, %x>", segment.LowerKey, segment.Timestamp); - }), - replicationProgress.UpperKey); + })); + } else { + NDetail::FormatProgressWithProjection(builder, replicationProgress, *replicationProgressProjection); + } + + builder->AppendFormat(", UpperKey: %v}", replicationProgress.UpperKey); + } TString ToString(const TReplicationProgress& replicationProgress) @@ -68,16 +126,22 @@ TString ToString(const TReplicaHistoryItem& replicaHistoryItem) return ToStringViaBuilder(replicaHistoryItem); } -void FormatValue(TStringBuilderBase* builder, const TReplicaInfo& replicaInfo, TStringBuf /*spec*/) +void FormatValue( + TStringBuilderBase* builder, + const TReplicaInfo& replicaInfo, + TStringBuf /*spec*/, + std::optional<TReplicationProgressProjection> replicationProgressProjection) { - builder->AppendFormat("{ClusterName: %v, ReplicaPath: %v, ContentType: %v, Mode: %v, State: %v, Progress: %v, History: %v}", + builder->AppendFormat("{ClusterName: %v, ReplicaPath: %v, ContentType: %v, Mode: %v, State: %v, Progress: ", replicaInfo.ClusterName, replicaInfo.ReplicaPath, replicaInfo.ContentType, replicaInfo.Mode, - replicaInfo.State, - replicaInfo.ReplicationProgress, - replicaInfo.History); + replicaInfo.State); + + FormatValue(builder, replicaInfo.ReplicationProgress, TStringBuf(), replicationProgressProjection); + + builder->AppendFormat(", History: %v}", replicaInfo.History); } TString ToString(const TReplicaInfo& replicaInfo) @@ -85,11 +149,21 @@ TString ToString(const TReplicaInfo& replicaInfo) return ToStringViaBuilder(replicaInfo); } -void FormatValue(TStringBuilderBase* builder, const TReplicationCard& replicationCard, TStringBuf /*spec*/) +void FormatValue( + TStringBuilderBase* builder, + const TReplicationCard& replicationCard, + TStringBuf /*spec*/, + std::optional<TReplicationProgressProjection> replicationProgressProjection) { builder->AppendFormat("{Era: %v, Replicas: %v, CoordinatorCellIds: %v, TableId: %v, TablePath: %v, TableClusterName: %v, CurrentTimestamp: %v, CollocationId: %v}", replicationCard.Era, - replicationCard.Replicas, + MakeFormattableView( + replicationCard.Replicas, + [&] (TStringBuilderBase* builder, std::pair<const NYT::TGuid, NYT::NChaosClient::TReplicaInfo> replica) { + FormatValue(builder, replica.first, TStringBuf()); + builder->AppendString(": "); + FormatValue(builder, replica.second, TStringBuf(), replicationProgressProjection); + }), replicationCard.CoordinatorCellIds, replicationCard.TableId, replicationCard.TablePath, @@ -98,9 +172,13 @@ void FormatValue(TStringBuilderBase* builder, const TReplicationCard& replicatio replicationCard.ReplicationCardCollocationId); } -TString ToString(const TReplicationCard& replicationCard) +TString ToString( + const TReplicationCard& replicationCard, + std::optional<TReplicationProgressProjection> replicationProgressProjection) { - return ToStringViaBuilder(replicationCard); + TStringBuilder builder; + FormatValue(&builder, replicationCard, {}, replicationProgressProjection); + return builder.Flush(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/chaos_client/replication_card.h b/yt/yt/client/chaos_client/replication_card.h index 364f4f6dd6..992815a7d2 100644 --- a/yt/yt/client/chaos_client/replication_card.h +++ b/yt/yt/client/chaos_client/replication_card.h @@ -53,6 +53,12 @@ struct TReplicaInfo int FindHistoryItemIndex(NTransactionClient::TTimestamp timestamp) const; }; +struct TReplicationProgressProjection +{ + NTableClient::TUnversionedRow From; + NTableClient::TUnversionedRow To; +}; + struct TReplicationCard : public TRefCounted { @@ -91,17 +97,31 @@ TString ToString(const TReplicationCardFetchOptions& options); /////////////////////////////////////////////////////////////////////////////// -void FormatValue(TStringBuilderBase* builder, const TReplicationProgress& replicationProgress, TStringBuf /*spec*/); +void FormatValue( + TStringBuilderBase* builder, + const TReplicationProgress& replicationProgress, + TStringBuf /*spec*/, + std::optional<TReplicationProgressProjection> replicationProgressProjection = std::nullopt); TString ToString(const TReplicationProgress& replicationProgress); void FormatValue(TStringBuilderBase* builder, const TReplicaHistoryItem& replicaHistoryItem, TStringBuf /*spec*/); TString ToString(const TReplicaHistoryItem& replicaHistoryItem); -void FormatValue(TStringBuilderBase* builder, const TReplicaInfo& replicaInfo, TStringBuf /*spec*/); +void FormatValue( + TStringBuilderBase* builder, + const TReplicaInfo& replicaInfo, + TStringBuf /*spec*/, + std::optional<TReplicationProgressProjection> replicationProgressProjection = std::nullopt); TString ToString(const TReplicaInfo& replicaInfo); -void FormatValue(TStringBuilderBase* builder, const TReplicationCard& replicationCard, TStringBuf /*spec*/); -TString ToString(const TReplicationCard& replicationCard); +void FormatValue( + TStringBuilderBase* builder, + const TReplicationCard& replicationCard, + TStringBuf /*spec*/, + std::optional<TReplicationProgressProjection> replicationProgressProjection = std::nullopt); +TString ToString( + const TReplicationCard& replicationCard, + std::optional<TReplicationProgressProjection> replicationProgressProjection = std::nullopt); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/driver/admin_commands.cpp b/yt/yt/client/driver/admin_commands.cpp index d7b097c910..7518352825 100644 --- a/yt/yt/client/driver/admin_commands.cpp +++ b/yt/yt/client/driver/admin_commands.cpp @@ -497,13 +497,19 @@ void TDisableChunkLocationsCommand::DoExecute(ICommandContextPtr context) void TDestroyChunkLocationsCommand::Register(TRegistrar registrar) { registrar.Parameter("node_address", &TThis::NodeAddress_); + registrar.Parameter("recover_unlinked_disks", &TThis::RecoverUnlinkedDisks_) + .Default(false); registrar.Parameter("location_uuids", &TThis::LocationUuids_) .Default(); } void TDestroyChunkLocationsCommand::DoExecute(ICommandContextPtr context) { - auto result = WaitFor(context->GetClient()->DestroyChunkLocations(NodeAddress_, LocationUuids_, Options)) + auto result = WaitFor(context->GetClient()->DestroyChunkLocations( + NodeAddress_, + RecoverUnlinkedDisks_, + LocationUuids_, + Options)) .ValueOrThrow(); context->ProduceOutputValue(BuildYsonStringFluently() diff --git a/yt/yt/client/driver/admin_commands.h b/yt/yt/client/driver/admin_commands.h index 79793930f8..4a52a34bbf 100644 --- a/yt/yt/client/driver/admin_commands.h +++ b/yt/yt/client/driver/admin_commands.h @@ -310,6 +310,7 @@ public: private: TString NodeAddress_; + bool RecoverUnlinkedDisks_; std::vector<TGuid> LocationUuids_; void DoExecute(ICommandContextPtr context) override; diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index 2d82e0d22c..e2aaf35e7c 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -233,8 +233,7 @@ void TAdvanceConsumerCommand::DoExecute(ICommandContextPtr context) { auto transaction = GetTransaction(context); - WaitFor(transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset, /*options*/ {})) - .ThrowOnError(); + transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset); if (ShouldCommitTransaction()) { WaitFor(transaction->Commit()) diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index 3e2bf635ad..992331b41e 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -402,7 +402,7 @@ public: UNIMPLEMENTED_METHOD(TFuture<TMaintenanceId>, AddMaintenance, (EMaintenanceComponent, const TString&, EMaintenanceType, const TString&, const TAddMaintenanceOptions&)); UNIMPLEMENTED_METHOD(TFuture<TMaintenanceCounts>, RemoveMaintenance, (EMaintenanceComponent, const TString&, const TMaintenanceFilter&, const TRemoveMaintenanceOptions&)); UNIMPLEMENTED_METHOD(TFuture<TDisableChunkLocationsResult>, DisableChunkLocations, (const TString&, const std::vector<TGuid>&, const TDisableChunkLocationsOptions&)); - UNIMPLEMENTED_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (const TString&, const std::vector<TGuid>&, const TDestroyChunkLocationsOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (const TString&, bool, const std::vector<TGuid>&, const TDestroyChunkLocationsOptions&)); UNIMPLEMENTED_METHOD(TFuture<TResurrectChunkLocationsResult>, ResurrectChunkLocations, (const TString&, const std::vector<TGuid>&, const TResurrectChunkLocationsOptions&)); UNIMPLEMENTED_METHOD(TFuture<TRequestRestartResult>, RequestRestart, (const TString&, const TRequestRestartOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, SetUserPassword, (const TString&, const TString&, const TString&, const TSetUserPasswordOptions&)); diff --git a/yt/yt/client/hedging/counter.h b/yt/yt/client/hedging/counter.h index 1502df2475..b0633cf73a 100644 --- a/yt/yt/client/hedging/counter.h +++ b/yt/yt/client/hedging/counter.h @@ -1,9 +1,9 @@ #pragma once -#include <yt/yt/core/misc/ref_counted.h> - #include <yt/yt/library/profiling/sensor.h> +#include <library/cpp/yt/memory/ref_counted.h> + #include <util/generic/hash.h> #include <util/generic/string.h> #include <util/generic/vector.h> diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index b76b740345..c0866b962e 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -195,7 +195,7 @@ public: UNSUPPORTED_METHOD(TFuture<TMaintenanceId>, AddMaintenance, (EMaintenanceComponent, const TString&, EMaintenanceType, const TString&, const TAddMaintenanceOptions&)); UNSUPPORTED_METHOD(TFuture<TMaintenanceCounts>, RemoveMaintenance, (EMaintenanceComponent, const TString&, const TMaintenanceFilter&, const TRemoveMaintenanceOptions&)); UNSUPPORTED_METHOD(TFuture<TDisableChunkLocationsResult>, DisableChunkLocations, (const TString&, const std::vector<TGuid>&, const TDisableChunkLocationsOptions&)); - UNSUPPORTED_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (const TString&, const std::vector<TGuid>&, const TDestroyChunkLocationsOptions&)); + UNSUPPORTED_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (const TString&, bool, const std::vector<TGuid>&, const TDestroyChunkLocationsOptions&)); UNSUPPORTED_METHOD(TFuture<TResurrectChunkLocationsResult>, ResurrectChunkLocations, (const TString&, const std::vector<TGuid>&, const TResurrectChunkLocationsOptions&)); UNSUPPORTED_METHOD(TFuture<TRequestRestartResult>, RequestRestart, (const TString&, const TRequestRestartOptions&)); UNSUPPORTED_METHOD(TFuture<TPullRowsResult>, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h index df1761c324..eb5cf3f289 100644 --- a/yt/yt/client/queue_client/public.h +++ b/yt/yt/client/queue_client/public.h @@ -2,7 +2,8 @@ #include <yt/yt/core/misc/common.h> #include <yt/yt/core/misc/error_code.h> -#include <yt/yt/core/misc/ref_counted.h> + +#include <library/cpp/yt/memory/ref_counted.h> namespace NYT::NQueueClient { diff --git a/yt/yt/client/scheduler/public.h b/yt/yt/client/scheduler/public.h index 7de0092bbc..d6f3da28f9 100644 --- a/yt/yt/client/scheduler/public.h +++ b/yt/yt/client/scheduler/public.h @@ -128,6 +128,8 @@ DEFINE_ENUM(EAbortReason, ((Abandoned) ( 49)) // TODO(ignat): is it actually a scheduling type of abortion? ((JobSettlementTimedOut) ( 50)) + ((NonexistentPoolTree) ( 51)) + ((WrongSchedulingSegmentModule) ( 52)) ((SchedulingFirst) (100)) ((SchedulingTimeout) (101)) ((SchedulingResourceOvercommit) (102)) diff --git a/yt/yt/client/table_client/logical_type.h b/yt/yt/client/table_client/logical_type.h index 807b851e0c..686d449dc0 100644 --- a/yt/yt/client/table_client/logical_type.h +++ b/yt/yt/client/table_client/logical_type.h @@ -7,7 +7,7 @@ #include <yt/yt/core/yson/public.h> #include <yt/yt/core/ytree/public.h> -#include <yt/yt/core/misc/ref_counted.h> +#include <library/cpp/yt/memory/ref_counted.h> #include <library/cpp/yt/misc/enum.h> diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index 49c221d8ad..87d4752c03 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -553,6 +553,7 @@ public: MOCK_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, ( const TString& nodeAddress, + bool recoverUnlinkedDisks, const std::vector<TGuid>& locationUuids, const TDestroyChunkLocationsOptions& options), (override)); diff --git a/yt/yt/client/unittests/replication_progress_ut.cpp b/yt/yt/client/unittests/replication_progress_ut.cpp index 0c00a02d69..e82b1e62a3 100644 --- a/yt/yt/client/unittests/replication_progress_ut.cpp +++ b/yt/yt/client/unittests/replication_progress_ut.cpp @@ -406,5 +406,136 @@ INSTANTIATE_TEST_SUITE_P( //////////////////////////////////////////////////////////////////////////////// +class TReplicationProgressSerialization + : public ::testing::Test + , public ::testing::WithParamInterface<std::tuple< + const char*, + const char*>> +{ }; + +TEST_P(TReplicationProgressSerialization, Simple) +{ + const auto& params = GetParam(); + auto progress = ConvertTo<TReplicationProgress>(TYsonStringBuf(std::get<0>(params))); + auto expected = TString(std::get<1>(params)); + + auto result = ToString(progress); + + EXPECT_EQ(result, expected) + << "progresses: " << std::get<0>(params) << std::endl + << "expected: " << expected << std::endl + << "actual: " << result << std::endl; +} + +INSTANTIATE_TEST_SUITE_P( + TReplicationProgressSerialization, + TReplicationProgressSerialization, + ::testing::Values( + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1}];upper_key=[<type=max>#]}", + "{Segments: [<[], 1>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2}];upper_key=[<type=max>#]}", + "{Segments: [<[], 1>, <[0#1], 2>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}", + "{Segments: [<[], 1>, <[0#1], 2>, <[0#2], 3>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3};{lower_key=[3];timestamp=4};{lower_key=[4];timestamp=5};" + "{lower_key=[5];timestamp=6};{lower_key=[6];timestamp=7};{lower_key=[7];timestamp=8};{lower_key=[8];timestamp=9}];upper_key=[<type=max>#]}", + "{Segments: [<[], 1>, <[0#1], 2>, <[0#2], 3>, <[0#3], 4>, <[0#4], 5>, <[0#5], 6>, <[0#6], 7>, <[0#7], 8>, <[0#8], 9>], UpperKey: [0#<Max>]}") +)); + +//////////////////////////////////////////////////////////////////////////////// + +class TReplicationProgressProjectedSerialization + : public ::testing::Test + , public ::testing::WithParamInterface<std::tuple< + const char*, + const char*, + const char*, + const char*>> +{ }; + +TEST_P(TReplicationProgressProjectedSerialization, Simple) +{ + const auto& params = GetParam(); + auto progress = ConvertTo<TReplicationProgress>(TYsonStringBuf(std::get<0>(params))); + auto from = ConvertTo<TUnversionedOwningRow>(TYsonStringBuf(std::get<1>(params))); + auto to = ConvertTo<TUnversionedOwningRow>(TYsonStringBuf(std::get<2>(params))); + auto expected = TString(std::get<3>(params)); + + TStringBuilder builder; + FormatValue(&builder, progress, {}, {{from, to}}); + auto result = builder.Flush(); + + EXPECT_EQ(result, expected) + << "progresses: " << std::get<0>(params) << std::endl + << "from: " << std::get<1>(params) << std::endl + << "to: " << std::get<2>(params) << std::endl + << "expected: " << expected << std::endl + << "actual: " << result << std::endl; +} + +INSTANTIATE_TEST_SUITE_P( + TReplicationProgressProjectedSerialization, + TReplicationProgressProjectedSerialization, + ::testing::Values( + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1}];upper_key=[<type=max>#]}", + "[0]", + "[1]", + "{Segments: [<[], 1>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1}];upper_key=[<type=max>#]}", + "[0]", + "[<type=max>#]", + "{Segments: [<[], 1>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2}];upper_key=[<type=max>#]}", + "[0]", + "[1]", + "{Segments: [<[], 1>, <[0#1], 2>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2}];upper_key=[<type=max>#]}", + "[1]", + "[2]", + "{Segments: [<[], 1>, <[0#1], 2>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2}];upper_key=[<type=max>#]}", + "[1]", + "[<type=max>#]", + "{Segments: [<[], 1>, <[0#1], 2>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}", + "[]", + "[1]", + "{Segments: [<[], 1>, <[0#1], 2>, ...], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}", + "[1]", + "[2]", + "{Segments: [<[], 1>, <[0#1], 2>, <[0#2], 3>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}", + "[2]", + "[3]", + "{Segments: [<[], 1>, ..., <[0#2], 3>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}", + "[2]", + "[<type=max>#]", + "{Segments: [<[], 1>, ..., <[0#2], 3>], UpperKey: [0#<Max>]}"), + std::make_tuple( + "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3};{lower_key=[3];timestamp=4};{lower_key=[4];timestamp=5};" + "{lower_key=[5];timestamp=6};{lower_key=[6];timestamp=7};{lower_key=[7];timestamp=8};{lower_key=[8];timestamp=9}];upper_key=[<type=max>#]}", + "[5]", + "[6]", + "{Segments: [<[], 1>, ..., <[0#5], 6>, <[0#6], 7>, ...], UpperKey: [0#<Max>]}") +)); + + +//////////////////////////////////////////////////////////////////////////////// + } // namespace } // namespace NYT::NChaosClient diff --git a/yt/yt/core/actions/future.h b/yt/yt/core/actions/future.h index 35f7ed0524..fc993062c8 100644 --- a/yt/yt/core/actions/future.h +++ b/yt/yt/core/actions/future.h @@ -472,7 +472,7 @@ public: * If the value is set before the call to #handlered, then * #handler is discarded. */ - bool OnCanceled(TCallback<void (const TError&)> handler) const; + bool OnCanceled(TCallback<void(const TError&)> handler) const; //! Converts promise into future. operator TFuture<T>() const; diff --git a/yt/yt/core/concurrency/execution_stack.cpp b/yt/yt/core/concurrency/execution_stack.cpp index 15e3b7ec57..3fcf1a11d4 100644 --- a/yt/yt/core/concurrency/execution_stack.cpp +++ b/yt/yt/core/concurrency/execution_stack.cpp @@ -1,8 +1,6 @@ #include "execution_stack.h" #include "private.h" -#include <yt/yt/core/misc/ref_tracked.h> - #if defined(_unix_) # include <sys/mman.h> # include <limits.h> @@ -16,6 +14,7 @@ #include <yt/yt/core/misc/object_pool.h> #include <library/cpp/yt/memory/ref.h> +#include <library/cpp/yt/memory/ref_tracked.h> #include <library/cpp/yt/misc/tls.h> diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp index 2ebe055242..e5bcbc9775 100644 --- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp @@ -16,6 +16,8 @@ #include <yt/yt/library/profiling/sensor.h> +#include <library/cpp/yt/containers/intrusive_linked_list.h> + #include <library/cpp/yt/memory/public.h> #include <library/cpp/yt/misc/tls.h> @@ -34,6 +36,7 @@ inline const NLogging::TLogger Logger("FairShareThreadPool"); namespace { +DECLARE_REFCOUNTED_CLASS(TBucketMapping) DECLARE_REFCOUNTED_CLASS(TTwoLevelFairShareQueue) DECLARE_REFCOUNTED_CLASS(TBucket) @@ -243,7 +246,7 @@ bool operator < (const TEnqueuedTime& lhs, const TEnqueuedTime& rhs) //////////////////////////////////////////////////////////////////////////////// // Data for scheduling on the first level. -struct TExecutionPool +struct TExecutionPool final : public THeapItemBase<TExecutionPool> { const TString PoolName; @@ -256,8 +259,6 @@ struct TExecutionPool const TEventTimer ExecTimeCounter; const TEventTimer TotalTimeCounter; const NProfiling::TTimeCounter CumulativeTimeCounter; - // Execution pool is retained for some after last usage to flush profiling counters. - TCpuInstant LastUsageTime = 0; // Action count is used to decide whether to reset excess time or not. size_t ActionCountInQueue = 0; @@ -265,11 +266,14 @@ struct TExecutionPool TCpuDuration NextUpdateWeightInstant = 0; double InverseWeight = 1.0; TCpuDuration ExcessTime = 0; - int BucketRefs = 0; TPriorityQueue<TBucket> ActiveBucketsHeap; TCpuDuration LastBucketExcessTime = 0; + TIntrusiveLinkedListNode<TExecutionPool> LinkedListNode; + // Execution pool is retained for some after last usage to flush profiling counters. + TCpuInstant LastUsageTime = 0; + TExecutionPool(TString poolName, const TProfiler& profiler) : PoolName(std::move(poolName)) , BucketCounter(profiler.Summary("/buckets")) @@ -287,6 +291,8 @@ bool operator < (const TExecutionPool& lhs, const TExecutionPool& rhs) return lhs.ExcessTime < rhs.ExcessTime; } +using TExecutionPoolPtr = ::NYT::TIntrusivePtr<TExecutionPool>; + //////////////////////////////////////////////////////////////////////////////// // Data for scheduling on the second level. @@ -296,7 +302,7 @@ struct TBucketBase const TString PoolName; TRingQueue<TAction> ActionQueue; - TExecutionPool* Pool = nullptr; + TExecutionPoolPtr Pool = nullptr; TCpuDuration ExcessTime = 0; @@ -321,7 +327,7 @@ class TBucket , public TBucketBase { public: - TBucket(TString bucketName, TString poolName, TTwoLevelFairShareQueuePtr parent) + TBucket(TString bucketName, TString poolName, TBucketMappingPtr parent) : TBucketBase(std::move(bucketName), std::move(poolName)) , Parent_(std::move(parent)) { } @@ -363,50 +369,51 @@ public: { } private: - const TTwoLevelFairShareQueuePtr Parent_; + const TBucketMappingPtr Parent_; }; DEFINE_REFCOUNTED_TYPE(TBucket) //////////////////////////////////////////////////////////////////////////////// -DEFINE_ENUM(ERequest, - (None) - (EndExecute) - (FetchNext) -); - -class TTwoLevelFairShareQueue +class TBucketMapping : public TRefCounted - , protected TNotifyManager { public: - using TWaitTimeObserver = ITwoLevelFairShareThreadPool::TWaitTimeObserver; + struct TExecutionPoolToListNode + { + auto operator() (TExecutionPool* pool) const + { + return &pool->LinkedListNode; + } + }; - TTwoLevelFairShareQueue( - TIntrusivePtr<NThreading::TEventCount> callbackEventCount, - const TString& threadNamePrefix, - const TNewTwoLevelFairShareThreadPoolOptions& options) - : TNotifyManager(std::move(callbackEventCount), GetThreadTags(threadNamePrefix), options.PollingPeriod) - , ThreadNamePrefix_(threadNamePrefix) - , Profiler_(TProfiler{"/fair_share_queue"} - .WithHot()) - , CumulativeSchedulingTimeCounter_(Profiler_.TimeCounter("/time/scheduling_cumulative")) - , PoolWeightProvider_(options.PoolWeightProvider) - , PoolRetentionTime_(options.PoolRetentionTime) - , VerboseLogging_(options.VerboseLogging) + using TPoolQueue = TIntrusiveLinkedList<TExecutionPool, TExecutionPoolToListNode>; + + explicit TBucketMapping(TDuration poolRetentionTime) + : PoolRetentionTime_(poolRetentionTime) { } - ~TTwoLevelFairShareQueue() + ~TBucketMapping() { - Shutdown(); - } + auto guard = Guard(MappingLock_); - void Configure(int threadCount) - { - ThreadCount_.store(threadCount); + while (RetainPoolQueue_.GetSize() > 0) { + auto* frontPool = RetainPoolQueue_.GetFront(); + + auto poolIt = PoolMapping_.find(frontPool->PoolName); + YT_ASSERT(poolIt != PoolMapping_.end() && poolIt->second == frontPool); + PoolMapping_.erase(poolIt); + + RetainPoolQueue_.PopFront(); + NYT::DestroyRefCounted(frontPool); + } } + virtual TProfiler GetPoolProfiler(const TString& poolName) = 0; + + virtual void Invoke(TClosure callback, TBucket* bucket) = 0; + // GetInvoker is protected by mapping lock (can be sharded). IInvokerPtr GetInvoker(const TString& poolName, const TString& bucketName) { @@ -419,6 +426,7 @@ public: if (!bucket) { bucket = New<TBucket>(bucketName, poolName, MakeStrong(this)); bucketIt->second = bucket.Get(); + bucket->Pool = GetOrRegisterPool(bucket->PoolName); } return bucket; @@ -427,25 +435,191 @@ public: // GetInvoker is protected by mapping lock (can be sharded). void RemoveBucket(TBucket* bucket) { - { - auto guard = Guard(MappingLock_); - auto bucketIt = BucketMapping_.find(std::make_pair(bucket->PoolName, bucket->BucketName)); + auto guard = Guard(MappingLock_); + auto bucketIt = BucketMapping_.find(std::make_pair(bucket->PoolName, bucket->BucketName)); + + if (bucketIt != BucketMapping_.end() && bucketIt->second == bucket) { + BucketMapping_.erase(bucketIt); + } + + // Detach under lock. + auto* poolDangerousPtr = bucket->Pool.Release(); - if (bucketIt != BucketMapping_.end() && bucketIt->second == bucket) { - BucketMapping_.erase(bucketIt); + // Do not want use NewWithDeleter and keep pointer to TTwoLevelFairShareQueue in each execution pool. + if (NYT::GetRefCounter(poolDangerousPtr)->Unref(1)) { + auto poolsToRemove = DetachPool(poolDangerousPtr); + guard.Release(); + + while (poolsToRemove.GetSize() > 0) { + auto* frontPool = poolsToRemove.GetFront(); + poolsToRemove.PopFront(); + NYT::DestroyRefCounted(frontPool); } } + } + + TExecutionPoolPtr GetOrRegisterPool(TString poolName) + { + VERIFY_SPINLOCK_AFFINITY(MappingLock_); + + auto [mappingIt, inserted] = PoolMapping_.emplace(poolName, nullptr); + if (!inserted) { + YT_ASSERT(mappingIt->second->PoolName == poolName); + + auto* pool = mappingIt->second; + // If RetainPoolQueue_ contains only one element its LinkedListNode will be null. + // Determine that pool is in RetainPoolQueue_ by checking its ref count. + if (NYT::GetRefCounter(pool)->GetRefCount() == 0) { + RetainPoolQueue_.Remove(pool); + pool->LinkedListNode = {}; + + YT_LOG_TRACE("Restoring pool (PoolName: %v)", pool->PoolName); + } + + YT_LOG_TRACE("Reusing pool (PoolName: %v)", pool->PoolName); - // Using non atomic Pool pointer is safe because it is set once and RemoveBucket cannot be - // concurrently executed with ConsumeInvokeQueue. - // Pool is nullptr when bucket was created but no actions were invoked. - if (auto* pool = bucket->Pool) { - UnlinkBucketQueue_.Enqueue(pool); + return pool; + } else { + YT_LOG_TRACE("Creating pool (PoolName: %v)", poolName); + auto pool = New<TExecutionPool>(poolName, GetPoolProfiler(poolName)); + mappingIt->second = pool.Get(); + + return pool; } } + TPoolQueue DetachPool(TExecutionPool* pool) + { + VERIFY_SPINLOCK_AFFINITY(MappingLock_); + + YT_LOG_TRACE("Removing pool (PoolName: %v)", pool->PoolName); + + auto currentInstant = GetCpuInstant(); + pool->LastUsageTime = currentInstant; + + // Items in RetainPoolQueue_ are ordered by LastUsageTime. + // When pool is used again it is removed from RetainPoolQueue_. + RetainPoolQueue_.PushBack(pool); + return ProceedRetainQueue(currentInstant); + } + + TPoolQueue ProceedRetainQueue(TCpuInstant currentInstant) + { + VERIFY_SPINLOCK_AFFINITY(MappingLock_); + + YT_LOG_TRACE("ProceedRetainQueue (Size: %v)", RetainPoolQueue_.GetSize()); + + TPoolQueue poolsToRemove; + + while (RetainPoolQueue_.GetSize() > 0) { + auto* frontPool = RetainPoolQueue_.GetFront(); + + auto lastUsageTime = frontPool->LastUsageTime; + if (CpuDurationToDuration(currentInstant - lastUsageTime) < PoolRetentionTime_) { + break; + } + + YT_LOG_TRACE("Destroing pool (PoolName: %v)", frontPool->PoolName); + + auto poolIt = PoolMapping_.find(frontPool->PoolName); + YT_ASSERT(poolIt != PoolMapping_.end() && poolIt->second == frontPool); + PoolMapping_.erase(poolIt); + + RetainPoolQueue_.PopFront(); + poolsToRemove.PushBack(frontPool); + } + + return poolsToRemove; + } + + void MaybeProceedRetainQueue(TCpuInstant currentInstant) + { + if (!MappingLock_.TryAcquire()) { + return; + } + + auto finally = Finally([&] { + MappingLock_.Release(); + }); + + auto poolsToRemove = ProceedRetainQueue(currentInstant); + MappingLock_.Release(); + finally.Release(); + + while (poolsToRemove.GetSize() > 0) { + auto* frontPool = poolsToRemove.GetFront(); + poolsToRemove.PopFront(); + NYT::DestroyRefCounted(frontPool); + } + } + +private: + const TDuration PoolRetentionTime_; + + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, MappingLock_); + THashMap<std::pair<TString, TString>, TBucket*> BucketMapping_; + THashMap<TString, TExecutionPool*> PoolMapping_; + + TPoolQueue RetainPoolQueue_; +}; + +DEFINE_REFCOUNTED_TYPE(TBucketMapping) + +//////////////////////////////////////////////////////////////////////////////// + +void TBucket::Invoke(TClosure callback) +{ + Parent_->Invoke(std::move(callback), this); +} + +TBucket::~TBucket() +{ + Parent_->RemoveBucket(this); +} + +//////////////////////////////////////////////////////////////////////////////// + +DEFINE_ENUM(ERequest, + (None) + (EndExecute) + (FetchNext) +); + +class TTwoLevelFairShareQueue + : protected TNotifyManager + , public TBucketMapping +{ +public: + using TWaitTimeObserver = ITwoLevelFairShareThreadPool::TWaitTimeObserver; + + TTwoLevelFairShareQueue( + TIntrusivePtr<NThreading::TEventCount> callbackEventCount, + const TString& threadNamePrefix, + const TNewTwoLevelFairShareThreadPoolOptions& options) + : TNotifyManager(std::move(callbackEventCount), GetThreadTags(threadNamePrefix), options.PollingPeriod) + , TBucketMapping(options.PoolRetentionTime) + , ThreadNamePrefix_(threadNamePrefix) + , Profiler_(TProfiler{"/fair_share_queue"} + .WithHot()) + , CumulativeSchedulingTimeCounter_(Profiler_ + .WithTags(GetThreadTags(ThreadNamePrefix_)) + .TimeCounter("/time/scheduling_cumulative")) + , PoolWeightProvider_(options.PoolWeightProvider) + , VerboseLogging_(options.VerboseLogging) + { } + + ~TTwoLevelFairShareQueue() + { + Shutdown(); + } + + void Configure(int threadCount) + { + ThreadCount_.store(threadCount); + } + // Invoke is lock free. - void Invoke(TClosure callback, TBucket* bucket) + void Invoke(TClosure callback, TBucket* bucket) override { if (Stopped_.load()) { return; @@ -573,14 +747,8 @@ private: const TProfiler Profiler_; const NProfiling::TTimeCounter CumulativeSchedulingTimeCounter_; const IPoolWeightProviderPtr PoolWeightProvider_; - const TDuration PoolRetentionTime_; const bool VerboseLogging_; - // TODO(lukyan): Sharded mapping. - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, MappingLock_); - THashMap<std::pair<TString, TString>, TBucket*> BucketMapping_; - TMpscStack<TExecutionPool*> UnlinkBucketQueue_; - std::atomic<bool> Stopped_ = false; TMpscStack<TAction> InvokeQueue_; char Padding0_[CacheLineSize - sizeof(TMpscStack<TAction>)]; @@ -591,7 +759,6 @@ private: std::array<TThreadState, TThreadPoolBase::MaxThreadCount> ThreadStates_; - THashMap<TString, std::unique_ptr<TExecutionPool>> PoolMapping_; TPriorityQueue<TExecutionPool> ActivePoolsHeap_; TCpuDuration LastPoolExcessTime_ = 0; TPriorityQueue<TEnqueuedTime> WaitHeap_; @@ -604,21 +771,9 @@ private: std::atomic<bool> IsWaitTimeObserverSet_; TWaitTimeObserver WaitTimeObserver_; - TExecutionPool* GetOrRegisterPool(TString poolName) + TProfiler GetPoolProfiler(const TString& poolName) override { - VERIFY_SPINLOCK_AFFINITY(MainLock_); - - auto [mappingIt, inserted] = PoolMapping_.emplace(poolName, nullptr); - if (!inserted) { - YT_ASSERT(mappingIt->second->PoolName == poolName); - } else { - YT_LOG_TRACE("Creating pool (PoolName: %v)", poolName); - mappingIt->second = std::make_unique<TExecutionPool>( - poolName, - Profiler_.WithTags(GetBucketTags(ThreadNamePrefix_, poolName))); - } - - return mappingIt->second.get(); + return Profiler_.WithTags(GetBucketTags(ThreadNamePrefix_, poolName)); } Y_NO_INLINE void ConsumeInvokeQueue() @@ -631,12 +786,10 @@ private: InvokeQueue_.DequeueAll(true, [&] (auto& action) { auto* bucket = action.BucketHolder.Get(); - if (bucket->Pool == nullptr) { - bucket->Pool = GetOrRegisterPool(bucket->PoolName); - bucket->Pool->BucketRefs++; - } + auto* pool = bucket->Pool.Get(); + + YT_VERIFY(!pool->LinkedListNode.Next && !pool->LinkedListNode.Prev); - auto* pool = bucket->Pool; if (!pool->GetPositionInHeap()) { // ExcessTime can be greater than last pool excess time // if the pool is "currently executed" and reschedules action. @@ -690,25 +843,6 @@ private: }); } - Y_NO_INLINE void ProcessUnlinkedBuckets(TCpuInstant currentInstant) - { - UnlinkBucketQueue_.FilterElements([&] (TExecutionPool* pool) { - YT_ASSERT(pool->BucketRefs > 0); - if (pool->BucketRefs == 1) { - auto lastUsageTime = pool->LastUsageTime; - if (CpuDurationToDuration(currentInstant - lastUsageTime) < PoolRetentionTime_) { - return true; - } - auto poolIt = PoolMapping_.find(pool->PoolName); - YT_ASSERT(poolIt != PoolMapping_.end() && poolIt->second.get() == pool); - PoolMapping_.erase(poolIt); - } else { - --pool->BucketRefs; - } - return false; - }); - } - void ServeBeginExecute(TThreadState* threadState, TCpuInstant currentInstant, TAction action) { VERIFY_SPINLOCK_AFFINITY(MainLock_); @@ -722,7 +856,7 @@ private: threadState->Action = std::move(action); } - void ServeEndExecute(TThreadState* threadState, TCpuInstant cpuInstant) + void ServeEndExecute(TThreadState* threadState, TCpuInstant /*cpuInstant*/) { VERIFY_SPINLOCK_AFFINITY(MainLock_); @@ -741,8 +875,6 @@ private: auto& pool = *bucket->Pool; YT_ASSERT(pool.PoolName == bucket->PoolName); - pool.LastUsageTime = cpuInstant; - // LastActionsInQueue is used to update SizeCounter outside lock. threadState->LastActionsInQueue = --pool.ActionCountInQueue; @@ -755,7 +887,7 @@ private: { VERIFY_SPINLOCK_AFFINITY(MainLock_); - auto* pool = bucket->Pool; + auto* pool = bucket->Pool.Get(); if (PoolWeightProvider_ && pool->NextUpdateWeightInstant < currentInstant) { pool->NextUpdateWeightInstant = currentInstant + DurationToCpuDuration(TDuration::Seconds(1)); @@ -887,8 +1019,6 @@ private: ConsumeInvokeQueue(); - ProcessUnlinkedBuckets(currentInstant); - int fetchedActions = 0; int otherActionCount = 0; @@ -974,7 +1104,7 @@ private: auto finally = Finally([&] { auto bucketToUndef = std::move(threadState.BucketToUnref); if (bucketToUndef) { - auto* pool = bucketToUndef->Pool; + auto* pool = bucketToUndef->Pool.Get(); pool->SizeCounter.Record(threadState.LastActionsInQueue); pool->DequeuedCounter.Increment(1); pool->ExecTimeCounter.Record(threadState.TimeFromStart); @@ -991,6 +1121,10 @@ private: } CumulativeSchedulingTimeCounter_.Add(CpuDurationToDuration(GetCpuInstant() - cpuInstant)); + + if (!fetchNext) { + MaybeProceedRetainQueue(cpuInstant); + } }); auto& request = threadState.Request; @@ -1043,18 +1177,6 @@ DEFINE_REFCOUNTED_TYPE(TTwoLevelFairShareQueue) //////////////////////////////////////////////////////////////////////////////// -void TBucket::Invoke(TClosure callback) -{ - Parent_->Invoke(std::move(callback), this); -} - -TBucket::~TBucket() -{ - Parent_->RemoveBucket(this); -} - -//////////////////////////////////////////////////////////////////////////////// - class TFairShareThread : public TSchedulerThread { diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp index f486003cc1..f93d507673 100644 --- a/yt/yt/core/concurrency/thread_pool_poller.cpp +++ b/yt/yt/core/concurrency/thread_pool_poller.cpp @@ -10,12 +10,13 @@ #include <yt/yt/core/misc/collection_helpers.h> #include <yt/yt/core/misc/proc.h> #include <yt/yt/core/misc/mpsc_stack.h> -#include <yt/yt/core/misc/ref_tracked.h> #include <yt/yt/core/profiling/tscp.h> #include <library/cpp/yt/threading/notification_handle.h> +#include <library/cpp/yt/memory/ref_tracked.h> + #include <util/system/thread.h> #include <util/thread/lfqueue.h> diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp index f598f5190d..bdfe1d1724 100644 --- a/yt/yt/core/concurrency/throughput_throttler.cpp +++ b/yt/yt/core/concurrency/throughput_throttler.cpp @@ -477,7 +477,7 @@ IReconfigurableThroughputThrottlerPtr CreateNamedReconfigurableThroughputThrottl //////////////////////////////////////////////////////////////////////////////// class TUnlimitedThroughputThrottler - : public IThroughputThrottler + : public IReconfigurableThroughputThrottler { public: explicit TUnlimitedThroughputThrottler( @@ -552,17 +552,37 @@ public: YT_UNIMPLEMENTED(); } + void Reconfigure(TThroughputThrottlerConfigPtr /*config*/) override + { + VERIFY_THREAD_AFFINITY_ANY(); + } + + void SetLimit(std::optional<double> /*limit*/) override + { + VERIFY_THREAD_AFFINITY_ANY(); + } + + TFuture<void> GetAvailableFuture() override + { + YT_UNIMPLEMENTED(); + } + + TThroughputThrottlerConfigPtr GetConfig() const override + { + YT_UNIMPLEMENTED(); + } + private: NProfiling::TCounter ValueCounter_; NProfiling::TCounter ReleaseCounter_; }; -IThroughputThrottlerPtr GetUnlimitedThrottler() +IReconfigurableThroughputThrottlerPtr GetUnlimitedThrottler() { return LeakyRefCountedSingleton<TUnlimitedThroughputThrottler>(); } -IThroughputThrottlerPtr CreateNamedUnlimitedThroughputThrottler( +IReconfigurableThroughputThrottlerPtr CreateNamedUnlimitedThroughputThrottler( const TString& name, NProfiling::TProfiler profiler) { diff --git a/yt/yt/core/concurrency/throughput_throttler.h b/yt/yt/core/concurrency/throughput_throttler.h index c4dd476a97..df03931b16 100644 --- a/yt/yt/core/concurrency/throughput_throttler.h +++ b/yt/yt/core/concurrency/throughput_throttler.h @@ -129,10 +129,10 @@ IReconfigurableThroughputThrottlerPtr CreateNamedReconfigurableThroughputThrottl NProfiling::TProfiler profiler = {}); //! Returns a throttler that imposes no throughput limit. -IThroughputThrottlerPtr GetUnlimitedThrottler(); +IReconfigurableThroughputThrottlerPtr GetUnlimitedThrottler(); //! Returns a throttler that imposes no throughput limit and profiles throughput. -IThroughputThrottlerPtr CreateNamedUnlimitedThroughputThrottler( +IReconfigurableThroughputThrottlerPtr CreateNamedUnlimitedThroughputThrottler( const TString& name, NProfiling::TProfiler profiler = {}); diff --git a/yt/yt/core/crypto/public.h b/yt/yt/core/crypto/public.h index 2b524c79ef..8e658bd6cf 100644 --- a/yt/yt/core/crypto/public.h +++ b/yt/yt/core/crypto/public.h @@ -2,7 +2,7 @@ #include <yt/yt/core/misc/public.h> -#include <yt/yt/core/misc/intrusive_ptr.h> +#include <library/cpp/yt/memory/intrusive_ptr.h> namespace NYT::NCrypto { diff --git a/yt/yt/core/crypto/tls.cpp b/yt/yt/core/crypto/tls.cpp index 26c89a0823..793b0571a1 100644 --- a/yt/yt/core/crypto/tls.cpp +++ b/yt/yt/core/crypto/tls.cpp @@ -279,7 +279,7 @@ public: .Run(); } - void SubscribePeerDisconnect(TCallback<void ()> cb) override + void SubscribePeerDisconnect(TCallback<void()> cb) override { return Underlying_->SubscribePeerDisconnect(std::move(cb)); } diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp index b95beb20fb..021f7fc9aa 100644 --- a/yt/yt/core/logging/log_manager.cpp +++ b/yt/yt/core/logging/log_manager.cpp @@ -137,11 +137,6 @@ public: YT_ASSERT(rv >= static_cast<ssize_t>(sizeof(struct inotify_event))); struct inotify_event* event = (struct inotify_event*)buffer; - if (event->mask & IN_ATTRIB) { - YT_LOG_TRACE( - "Watch %v has triggered metadata change (IN_ATTRIB)", - event->wd); - } if (event->mask & IN_DELETE_SELF) { YT_LOG_TRACE( "Watch %v has triggered a deletion (IN_DELETE_SELF)", @@ -201,9 +196,10 @@ public: void Run() { - Callback_(); - // Reinitialize watch to hook to the newly created file. + // Unregister before create a new file. DropWatch(); + Callback_(); + // Register the newly created file. CreateWatch(); } @@ -215,7 +211,7 @@ private: WD_ = inotify_add_watch( FD_, Path_.c_str(), - IN_ATTRIB | IN_DELETE_SELF | IN_MOVE_SELF); + IN_DELETE_SELF | IN_MOVE_SELF); if (WD_ < 0) { YT_LOG_ERROR(TError::FromSystem(errno), "Error registering watch for %v", @@ -892,6 +888,7 @@ private: KeyToCachedWriter_.clear(); WDToNotificationWatch_.clear(); NotificationWatches_.clear(); + InvalidNotificationWatches_.clear(); for (const auto& [name, writerConfig] : config->Writers) { auto typedWriterConfig = ConvertTo<TLogWriterConfigPtr>(writerConfig); @@ -910,12 +907,10 @@ private: if (auto fileWriter = DynamicPointerCast<IFileLogWriter>(writer)) { auto watch = CreateNotificationWatch(config, fileWriter); - if (watch && watch->IsValid()) { - // Watch can fail to initialize if the writer is disabled - // e.g. due to the lack of space. - EmplaceOrCrash(WDToNotificationWatch_, watch->GetWD(), watch.get()); + if (watch) { + RegisterNotificatonWatch(watch.get()); + NotificationWatches_.push_back(std::move(watch)); } - NotificationWatches_.push_back(std::move(watch)); } } for (const auto& [_, category] : NameToCategory_) { @@ -992,6 +987,17 @@ private: } } + void RegisterNotificatonWatch(TNotificationWatch* watch) + { + if (watch->IsValid()) { + // Watch can fail to initialize if the writer is disabled + // e.g. due to the lack of space. + EmplaceOrCrash(WDToNotificationWatch_, watch->GetWD(), watch); + } else { + InvalidNotificationWatches_.push_back(watch); + } + } + void WatchWriters() { VERIFY_THREAD_AFFINITY(LoggingThread); @@ -1016,15 +1022,20 @@ private: if (watch->GetWD() != currentWD) { WDToNotificationWatch_.erase(it); - if (watch->GetWD() >= 0) { - // Watch can fail to initialize if the writer is disabled - // e.g. due to the lack of space. - EmplaceOrCrash(WDToNotificationWatch_, watch->GetWD(), watch); - } + RegisterNotificatonWatch(watch); } previousWD = currentWD; } + // Handle invalid watches, try to register they again. + { + std::vector<TNotificationWatch*> invalidNotificationWatches; + invalidNotificationWatches.swap(InvalidNotificationWatches_); + for (auto* watch : invalidNotificationWatches) { + watch->Run(); + RegisterNotificatonWatch(watch); + } + } } void PushEvent(TLoggerQueueItem&& event) @@ -1453,6 +1464,7 @@ private: std::unique_ptr<TNotificationHandle> NotificationHandle_; std::vector<std::unique_ptr<TNotificationWatch>> NotificationWatches_; THashMap<int, TNotificationWatch*> WDToNotificationWatch_; + std::vector<TNotificationWatch*> InvalidNotificationWatches_; THashMap<TString, TLoggingAnchor*> AnchorMap_; std::atomic<TLoggingAnchor*> FirstAnchor_ = nullptr; diff --git a/yt/yt/core/misc/atomic_ptr.h b/yt/yt/core/misc/atomic_ptr.h index 89104ad7bb..4cd9375bbd 100644 --- a/yt/yt/core/misc/atomic_ptr.h +++ b/yt/yt/core/misc/atomic_ptr.h @@ -1,7 +1,8 @@ #pragma once #include "hazard_ptr.h" -#include "intrusive_ptr.h" + +#include <library/cpp/yt/memory/intrusive_ptr.h> namespace NYT { diff --git a/yt/yt/core/misc/fs.cpp b/yt/yt/core/misc/fs.cpp index 440956f273..a6e0de19b6 100644 --- a/yt/yt/core/misc/fs.cpp +++ b/yt/yt/core/misc/fs.cpp @@ -2,7 +2,6 @@ #include "finally.h" #include <yt/yt/core/logging/log.h> -#include <yt/yt/core/misc/ref_counted.h> #include <yt/yt/core/misc/proc.h> diff --git a/yt/yt/core/misc/intrusive_ptr.h b/yt/yt/core/misc/intrusive_ptr.h deleted file mode 100644 index 0a9b9ab14d..0000000000 --- a/yt/yt/core/misc/intrusive_ptr.h +++ /dev/null @@ -1 +0,0 @@ -#include <library/cpp/yt/memory/intrusive_ptr.h> diff --git a/yt/yt/core/misc/ref_counted.h b/yt/yt/core/misc/ref_counted.h deleted file mode 100644 index 9d5675f4ff..0000000000 --- a/yt/yt/core/misc/ref_counted.h +++ /dev/null @@ -1 +0,0 @@ -#include <library/cpp/yt/memory/ref_counted.h> diff --git a/yt/yt/core/misc/ref_tracked.cpp b/yt/yt/core/misc/ref_tracked.cpp index 3d7551a8f6..b3a7b0a159 100644 --- a/yt/yt/core/misc/ref_tracked.cpp +++ b/yt/yt/core/misc/ref_tracked.cpp @@ -1,6 +1,7 @@ -#include "ref_tracked.h" #include "ref_counted_tracker.h" +#include <library/cpp/yt/memory/ref_tracked.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/ref_tracked.h b/yt/yt/core/misc/ref_tracked.h deleted file mode 100644 index 4d94188f9a..0000000000 --- a/yt/yt/core/misc/ref_tracked.h +++ /dev/null @@ -1 +0,0 @@ -#include <library/cpp/yt/memory/ref_tracked.h> diff --git a/yt/yt/core/misc/signal_registry.cpp b/yt/yt/core/misc/signal_registry.cpp index 27fe3b250d..05b87590d5 100644 --- a/yt/yt/core/misc/signal_registry.cpp +++ b/yt/yt/core/misc/signal_registry.cpp @@ -4,13 +4,15 @@ #include <library/cpp/yt/system/thread_id.h> +#include <util/generic/algorithm.h> + #include <signal.h> namespace NYT { //////////////////////////////////////////////////////////////////////////////// -std::vector<int> CrashSignals = { +constexpr std::initializer_list<int> CrashSignals{ SIGSEGV, SIGILL, SIGFPE, @@ -20,8 +22,6 @@ std::vector<int> CrashSignals = { #endif }; -//////////////////////////////////////////////////////////////////////////////// - // This variable is used for protecting signal handlers for crash signals from // dumping stuff while another thread is already doing that. Our policy is to let // the first thread dump stuff and make other threads wait. @@ -43,19 +43,22 @@ void TSignalRegistry::SetupSignal(int signal, int flags) YT_VERIFY(signal != SIGALRM); if (!OverrideNonDefaultSignalHandlers_) { - struct sigaction oldact; - YT_VERIFY(sigaction(signal, NULL, &oldact) == 0); - if (reinterpret_cast<void*>(oldact.sa_sigaction) != SIG_DFL) { + struct sigaction sa; + YT_VERIFY(sigaction(signal, nullptr, &sa) == 0); + if (reinterpret_cast<void*>(sa.sa_sigaction) != SIG_DFL) { return; } } - struct sigaction sa; - memset(&sa, 0, sizeof(sa)); - sigemptyset(&sa.sa_mask); - sa.sa_flags = flags | SA_SIGINFO; - sa.sa_sigaction = &Handle; - YT_VERIFY(sigaction(signal, &sa, NULL) == 0); + { + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sigemptyset(&sa.sa_mask); + sa.sa_flags = flags | SA_SIGINFO | SA_ONSTACK; + sa.sa_sigaction = &Handle; + YT_VERIFY(sigaction(signal, &sa, nullptr) == 0); + } + Signals_[signal].SetUp = true; }); #else @@ -84,7 +87,7 @@ void TSignalRegistry::PushCallback(int signal, TSignalRegistry::TSignalHandler c #ifdef _unix_ void TSignalRegistry::PushCallback(int signal, std::function<void(int)> callback) { - PushCallback(signal, [callback = std::move(callback)] (int signal, siginfo_t* /* siginfo */, void* /* ucontext */) { + PushCallback(signal, [callback = std::move(callback)] (int signal, siginfo_t* /*siginfo*/, void* /*ucontext*/) { callback(signal); }); } @@ -93,11 +96,11 @@ void TSignalRegistry::PushCallback(int signal, std::function<void(int)> callback void TSignalRegistry::PushCallback(int signal, std::function<void(void)> callback) { #ifdef _unix_ - PushCallback(signal, [callback = std::move(callback)] (int /* signal */, siginfo_t* /* siginfo */, void* /* ucontext */) { + PushCallback(signal, [callback = std::move(callback)] (int /*signal*/, siginfo_t* /*siginfo*/, void* /*ucontext*/) { callback(); }); #else - PushCallback(signal, [callback = std::move(callback)] (int /* signal */) { + PushCallback(signal, [callback = std::move(callback)] (int /*signal*/) { callback(); }); #endif @@ -107,11 +110,13 @@ void TSignalRegistry::PushDefaultSignalHandler(int signal) { PushCallback(signal, [] (int signal) { #ifdef _unix_ - struct sigaction sa; - memset(&sa, 0, sizeof(sa)); - sigemptyset(&sa.sa_mask); - sa.sa_handler = SIG_DFL; - YT_VERIFY(sigaction(signal, &sa, nullptr) == 0); + { + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sigemptyset(&sa.sa_mask); + sa.sa_handler = SIG_DFL; + YT_VERIFY(sigaction(signal, &sa, nullptr) == 0); + } YT_VERIFY(raise(signal) == 0); #else @@ -130,13 +135,12 @@ void TSignalRegistry::Handle(int signal) auto* self = Get(); if (self->EnableCrashSignalProtection_ && - std::find(CrashSignals.begin(), CrashSignals.end(), signal) != CrashSignals.end()) { + Find(CrashSignals, signal) != CrashSignals.end()) + { // For crash signals we try pretty hard to prevent simultaneous execution of // several crash handlers. - auto currentThreadId = GetSequentialThreadId(); auto expectedCrashingThreadId = InvalidSequentialThreadId; - if (!CrashingThreadId.compare_exchange_strong(expectedCrashingThreadId, currentThreadId)) { // We've already entered the signal handler. What should we do? if (currentThreadId == expectedCrashingThreadId) { diff --git a/yt/yt/core/misc/signal_registry.h b/yt/yt/core/misc/signal_registry.h index c0fe057289..23dbdeb950 100644 --- a/yt/yt/core/misc/signal_registry.h +++ b/yt/yt/core/misc/signal_registry.h @@ -16,35 +16,35 @@ constexpr int AllCrashSignals = -1; class TSignalRegistry { public: - //! Flag enabling mechanism which protects multiple crash signal handlers from simultaneous + //! Enables a mechanism which protects multiple crash signal handlers from simultaneous //! execution. DEFINE_BYVAL_RW_PROPERTY(bool, EnableCrashSignalProtection, true); - //! Flag preventing us to override user custom signal handlers. + //! Prevents from overriding user custom signal handlers. DEFINE_BYVAL_RW_PROPERTY(bool, OverrideNonDefaultSignalHandlers, true); +public: #ifdef _unix_ using TSignalHandler = std::function<void(int, siginfo_t*, void*)>; #else using TSignalHandler = std::function<void(int)>; #endif -public: static TSignalRegistry* Get(); - //! Setup our handler that invokes registered callbacks in order. + //! Sets up our handler that invokes registered callbacks in order. //! Flags has same meaning as sa_flags in sigaction(2). Use this method if you need certain flags. //! By default any signal touched by PushCallback(...) will be set up with default flags. void SetupSignal(int signal, int flags = 0); - //! Add simple callback which should be called for signal. Different signatures are supported for convenience. + //! Adds a simple callback which should be called for signal. Different signatures are supported for convenience. void PushCallback(int signal, std::function<void(void)> callback); #ifdef _unix_ void PushCallback(int signal, std::function<void(int)> callback); #endif void PushCallback(int signal, TSignalHandler callback); - //! Add default signal handler which is called after invoking our custom handlers. + //! Adds the default signal handler which is called after invoking our custom handlers. //! NB: this handler restores default signal handler as a side-effect. Use it only //! when default handler terminates the program. void PushDefaultSignalHandler(int signal); diff --git a/yt/yt/core/misc/unittests/ref_counted_tracker_ut.cpp b/yt/yt/core/misc/unittests/ref_counted_tracker_ut.cpp index 538f03989f..faf1352a5b 100644 --- a/yt/yt/core/misc/unittests/ref_counted_tracker_ut.cpp +++ b/yt/yt/core/misc/unittests/ref_counted_tracker_ut.cpp @@ -6,15 +6,15 @@ #include <yt/yt/core/misc/blob.h> #include <yt/yt/core/misc/protobuf_helpers.h> -#include <yt/yt/core/misc/ref_counted.h> #include <yt/yt/core/misc/ref_counted_tracker.h> -#include <yt/yt/core/misc/ref_tracked.h> #include <yt/yt/core/actions/future.h> #include <yt/yt/core/concurrency/action_queue.h> #include <library/cpp/yt/memory/new.h> +#include <library/cpp/yt/memory/ref_tracked.h> +#include <library/cpp/yt/memory/ref_counted.h> namespace NYT { namespace { diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h index e52de86abb..8512e5bc7e 100644 --- a/yt/yt/core/net/connection.h +++ b/yt/yt/core/net/connection.h @@ -4,11 +4,12 @@ #include <yt/yt/core/concurrency/async_stream.h> -#include <yt/yt/core/misc/ref_counted.h> #include <yt/yt/core/misc/proc.h> #include <yt/yt/core/net/address.h> +#include <library/cpp/yt/memory/ref_counted.h> + namespace NYT::NNet { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/packet_connection.h b/yt/yt/core/net/packet_connection.h index 36c21175c1..a4450df1e3 100644 --- a/yt/yt/core/net/packet_connection.h +++ b/yt/yt/core/net/packet_connection.h @@ -2,10 +2,10 @@ #include "public.h" -#include <yt/yt/core/misc/ref_counted.h> - #include <yt/yt/core/net/address.h> +#include <library/cpp/yt/memory/ref_counted.h> + namespace NYT::NNet { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/public.h b/yt/yt/core/net/public.h index bb1b74c615..05d3ec03de 100644 --- a/yt/yt/core/net/public.h +++ b/yt/yt/core/net/public.h @@ -2,7 +2,7 @@ #include <yt/yt/core/misc/public.h> -#include <yt/yt/core/misc/intrusive_ptr.h> +#include <library/cpp/yt/memory/intrusive_ptr.h> namespace NYT::NNet { diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index 14143a6585..ec29c3b88c 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -38,6 +38,20 @@ void TServiceCommonConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TServiceCommonDynamicConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("enable_per_user_profiling", &TThis::EnablePerUserProfiling) + .Default(); + registrar.Parameter("histogram_timer_profiling", &TThis::HistogramTimerProfiling) + .Default(); + registrar.Parameter("code_counting", &TThis::EnableErrorCodeCounting) + .Default(); + registrar.Parameter("tracing_mode", &TThis::TracingMode) + .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + void TServerConfig::Register(TRegistrar registrar) { registrar.Parameter("services", &TThis::Services) @@ -46,6 +60,14 @@ void TServerConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TServerDynamicConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("services", &TThis::Services) + .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + void TServiceConfig::Register(TRegistrar registrar) { registrar.Parameter("enable_per_user_profiling", &TThis::EnablePerUserProfiling) diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index be8c1fdec5..9cdf971d18 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -90,6 +90,40 @@ DEFINE_REFCOUNTED_TYPE(TServerConfig) //////////////////////////////////////////////////////////////////////////////// +// Common options shared between all services in one server. +class TServiceCommonDynamicConfig + : public NYTree::TYsonStruct +{ +public: + std::optional<bool> EnablePerUserProfiling; + std::optional<THistogramConfigPtr> HistogramTimerProfiling; + std::optional<bool> EnableErrorCodeCounting; + std::optional<ERequestTracingMode> TracingMode; + + REGISTER_YSON_STRUCT(TServiceCommonDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TServiceCommonDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + +class TServerDynamicConfig + : public TServiceCommonDynamicConfig +{ +public: + THashMap<TString, NYTree::INodePtr> Services; + + REGISTER_YSON_STRUCT(TServerDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TServerDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + class TServiceConfig : public NYTree::TYsonStruct { diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h index af88870ad9..5548fa0f22 100644 --- a/yt/yt/core/rpc/public.h +++ b/yt/yt/core/rpc/public.h @@ -107,6 +107,8 @@ DECLARE_REFCOUNTED_CLASS(THistogramExponentialBounds) DECLARE_REFCOUNTED_CLASS(THistogramConfig) DECLARE_REFCOUNTED_CLASS(TServerConfig) DECLARE_REFCOUNTED_CLASS(TServiceCommonConfig) +DECLARE_REFCOUNTED_CLASS(TServerDynamicConfig) +DECLARE_REFCOUNTED_CLASS(TServiceCommonDynamicConfig) DECLARE_REFCOUNTED_CLASS(TServiceConfig) DECLARE_REFCOUNTED_CLASS(TMethodConfig) DECLARE_REFCOUNTED_CLASS(TRetryingChannelConfig) diff --git a/yt/yt/core/rpc/server.h b/yt/yt/core/rpc/server.h index fe4c88a478..e48bf2ef5c 100644 --- a/yt/yt/core/rpc/server.h +++ b/yt/yt/core/rpc/server.h @@ -31,7 +31,9 @@ struct IServer virtual IServicePtr GetServiceOrThrow(const TServiceId& serviceId) const = 0; //! Reconfigures the server on-the-fly. - virtual void Configure(TServerConfigPtr config) = 0; + virtual void Configure(const TServerConfigPtr& config) = 0; + + virtual void OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) = 0; //! Starts the server. /*! diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp index 0f80a07b6b..82ba43afe4 100644 --- a/yt/yt/core/rpc/server_detail.cpp +++ b/yt/yt/core/rpc/server_detail.cpp @@ -768,12 +768,12 @@ void TServerBase::RegisterService(IServicePtr service) auto guard = WriterGuard(ServicesLock_); auto& serviceMap = RealmIdToServiceMap_[serviceId.RealmId]; YT_VERIFY(serviceMap.emplace(serviceId.ServiceName, service).second); - if (Config_) { - auto it = Config_->Services.find(serviceId.ServiceName); - if (it != Config_->Services.end()) { - service->Configure(Config_, it->second); + if (AppliedConfig_) { + auto it = AppliedConfig_->Services.find(serviceId.ServiceName); + if (it != AppliedConfig_->Services.end()) { + service->Configure(AppliedConfig_, it->second); } else { - service->Configure(Config_, nullptr); + service->Configure(AppliedConfig_, nullptr); } } DoRegisterService(service); @@ -866,26 +866,54 @@ IServicePtr TServerBase::GetServiceOrThrow(const TServiceId& serviceId) const return serviceIt->second; } -void TServerBase::Configure(TServerConfigPtr config) +void TServerBase::ApplyConfig() { - auto guard = WriterGuard(ServicesLock_); + VERIFY_SPINLOCK_AFFINITY(ServicesLock_); - // Future services will be configured appropriately. - Config_ = config; + auto newAppliedConfig = New<TServerConfig>(); + newAppliedConfig->EnableErrorCodeCounting = DynamicConfig_->EnableErrorCodeCounting.value_or(StaticConfig_->EnableErrorCodeCounting); + newAppliedConfig->EnablePerUserProfiling = DynamicConfig_->EnablePerUserProfiling.value_or(StaticConfig_->EnablePerUserProfiling); + newAppliedConfig->HistogramTimerProfiling = DynamicConfig_->HistogramTimerProfiling.value_or(StaticConfig_->HistogramTimerProfiling); + newAppliedConfig->Services = StaticConfig_->Services; + + for (const auto& [name, node] : DynamicConfig_->Services) { + newAppliedConfig->Services[name] = node; + } + + AppliedConfig_ = newAppliedConfig; // Apply configuration to all existing services. for (const auto& [realmId, serviceMap] : RealmIdToServiceMap_) { for (const auto& [serviceName, service] : serviceMap) { - auto it = config->Services.find(serviceName); - if (it != config->Services.end()) { - service->Configure(config, it->second); + auto it = AppliedConfig_->Services.find(serviceName); + if (it != AppliedConfig_->Services.end()) { + service->Configure(AppliedConfig_, it->second); } else { - service->Configure(config, nullptr); + service->Configure(AppliedConfig_, nullptr); } } } } +void TServerBase::Configure(const TServerConfigPtr& config) +{ + auto guard = WriterGuard(ServicesLock_); + + // Future services will be configured appropriately. + StaticConfig_ = config; + + ApplyConfig(); +} + +void TServerBase::OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) +{ + auto guard = WriterGuard(ServicesLock_); + + DynamicConfig_ = config; + + ApplyConfig(); +} + void TServerBase::Start() { YT_VERIFY(!Started_); diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h index 0a2647b643..aceba1f14e 100644 --- a/yt/yt/core/rpc/server_detail.h +++ b/yt/yt/core/rpc/server_detail.h @@ -3,6 +3,7 @@ #include "authentication_identity.h" #include "server.h" #include "service.h" +#include "config.h" #include <yt/yt/core/logging/log.h> @@ -269,7 +270,8 @@ public: IServicePtr FindService(const TServiceId& serviceId) const override; IServicePtr GetServiceOrThrow(const TServiceId& serviceId) const override; - void Configure(TServerConfigPtr config) override; + void Configure(const TServerConfigPtr& config) override; + void OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) override; void Start() override; TFuture<void> Stop(bool graceful) override; @@ -280,7 +282,9 @@ protected: std::atomic<bool> Started_ = false; YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ServicesLock_); - TServerConfigPtr Config_; + TServerConfigPtr StaticConfig_; + TServerDynamicConfigPtr DynamicConfig_ = New<TServerDynamicConfig>(); + TServerConfigPtr AppliedConfig_; //! Service name to service. using TServiceMap = THashMap<TString, IServicePtr>; @@ -288,6 +292,8 @@ protected: explicit TServerBase(NLogging::TLogger logger); + void ApplyConfig(); + virtual void DoStart(); virtual TFuture<void> DoStop(bool graceful); diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 2fd6f483ad..46567c3fcd 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -2451,6 +2451,7 @@ void TServiceBase::DoConfigure( auto methodConfig = methodIt ? methodIt->second : New<TMethodConfig>(); const auto& descriptor = runtimeInfo->Descriptor; + runtimeInfo->Heavy.store(methodConfig->Heavy.value_or(descriptor.Options.Heavy)); runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit)); runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit)); diff --git a/yt/yt/core/threading/thread.cpp b/yt/yt/core/threading/thread.cpp index cc8f9cfae7..4b04e154bd 100644 --- a/yt/yt/core/threading/thread.cpp +++ b/yt/yt/core/threading/thread.cpp @@ -8,10 +8,14 @@ #include <library/cpp/yt/misc/tls.h> +#include <util/generic/size_literals.h> + #ifdef _linux_ #include <sched.h> #endif +#include <signal.h> + namespace NYT::NThreading { //////////////////////////////////////////////////////////////////////////////// @@ -203,6 +207,7 @@ void TThread::ThreadMainTrampoline() CurrentUniqueThreadId = UniqueThreadId_; SetThreadPriority(); + ConfigureSignalHandlerStack(); StartedEvent_.NotifyAll(); @@ -275,6 +280,30 @@ void TThread::SetThreadPriority() #endif } +void TThread::ConfigureSignalHandlerStack() +{ +#if !defined(_asan_enabled_) && !defined(_msan_enabled_) && \ + (_XOPEN_SOURCE >= 500 || \ + /* Since glibc 2.12: */ _POSIX_C_SOURCE >= 200809L || \ + /* glibc <= 2.19: */ _BSD_SOURCE) + YT_THREAD_LOCAL(bool) Configured; + if (std::exchange(Configured, true)) { + return; + } + + // The size of of the custom stack to be provided for signal handlers. + constexpr size_t SignalHandlerStackSize = 16_KB; + YT_THREAD_LOCAL(std::array<char, SignalHandlerStackSize>) Stack; + + stack_t stack{ + .ss_sp = GetTlsRef(Stack).data(), + .ss_flags = 0, + .ss_size = GetTlsRef(Stack).size(), + }; + YT_VERIFY(sigaltstack(&stack, nullptr) == 0); +#endif +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NThreading diff --git a/yt/yt/core/threading/thread.h b/yt/yt/core/threading/thread.h index bc54afdc07..2599f18db2 100644 --- a/yt/yt/core/threading/thread.h +++ b/yt/yt/core/threading/thread.h @@ -72,6 +72,7 @@ private: ::TThread UnderlyingThread_; void SetThreadPriority(); + void ConfigureSignalHandlerStack(); bool StartSlow(); diff --git a/yt/yt/core/yson/protobuf_interop.cpp b/yt/yt/core/yson/protobuf_interop.cpp index a72a630f04..400d4a8f73 100644 --- a/yt/yt/core/yson/protobuf_interop.cpp +++ b/yt/yt/core/yson/protobuf_interop.cpp @@ -1007,22 +1007,20 @@ protected: } switch (config->Utf8Check) { case EUtf8Check::Disable: - break; + return; case EUtf8Check::LogOnFail: - YT_LOG_WARNING("Field %v accepts only valid UTF-8 sequence, but got (%Qv)", + YT_LOG_WARNING("String field got non UTF-8 value (Path: %v, Value: %v)", YPathStack_.GetHumanReadablePath(), data); - break; + return; case EUtf8Check::ThrowOnFail: - THROW_ERROR_EXCEPTION("Field %v accepts only valid UTF-8 sequence, but got (%Qv)", + THROW_ERROR_EXCEPTION("String field got non UTF-8 value (Path: %v, Value: %v)", YPathStack_.GetHumanReadablePath(), data) << TErrorAttribute("ypath", YPathStack_.GetPath()) << TErrorAttribute("proto_field", fieldFullName); - break; } } - }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp b/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp index f995f3f459..87c15c9600 100644 --- a/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp +++ b/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp @@ -1033,7 +1033,7 @@ TEST(TYsonToProtobufTest, ValidUtf8StringCheck) .EndMap(); }; if (option == EUtf8Check::ThrowOnFail) { - EXPECT_THROW_WITH_SUBSTRING(check(), "valid UTF-8"); + EXPECT_THROW_WITH_SUBSTRING(check(), "String field got non UTF-8 value"); } else { EXPECT_NO_THROW(check()); } @@ -1045,7 +1045,7 @@ TEST(TYsonToProtobufTest, ValidUtf8StringCheck) TYsonWriter ysonWriter(&newYsonOutputStream, EYsonFormat::Pretty); if (option == EUtf8Check::ThrowOnFail) { EXPECT_THROW_WITH_SUBSTRING( - WriteProtobufMessage(&ysonWriter, message), "valid UTF-8"); + WriteProtobufMessage(&ysonWriter, message), "String field got non UTF-8 value"); } else { EXPECT_NO_THROW(WriteProtobufMessage(&ysonWriter, message)); } diff --git a/yt/yt/library/column_converters/column_converter.cpp b/yt/yt/library/column_converters/column_converter.cpp index 7442ea0fd4..04239c0344 100644 --- a/yt/yt/library/column_converters/column_converter.cpp +++ b/yt/yt/library/column_converters/column_converter.cpp @@ -64,7 +64,7 @@ IColumnConverterPtr CreateColumnConvert( TConvertedColumnRange TColumnConverters::ConvertRowsToColumns( TRange<TUnversionedRow> rows, - const std::vector<TColumnSchema>& columnSchema) + const THashMap<int, TColumnSchema>& columnSchema) { TConvertedColumnRange convertedColumnsRange; if (rows.size() == 0) { @@ -79,6 +79,10 @@ TConvertedColumnRange TColumnConverters::ConvertRowsToColumns( for (const auto* item = firstRow.Begin(); item != firstRow.End(); ++item) { IdsToIndexes_[item->Id] = std::ssize(ColumnIds_); ColumnIds_.push_back(item->Id); + auto iterSchema = columnSchema.find(item->Id); + if (iterSchema == columnSchema.end()) { + THROW_ERROR_EXCEPTION("Column with Id %v has no schema", item->Id); + } } } IsFirstBatch_ = false; @@ -90,15 +94,18 @@ TConvertedColumnRange TColumnConverters::ConvertRowsToColumns( TUnversionedRowValues rowValues(ColumnIds_.size(), nullptr); for (const auto* item = row.Begin(); item != row.End(); ++item) { auto iter = IdsToIndexes_.find(item->Id); - YT_VERIFY(iter != IdsToIndexes_.end()); + if(iter == IdsToIndexes_.end()) { + THROW_ERROR_EXCEPTION("Column with Id %v has no schema", item->Id); + } rowValues[iter->second] = item; } rowsValues.push_back(std::move(rowValues)); } for (int offset = 0; offset < std::ssize(ColumnIds_); offset++) { - YT_VERIFY(ColumnIds_[offset] >= 0 && ColumnIds_[offset] < std::ssize(columnSchema)); - auto converter = CreateColumnConvert(columnSchema[ColumnIds_[offset]], ColumnIds_[offset], offset); + auto iterSchema = columnSchema.find(ColumnIds_[offset]); + YT_VERIFY(iterSchema != columnSchema.end()); + auto converter = CreateColumnConvert(iterSchema->second, ColumnIds_[offset], offset); auto columns = converter->Convert(rowsValues); convertedColumnsRange.push_back(columns); } diff --git a/yt/yt/library/column_converters/column_converter.h b/yt/yt/library/column_converters/column_converter.h index 46515d8eb4..12d9e2a04c 100644 --- a/yt/yt/library/column_converters/column_converter.h +++ b/yt/yt/library/column_converters/column_converter.h @@ -50,7 +50,7 @@ class TColumnConverters public: TConvertedColumnRange ConvertRowsToColumns( TRange<NTableClient::TUnversionedRow> rows, - const std::vector<NTableClient::TColumnSchema>& columnSchema); + const THashMap<int, NTableClient::TColumnSchema>& columnSchema); private: THashMap<int, int> IdsToIndexes_; std::vector<int> ColumnIds_; diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp index d83cb97358..452254f738 100644 --- a/yt/yt/library/formats/arrow_writer.cpp +++ b/yt/yt/library/formats/arrow_writer.cpp @@ -906,17 +906,27 @@ public: { YT_VERIFY(tableSchemas.size() > 0); - auto tableSchema = tableSchemas[0]; - - for (const auto& columnSchema : tableSchema->Columns()) { - NameTable_->GetIdOrRegisterName(columnSchema.Name()); - } - - auto columnCount = NameTable_->GetSize(); - SchemaExistenceFlags_.resize(columnCount, true); - - for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) { - ColumnSchemas_.push_back(GetColumnSchema(tableSchema, columnIndex)); + ColumnConverters_.resize(tableSchemas.size()); + TableNumbers_ = tableSchemas.size(); + ColumnSchemas_.resize(tableSchemas.size()); + + for (int tableIndex = 0; tableIndex < std::ssize(tableSchemas); ++tableIndex) { + for (const auto& columnSchema : tableSchemas[tableIndex]->Columns()) { + auto columnId = NameTable_->GetIdOrRegisterName(columnSchema.Name()); + ColumnSchemas_[tableIndex][columnId] = columnSchema; + } + if (CheckColumnInNameTable(GetRangeIndexColumnId())) { + ColumnSchemas_[tableIndex][GetRangeIndexColumnId()] = GetSystemColumnSchema(NameTable_->GetName(GetRangeIndexColumnId()), GetRangeIndexColumnId()); + } + if (CheckColumnInNameTable(GetRowIndexColumnId())) { + ColumnSchemas_[tableIndex][GetRowIndexColumnId()] = GetSystemColumnSchema(NameTable_->GetName(GetRowIndexColumnId()), GetRowIndexColumnId()); + } + if (CheckColumnInNameTable(GetTableIndexColumnId())) { + ColumnSchemas_[tableIndex][GetTableIndexColumnId()] = GetSystemColumnSchema(NameTable_->GetName(GetTableIndexColumnId()), GetTableIndexColumnId()); + } + if (CheckColumnInNameTable(GetTabletIndexColumnId())) { + ColumnSchemas_[tableIndex][GetTabletIndexColumnId()] = GetSystemColumnSchema(NameTable_->GetName(GetTabletIndexColumnId()), GetTabletIndexColumnId()); + } } } @@ -935,20 +945,56 @@ private: output->Write(&zero, sizeof(zero)); } - void DoWrite(TRange<TUnversionedRow> rows) override + bool CheckColumnInNameTable(int columnIndex) const { - Reset(); - - auto convertedColumns = ColumnConverters_.ConvertRowsToColumns(rows, ColumnSchemas_); + return columnIndex >= 0 && columnIndex < NameTable_->GetSize(); + } + void WriteRowsForSingleTable(TRange<TUnversionedRow> rows, i32 tableIndex) + { + Reset(); + auto convertedColumns = ColumnConverters_[tableIndex].ConvertRowsToColumns(rows, ColumnSchemas_[tableIndex]); std::vector<const TBatchColumn*> rootColumns; rootColumns.reserve( std::ssize(convertedColumns)); for (ssize_t columnIndex = 0; columnIndex < std::ssize(convertedColumns); columnIndex++) { rootColumns.push_back(convertedColumns[columnIndex].RootColumn); } RowCount_ = rows.size(); - PrepareColumns(rootColumns); - Encode(); + PrepareColumns(rootColumns, tableIndex); + Encode(tableIndex); + } + + void DoWrite(TRange<TUnversionedRow> rows) override + { + Reset(); + + ssize_t sameTableRangeBeginRowIndex = 0; + i32 tableIndex = 0; + + for (ssize_t rowIndex = 0; rowIndex < std::ssize(rows); rowIndex++) { + i32 currentTableIndex = -1; + if(TableNumbers_ > 1) { + const auto& elems = rows[rowIndex].Elements(); + for (ssize_t columnIndex = std::ssize(elems) - 1; columnIndex >= 0; --columnIndex) { + if (elems[columnIndex].Id == GetTableIndexColumnId()) { + currentTableIndex = elems[columnIndex].Data.Int64; + break; + } + } + } else { + currentTableIndex = 0; + } + YT_VERIFY(currentTableIndex < TableNumbers_ && currentTableIndex >= 0); + if (tableIndex != currentTableIndex && rowIndex != 0) { + auto currentRows = rows.Slice(sameTableRangeBeginRowIndex, rowIndex); + WriteRowsForSingleTable(currentRows, tableIndex); + sameTableRangeBeginRowIndex = rowIndex; + } + tableIndex = currentTableIndex; + } + + auto currentRows = rows.Slice(sameTableRangeBeginRowIndex, rows.size()); + WriteRowsForSingleTable(currentRows, tableIndex); } void DoWriteBatch(NTableClient::IUnversionedRowBatchPtr rowBatch) override @@ -959,22 +1005,24 @@ private: DoWrite(rowBatch->MaterializeRows()); } else { YT_LOG_DEBUG("Encoding columnar batch (RowCount: %v)", rowBatch->GetRowCount()); + YT_VERIFY(TableNumbers_ == 1); Reset(); RowCount_ = rowBatch->GetRowCount(); - PrepareColumns(columnarBatch->MaterializeColumns()); - Encode(); + PrepareColumns(columnarBatch->MaterializeColumns(), 0); + Encode(0); } } - void Encode() + void Encode(i32 tableIndex) { auto output = GetOutputStream(); - if (IsSchemaMessageNeeded()) { + if (tableIndex != PrevTableIndex_ || IsSchemaMessageNeeded()) { + PrevTableIndex_ = tableIndex; if (!IsFirstBatch_) { RegisterEosMarker(); } ResetArrowDictionaries(); - PrepareSchema(); + PrepareSchema(tableIndex); } IsFirstBatch_ = false; PrepareDictionaryBatches(); @@ -985,13 +1033,14 @@ private: } private: + i32 TableNumbers_ = 0; bool IsFirstBatch_ = true; + i64 PrevTableIndex_ = 0; i64 RowCount_ = 0; std::vector<TTypedBatchColumn> TypedColumns_; - std::vector<TColumnSchema> ColumnSchemas_; + std::vector<THashMap<int, TColumnSchema>> ColumnSchemas_; std::vector<IUnversionedColumnarRowBatch::TDictionaryId> ArrowDictionaryIds_; - std::vector<bool> SchemaExistenceFlags_; - NColumnConverters::TColumnConverters ColumnConverters_; + std::vector<NColumnConverters::TColumnConverters> ColumnConverters_; struct TMessage { @@ -1002,7 +1051,7 @@ private: std::vector<TMessage> Messages_; - bool CheckIfSystemColumnEnable(int columnIndex) + bool CheckIfSystemColumnEnable(int columnIndex) const { return ControlAttributesConfig_->EnableTableIndex && IsTableIndexColumnId(columnIndex) || ControlAttributesConfig_->EnableRangeIndex && IsRangeIndexColumnId(columnIndex) || @@ -1010,34 +1059,30 @@ private: ControlAttributesConfig_->EnableTabletIndex && IsTabletIndexColumnId(columnIndex); } - TColumnSchema GetColumnSchema(NTableClient::TTableSchemaPtr& tableSchema, int columnIndex) + bool IsColumnNeedsToAdd(int columnIndex) const { - YT_VERIFY(columnIndex >= 0); - SchemaExistenceFlags_[columnIndex] = true; - auto name = NameTable_->GetName(columnIndex); - auto columnSchema = tableSchema->FindColumn(name); - if (!columnSchema) { - if (IsSystemColumnId(columnIndex)) { - if (CheckIfSystemColumnEnable(columnIndex)) { - return TColumnSchema(TString(name), EValueType::Int64); - } - SchemaExistenceFlags_[columnIndex] = false; - return TColumnSchema(TString(name), EValueType::Null); - } - THROW_ERROR_EXCEPTION("Column %Qv has no schema", name); + return !IsSystemColumnId(columnIndex) + || (CheckIfSystemColumnEnable(columnIndex) && !IsTableIndexColumnId(columnIndex)); + } + + TColumnSchema GetSystemColumnSchema(TStringBuf name, int columnIndex) + { + if (CheckIfSystemColumnEnable(columnIndex) && !IsTableIndexColumnId(columnIndex)) { + return TColumnSchema(TString(name), EValueType::Int64); } - return *columnSchema; + return TColumnSchema(TString(name), EValueType::Null); } - void PrepareColumns(const TRange<const TBatchColumn*>& batchColumns) + void PrepareColumns(const TRange<const TBatchColumn*>& batchColumns, int tableIndex) { TypedColumns_.reserve(batchColumns.Size()); for (const auto* column : batchColumns) { - if (SchemaExistenceFlags_[column->Id]) { - YT_VERIFY(column->Id >= 0 && column->Id < std::ssize(ColumnSchemas_)); + if(IsColumnNeedsToAdd(column->Id)) { + auto iterSchema = ColumnSchemas_[tableIndex].find(column->Id); + YT_VERIFY(iterSchema != ColumnSchemas_[tableIndex].end()); TypedColumns_.push_back(TTypedBatchColumn{ column, - ColumnSchemas_[column->Id].LogicalType() + iterSchema->second.LogicalType() }); } } @@ -1093,7 +1138,7 @@ private: std::move(bodyWriter)}); } - void PrepareSchema() + void PrepareSchema(i32 tableIndex) { flatbuffers::FlatBufferBuilder flatbufBuilder; @@ -1101,8 +1146,9 @@ private: std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fieldOffsets; for (int columnIndex = 0; columnIndex < std::ssize(TypedColumns_); columnIndex++) { const auto& typedColumn = TypedColumns_[columnIndex]; - YT_VERIFY(typedColumn.Column->Id >= 0 && typedColumn.Column->Id < std::ssize(ColumnSchemas_)); - auto columnSchema = ColumnSchemas_[typedColumn.Column->Id]; + auto iterSchema = ColumnSchemas_[tableIndex].find(typedColumn.Column->Id); + YT_VERIFY(iterSchema != ColumnSchemas_[tableIndex].end()); + auto columnSchema = iterSchema->second; auto nameOffset = SerializeString(&flatbufBuilder, columnSchema.Name()); auto [typeType, typeOffset] = SerializeColumnType(&flatbufBuilder, columnSchema); @@ -1130,10 +1176,22 @@ private: auto fieldsOffset = flatbufBuilder.CreateVector(fieldOffsets); + std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> customMetadata; + + if (TableNumbers_ > 1) { + auto keyValueOffsett = org::apache::arrow::flatbuf::CreateKeyValue( + flatbufBuilder, + flatbufBuilder.CreateString("TableId"), + flatbufBuilder.CreateString(std::to_string(tableIndex)) + ); + customMetadata.push_back(keyValueOffsett); + } + auto schemaOffset = org::apache::arrow::flatbuf::CreateSchema( flatbufBuilder, org::apache::arrow::flatbuf::Endianness_Little, - fieldsOffset); + fieldsOffset, + flatbufBuilder.CreateVector(customMetadata)); auto messageOffset = org::apache::arrow::flatbuf::CreateMessage( flatbufBuilder, diff --git a/yt/yt/library/numeric/piecewise_linear_function.h b/yt/yt/library/numeric/piecewise_linear_function.h index 254f4de0d7..f305307760 100644 --- a/yt/yt/library/numeric/piecewise_linear_function.h +++ b/yt/yt/library/numeric/piecewise_linear_function.h @@ -382,8 +382,6 @@ public: public: TLeftToRightTraverser(const TPiecewiseLinearFunction& function, int segmentIndex = 0); - TLeftToRightTraverser(const TLeftToRightTraverser& other) = default; - // See: |TPiecewiseLinearFunction::LeftSegmentAt|. // If |y > x|, |LeftSegmentAt(x)| cannot be called after |LeftSegmentAt(y)|. // If |y >= x|, |LeftSegmentAt(x)| cannot be called after |SegmentAt(y)| or |RightSegmentAt(y)|. diff --git a/yt/yt/library/profiling/solomon/public.h b/yt/yt/library/profiling/solomon/public.h index 6942e8dba7..02bf5ad2c9 100644 --- a/yt/yt/library/profiling/solomon/public.h +++ b/yt/yt/library/profiling/solomon/public.h @@ -1,6 +1,6 @@ #pragma once -#include <yt/yt/core/misc/ref_counted.h> +#include <library/cpp/yt/memory/ref_counted.h> namespace NYT::NProfiling { diff --git a/yt/yt/library/profiling/solomon/sensor_set.h b/yt/yt/library/profiling/solomon/sensor_set.h index fb71767bdd..a873254e06 100644 --- a/yt/yt/library/profiling/solomon/sensor_set.h +++ b/yt/yt/library/profiling/solomon/sensor_set.h @@ -9,11 +9,12 @@ #include <yt/yt/core/profiling/public.h> -#include <yt/yt/core/misc/intrusive_ptr.h> #include <yt/yt/core/misc/error.h> #include <yt/yt/core/ytree/fluent.h> +#include <library/cpp/yt/memory/intrusive_ptr.h> + namespace NYT::NProfiling { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/library/profiling/unittests/solomon_ut.cpp b/yt/yt/library/profiling/unittests/solomon_ut.cpp index 2efaf3e3cc..ecfc27d6ff 100644 --- a/yt/yt/library/profiling/unittests/solomon_ut.cpp +++ b/yt/yt/library/profiling/unittests/solomon_ut.cpp @@ -1,4 +1,3 @@ -#include "yt/yt/core/misc/ref_counted.h" #include <gtest/gtest.h> #include <gmock/gmock.h> @@ -9,6 +8,8 @@ #include <yt/yt/library/profiling/solomon/registry.h> #include <yt/yt/library/profiling/solomon/remote.h> +#include <library/cpp/yt/memory/ref_counted.h> + #include <util/string/join.h> namespace NYT::NProfiling { diff --git a/yt/yt/library/program/ya.make b/yt/yt/library/program/ya.make index 1d044b5a8d..02da9f2e57 100644 --- a/yt/yt/library/program/ya.make +++ b/yt/yt/library/program/ya.make @@ -24,6 +24,7 @@ PEERDIR( library/cpp/yt/mlock library/cpp/yt/stockpile library/cpp/yt/string + library/cpp/getopt/small ) END() diff --git a/yt/yt/library/syncmap/map.h b/yt/yt/library/syncmap/map.h index 8e442accef..ed1e296f86 100644 --- a/yt/yt/library/syncmap/map.h +++ b/yt/yt/library/syncmap/map.h @@ -2,10 +2,11 @@ #include <yt/yt/core/misc/finally.h> #include <yt/yt/core/misc/hazard_ptr.h> -#include <yt/yt/core/misc/ref_counted.h> #include <library/cpp/yt/threading/spin_lock.h> +#include <library/cpp/yt/memory/ref_counted.h> + #include <util/generic/hash.h> #include <util/generic/noncopyable.h> diff --git a/yt/yt/library/tracing/public.h b/yt/yt/library/tracing/public.h index ae473bb33b..1c20b9c7dc 100644 --- a/yt/yt/library/tracing/public.h +++ b/yt/yt/library/tracing/public.h @@ -1,6 +1,6 @@ #pragma once -#include <yt/yt/core/misc/ref_counted.h> +#include <library/cpp/yt/memory/ref_counted.h> namespace NYT::NTracing { diff --git a/yt/yt/library/tracing/tracer.h b/yt/yt/library/tracing/tracer.h index 8b53efd997..bd2e180735 100644 --- a/yt/yt/library/tracing/tracer.h +++ b/yt/yt/library/tracing/tracer.h @@ -2,10 +2,10 @@ #include "public.h" -#include <yt/yt/core/misc/ref_counted.h> - #include <yt/yt/core/tracing/public.h> +#include <library/cpp/yt/memory/ref_counted.h> + namespace NYT::NTracing { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 297ee2ae7d..0e8b3fe47a 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -1930,6 +1930,7 @@ message TReqDestroyChunkLocations { required string node_address = 1; repeated NYT.NProto.TGuid location_uuids = 2; + optional bool recover_unlinked_disks = 3 [default = false]; } message TRspDestroyChunkLocations |