aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorAlexSm <alex@ydb.tech>2023-12-27 23:31:58 +0100
committerGitHub <noreply@github.com>2023-12-27 23:31:58 +0100
commitd67bfb4b4b7549081543e87a31bc6cb5c46ac973 (patch)
tree8674f2f1570877cb653e7ddcff37ba00288de15a /yt
parent1f6bef05ed441c3aa2d565ac792b26cded704ac7 (diff)
downloadydb-d67bfb4b4b7549081543e87a31bc6cb5c46ac973.tar.gz
Import libs 4 (#758)
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/client/api/admin_client.h1
-rw-r--r--yt/yt/client/api/delegating_client.cpp3
-rw-r--r--yt/yt/client/api/delegating_client.h1
-rw-r--r--yt/yt/client/api/journal_client.h15
-rw-r--r--yt/yt/client/api/public.h2
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp2
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h1
-rw-r--r--yt/yt/client/chaos_client/replication_card.cpp106
-rw-r--r--yt/yt/client/chaos_client/replication_card.h28
-rw-r--r--yt/yt/client/driver/admin_commands.cpp8
-rw-r--r--yt/yt/client/driver/admin_commands.h1
-rw-r--r--yt/yt/client/driver/queue_commands.cpp3
-rw-r--r--yt/yt/client/federated/client.cpp2
-rw-r--r--yt/yt/client/hedging/counter.h4
-rw-r--r--yt/yt/client/hedging/hedging.cpp2
-rw-r--r--yt/yt/client/queue_client/public.h3
-rw-r--r--yt/yt/client/scheduler/public.h2
-rw-r--r--yt/yt/client/table_client/logical_type.h2
-rw-r--r--yt/yt/client/unittests/mock/client.h1
-rw-r--r--yt/yt/client/unittests/replication_progress_ut.cpp131
-rw-r--r--yt/yt/core/actions/future.h2
-rw-r--r--yt/yt/core/concurrency/execution_stack.cpp3
-rw-r--r--yt/yt/core/concurrency/new_fair_share_thread_pool.cpp340
-rw-r--r--yt/yt/core/concurrency/thread_pool_poller.cpp3
-rw-r--r--yt/yt/core/concurrency/throughput_throttler.cpp26
-rw-r--r--yt/yt/core/concurrency/throughput_throttler.h4
-rw-r--r--yt/yt/core/crypto/public.h2
-rw-r--r--yt/yt/core/crypto/tls.cpp2
-rw-r--r--yt/yt/core/logging/log_manager.cpp48
-rw-r--r--yt/yt/core/misc/atomic_ptr.h3
-rw-r--r--yt/yt/core/misc/fs.cpp1
-rw-r--r--yt/yt/core/misc/intrusive_ptr.h1
-rw-r--r--yt/yt/core/misc/ref_counted.h1
-rw-r--r--yt/yt/core/misc/ref_tracked.cpp3
-rw-r--r--yt/yt/core/misc/ref_tracked.h1
-rw-r--r--yt/yt/core/misc/signal_registry.cpp50
-rw-r--r--yt/yt/core/misc/signal_registry.h12
-rw-r--r--yt/yt/core/misc/unittests/ref_counted_tracker_ut.cpp4
-rw-r--r--yt/yt/core/net/connection.h3
-rw-r--r--yt/yt/core/net/packet_connection.h4
-rw-r--r--yt/yt/core/net/public.h2
-rw-r--r--yt/yt/core/rpc/config.cpp22
-rw-r--r--yt/yt/core/rpc/config.h34
-rw-r--r--yt/yt/core/rpc/public.h2
-rw-r--r--yt/yt/core/rpc/server.h4
-rw-r--r--yt/yt/core/rpc/server_detail.cpp54
-rw-r--r--yt/yt/core/rpc/server_detail.h10
-rw-r--r--yt/yt/core/rpc/service_detail.cpp1
-rw-r--r--yt/yt/core/threading/thread.cpp29
-rw-r--r--yt/yt/core/threading/thread.h1
-rw-r--r--yt/yt/core/yson/protobuf_interop.cpp10
-rw-r--r--yt/yt/core/yson/unittests/protobuf_yson_ut.cpp4
-rw-r--r--yt/yt/library/column_converters/column_converter.cpp15
-rw-r--r--yt/yt/library/column_converters/column_converter.h2
-rw-r--r--yt/yt/library/formats/arrow_writer.cpp156
-rw-r--r--yt/yt/library/numeric/piecewise_linear_function.h2
-rw-r--r--yt/yt/library/profiling/solomon/public.h2
-rw-r--r--yt/yt/library/profiling/solomon/sensor_set.h3
-rw-r--r--yt/yt/library/profiling/unittests/solomon_ut.cpp3
-rw-r--r--yt/yt/library/program/ya.make1
-rw-r--r--yt/yt/library/syncmap/map.h3
-rw-r--r--yt/yt/library/tracing/public.h2
-rw-r--r--yt/yt/library/tracing/tracer.h4
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto1
64 files changed, 904 insertions, 294 deletions
diff --git a/yt/yt/client/api/admin_client.h b/yt/yt/client/api/admin_client.h
index 2a4b08c0e3..500033b8ea 100644
--- a/yt/yt/client/api/admin_client.h
+++ b/yt/yt/client/api/admin_client.h
@@ -289,6 +289,7 @@ struct IAdminClient
virtual TFuture<TDestroyChunkLocationsResult> DestroyChunkLocations(
const TString& nodeAddress,
+ bool recoverUnlinkedDisks,
const std::vector<TGuid>& locationUuids,
const TDestroyChunkLocationsOptions& options = {}) = 0;
diff --git a/yt/yt/client/api/delegating_client.cpp b/yt/yt/client/api/delegating_client.cpp
index c878d878ad..ac0bec3f0a 100644
--- a/yt/yt/client/api/delegating_client.cpp
+++ b/yt/yt/client/api/delegating_client.cpp
@@ -885,10 +885,11 @@ TFuture<TDisableChunkLocationsResult> TDelegatingClient::DisableChunkLocations(
TFuture<TDestroyChunkLocationsResult> TDelegatingClient::DestroyChunkLocations(
const TString& nodeAddress,
+ bool recoverUnlinkedDisks,
const std::vector<TGuid>& locationUuids,
const TDestroyChunkLocationsOptions& options)
{
- return Underlying_->DestroyChunkLocations(nodeAddress, locationUuids, options);
+ return Underlying_->DestroyChunkLocations(nodeAddress, recoverUnlinkedDisks, locationUuids, options);
}
TFuture<TResurrectChunkLocationsResult> TDelegatingClient::ResurrectChunkLocations(
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index 15abcbedec..a8df97651a 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -553,6 +553,7 @@ public:
TFuture<TDestroyChunkLocationsResult> DestroyChunkLocations(
const TString& nodeAddress,
+ bool recoverUnlinkedDisks,
const std::vector<TGuid>& locationUuids,
const TDestroyChunkLocationsOptions& options = {}) override;
diff --git a/yt/yt/client/api/journal_client.h b/yt/yt/client/api/journal_client.h
index 0feb298e1d..f1d68f24e7 100644
--- a/yt/yt/client/api/journal_client.h
+++ b/yt/yt/client/api/journal_client.h
@@ -17,6 +17,19 @@ struct TJournalReaderOptions
TJournalReaderConfigPtr Config;
};
+////////////////////////////////////////////////////////////////////////////////
+
+struct IJournalWritesObserver
+ : public TRefCounted
+{
+ virtual void RegisterPayloadWrite(int payloadBytes) = 0;
+ virtual void RegisterJournalWrite(int journalBytes, int mediumBytes) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IJournalWritesObserver)
+
+////////////////////////////////////////////////////////////////////////////////
+
struct TJournalWriterPerformanceCounters
{
TJournalWriterPerformanceCounters() = default;
@@ -40,6 +53,8 @@ struct TJournalWriterPerformanceCounters
NProfiling::TCounter MediumWrittenBytes;
NProfiling::TCounter IORequestCount;
NProfiling::TCounter JournalWrittenBytes;
+
+ IJournalWritesObserverPtr JournalWritesObserver;
};
struct TJournalWriterOptions
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h
index c671dea611..7e6c250481 100644
--- a/yt/yt/client/api/public.h
+++ b/yt/yt/client/api/public.h
@@ -110,6 +110,8 @@ DECLARE_REFCOUNTED_TYPE(ITypeErasedRowset)
DECLARE_REFCOUNTED_STRUCT(IPersistentQueueRowset)
DECLARE_REFCOUNTED_STRUCT(TSkynetSharePartsLocations)
+DECLARE_REFCOUNTED_STRUCT(IJournalWritesObserver)
+
struct TConnectionOptions;
using TClientOptions = NAuth::TAuthenticationOptions;
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 8d0a7530e2..e62188c044 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -1953,12 +1953,14 @@ TFuture<TDisableChunkLocationsResult> TClient::DisableChunkLocations(
TFuture<TDestroyChunkLocationsResult> TClient::DestroyChunkLocations(
const TString& nodeAddress,
+ bool recoverUnlinkedDisks,
const std::vector<TGuid>& locationUuids,
const TDestroyChunkLocationsOptions& /*options*/)
{
auto proxy = CreateApiServiceProxy();
auto req = proxy.DestroyChunkLocations();
+ req->set_recover_unlinked_disks(recoverUnlinkedDisks);
ToProto(req->mutable_node_address(), nodeAddress);
ToProto(req->mutable_location_uuids(), locationUuids);
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index 77c1b429f8..28cd077ce9 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -414,6 +414,7 @@ public:
TFuture<TDestroyChunkLocationsResult> DestroyChunkLocations(
const TString& nodeAddress,
+ bool recoverUnlinkedDisks,
const std::vector<TGuid>& locationUuids,
const TDestroyChunkLocationsOptions& options) override;
diff --git a/yt/yt/client/chaos_client/replication_card.cpp b/yt/yt/client/chaos_client/replication_card.cpp
index 6e32041b2e..e94771bba5 100644
--- a/yt/yt/client/chaos_client/replication_card.cpp
+++ b/yt/yt/client/chaos_client/replication_card.cpp
@@ -15,6 +15,53 @@ using namespace NTableClient;
using namespace NTabletClient;
using namespace NTransactionClient;
+namespace NDetail {
+
+void FormatProgressWithProjection(
+ TStringBuilderBase* builder,
+ const TReplicationProgress& replicationProgress,
+ TReplicationProgressProjection replicationProgressProjection)
+{
+ const auto& segments = replicationProgress.Segments;
+ if (segments.empty()) {
+ builder->AppendString("[]");
+ return;
+ }
+
+ auto it = std::upper_bound(
+ segments.begin(),
+ segments.end(),
+ replicationProgressProjection.From,
+ [] (const auto& lhs, const auto& rhs) {
+ return CompareRows(lhs, rhs.LowerKey) <= 0;
+ });
+
+ bool comma = false;
+ builder->AppendChar('[');
+
+ if (it != segments.begin()) {
+ builder->AppendFormat("<%v, %x>", segments[0].LowerKey, segments[0].Timestamp);
+ if (it != std::next(segments.begin())) {
+ builder->AppendString(", ...");
+ }
+ comma = true;
+ }
+
+ for (; it != segments.end() && it->LowerKey <= replicationProgressProjection.To; ++it) {
+ builder->AppendFormat("%v<%v, %x>", comma ? ", " : "", it->LowerKey, it->Timestamp);
+ comma = true;
+ }
+
+ if (it != segments.end()) {
+ builder->AppendString(", ...");
+ }
+
+ builder->AppendChar(']');
+}
+
+} // namespace NDetail
+
+
////////////////////////////////////////////////////////////////////////////////
TReplicationCardFetchOptions::operator size_t() const
@@ -40,13 +87,24 @@ TString ToString(const TReplicationCardFetchOptions& options)
////////////////////////////////////////////////////////////////////////////////
-void FormatValue(TStringBuilderBase* builder, const TReplicationProgress& replicationProgress, TStringBuf /*spec*/)
+void FormatValue(
+ TStringBuilderBase* builder,
+ const TReplicationProgress& replicationProgress,
+ TStringBuf /*spec*/,
+ std::optional<TReplicationProgressProjection> replicationProgressProjection)
{
- builder->AppendFormat("{Segments: %v, UpperKey: %v}",
- MakeFormattableView(replicationProgress.Segments, [] (auto* builder, const auto& segment) {
+ builder->AppendString("{Segments: ");
+ const auto& segments = replicationProgress.Segments;
+ if (!replicationProgressProjection) {
+ builder->AppendFormat("%v", MakeFormattableView(segments, [] (auto* builder, const auto& segment) {
builder->AppendFormat("<%v, %x>", segment.LowerKey, segment.Timestamp);
- }),
- replicationProgress.UpperKey);
+ }));
+ } else {
+ NDetail::FormatProgressWithProjection(builder, replicationProgress, *replicationProgressProjection);
+ }
+
+ builder->AppendFormat(", UpperKey: %v}", replicationProgress.UpperKey);
+
}
TString ToString(const TReplicationProgress& replicationProgress)
@@ -68,16 +126,22 @@ TString ToString(const TReplicaHistoryItem& replicaHistoryItem)
return ToStringViaBuilder(replicaHistoryItem);
}
-void FormatValue(TStringBuilderBase* builder, const TReplicaInfo& replicaInfo, TStringBuf /*spec*/)
+void FormatValue(
+ TStringBuilderBase* builder,
+ const TReplicaInfo& replicaInfo,
+ TStringBuf /*spec*/,
+ std::optional<TReplicationProgressProjection> replicationProgressProjection)
{
- builder->AppendFormat("{ClusterName: %v, ReplicaPath: %v, ContentType: %v, Mode: %v, State: %v, Progress: %v, History: %v}",
+ builder->AppendFormat("{ClusterName: %v, ReplicaPath: %v, ContentType: %v, Mode: %v, State: %v, Progress: ",
replicaInfo.ClusterName,
replicaInfo.ReplicaPath,
replicaInfo.ContentType,
replicaInfo.Mode,
- replicaInfo.State,
- replicaInfo.ReplicationProgress,
- replicaInfo.History);
+ replicaInfo.State);
+
+ FormatValue(builder, replicaInfo.ReplicationProgress, TStringBuf(), replicationProgressProjection);
+
+ builder->AppendFormat(", History: %v}", replicaInfo.History);
}
TString ToString(const TReplicaInfo& replicaInfo)
@@ -85,11 +149,21 @@ TString ToString(const TReplicaInfo& replicaInfo)
return ToStringViaBuilder(replicaInfo);
}
-void FormatValue(TStringBuilderBase* builder, const TReplicationCard& replicationCard, TStringBuf /*spec*/)
+void FormatValue(
+ TStringBuilderBase* builder,
+ const TReplicationCard& replicationCard,
+ TStringBuf /*spec*/,
+ std::optional<TReplicationProgressProjection> replicationProgressProjection)
{
builder->AppendFormat("{Era: %v, Replicas: %v, CoordinatorCellIds: %v, TableId: %v, TablePath: %v, TableClusterName: %v, CurrentTimestamp: %v, CollocationId: %v}",
replicationCard.Era,
- replicationCard.Replicas,
+ MakeFormattableView(
+ replicationCard.Replicas,
+ [&] (TStringBuilderBase* builder, std::pair<const NYT::TGuid, NYT::NChaosClient::TReplicaInfo> replica) {
+ FormatValue(builder, replica.first, TStringBuf());
+ builder->AppendString(": ");
+ FormatValue(builder, replica.second, TStringBuf(), replicationProgressProjection);
+ }),
replicationCard.CoordinatorCellIds,
replicationCard.TableId,
replicationCard.TablePath,
@@ -98,9 +172,13 @@ void FormatValue(TStringBuilderBase* builder, const TReplicationCard& replicatio
replicationCard.ReplicationCardCollocationId);
}
-TString ToString(const TReplicationCard& replicationCard)
+TString ToString(
+ const TReplicationCard& replicationCard,
+ std::optional<TReplicationProgressProjection> replicationProgressProjection)
{
- return ToStringViaBuilder(replicationCard);
+ TStringBuilder builder;
+ FormatValue(&builder, replicationCard, {}, replicationProgressProjection);
+ return builder.Flush();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/chaos_client/replication_card.h b/yt/yt/client/chaos_client/replication_card.h
index 364f4f6dd6..992815a7d2 100644
--- a/yt/yt/client/chaos_client/replication_card.h
+++ b/yt/yt/client/chaos_client/replication_card.h
@@ -53,6 +53,12 @@ struct TReplicaInfo
int FindHistoryItemIndex(NTransactionClient::TTimestamp timestamp) const;
};
+struct TReplicationProgressProjection
+{
+ NTableClient::TUnversionedRow From;
+ NTableClient::TUnversionedRow To;
+};
+
struct TReplicationCard
: public TRefCounted
{
@@ -91,17 +97,31 @@ TString ToString(const TReplicationCardFetchOptions& options);
///////////////////////////////////////////////////////////////////////////////
-void FormatValue(TStringBuilderBase* builder, const TReplicationProgress& replicationProgress, TStringBuf /*spec*/);
+void FormatValue(
+ TStringBuilderBase* builder,
+ const TReplicationProgress& replicationProgress,
+ TStringBuf /*spec*/,
+ std::optional<TReplicationProgressProjection> replicationProgressProjection = std::nullopt);
TString ToString(const TReplicationProgress& replicationProgress);
void FormatValue(TStringBuilderBase* builder, const TReplicaHistoryItem& replicaHistoryItem, TStringBuf /*spec*/);
TString ToString(const TReplicaHistoryItem& replicaHistoryItem);
-void FormatValue(TStringBuilderBase* builder, const TReplicaInfo& replicaInfo, TStringBuf /*spec*/);
+void FormatValue(
+ TStringBuilderBase* builder,
+ const TReplicaInfo& replicaInfo,
+ TStringBuf /*spec*/,
+ std::optional<TReplicationProgressProjection> replicationProgressProjection = std::nullopt);
TString ToString(const TReplicaInfo& replicaInfo);
-void FormatValue(TStringBuilderBase* builder, const TReplicationCard& replicationCard, TStringBuf /*spec*/);
-TString ToString(const TReplicationCard& replicationCard);
+void FormatValue(
+ TStringBuilderBase* builder,
+ const TReplicationCard& replicationCard,
+ TStringBuf /*spec*/,
+ std::optional<TReplicationProgressProjection> replicationProgressProjection = std::nullopt);
+TString ToString(
+ const TReplicationCard& replicationCard,
+ std::optional<TReplicationProgressProjection> replicationProgressProjection = std::nullopt);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/driver/admin_commands.cpp b/yt/yt/client/driver/admin_commands.cpp
index d7b097c910..7518352825 100644
--- a/yt/yt/client/driver/admin_commands.cpp
+++ b/yt/yt/client/driver/admin_commands.cpp
@@ -497,13 +497,19 @@ void TDisableChunkLocationsCommand::DoExecute(ICommandContextPtr context)
void TDestroyChunkLocationsCommand::Register(TRegistrar registrar)
{
registrar.Parameter("node_address", &TThis::NodeAddress_);
+ registrar.Parameter("recover_unlinked_disks", &TThis::RecoverUnlinkedDisks_)
+ .Default(false);
registrar.Parameter("location_uuids", &TThis::LocationUuids_)
.Default();
}
void TDestroyChunkLocationsCommand::DoExecute(ICommandContextPtr context)
{
- auto result = WaitFor(context->GetClient()->DestroyChunkLocations(NodeAddress_, LocationUuids_, Options))
+ auto result = WaitFor(context->GetClient()->DestroyChunkLocations(
+ NodeAddress_,
+ RecoverUnlinkedDisks_,
+ LocationUuids_,
+ Options))
.ValueOrThrow();
context->ProduceOutputValue(BuildYsonStringFluently()
diff --git a/yt/yt/client/driver/admin_commands.h b/yt/yt/client/driver/admin_commands.h
index 79793930f8..4a52a34bbf 100644
--- a/yt/yt/client/driver/admin_commands.h
+++ b/yt/yt/client/driver/admin_commands.h
@@ -310,6 +310,7 @@ public:
private:
TString NodeAddress_;
+ bool RecoverUnlinkedDisks_;
std::vector<TGuid> LocationUuids_;
void DoExecute(ICommandContextPtr context) override;
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index 2d82e0d22c..e2aaf35e7c 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -233,8 +233,7 @@ void TAdvanceConsumerCommand::DoExecute(ICommandContextPtr context)
{
auto transaction = GetTransaction(context);
- WaitFor(transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset, /*options*/ {}))
- .ThrowOnError();
+ transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset);
if (ShouldCommitTransaction()) {
WaitFor(transaction->Commit())
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index 3e2bf635ad..992331b41e 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -402,7 +402,7 @@ public:
UNIMPLEMENTED_METHOD(TFuture<TMaintenanceId>, AddMaintenance, (EMaintenanceComponent, const TString&, EMaintenanceType, const TString&, const TAddMaintenanceOptions&));
UNIMPLEMENTED_METHOD(TFuture<TMaintenanceCounts>, RemoveMaintenance, (EMaintenanceComponent, const TString&, const TMaintenanceFilter&, const TRemoveMaintenanceOptions&));
UNIMPLEMENTED_METHOD(TFuture<TDisableChunkLocationsResult>, DisableChunkLocations, (const TString&, const std::vector<TGuid>&, const TDisableChunkLocationsOptions&));
- UNIMPLEMENTED_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (const TString&, const std::vector<TGuid>&, const TDestroyChunkLocationsOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (const TString&, bool, const std::vector<TGuid>&, const TDestroyChunkLocationsOptions&));
UNIMPLEMENTED_METHOD(TFuture<TResurrectChunkLocationsResult>, ResurrectChunkLocations, (const TString&, const std::vector<TGuid>&, const TResurrectChunkLocationsOptions&));
UNIMPLEMENTED_METHOD(TFuture<TRequestRestartResult>, RequestRestart, (const TString&, const TRequestRestartOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, SetUserPassword, (const TString&, const TString&, const TString&, const TSetUserPasswordOptions&));
diff --git a/yt/yt/client/hedging/counter.h b/yt/yt/client/hedging/counter.h
index 1502df2475..b0633cf73a 100644
--- a/yt/yt/client/hedging/counter.h
+++ b/yt/yt/client/hedging/counter.h
@@ -1,9 +1,9 @@
#pragma once
-#include <yt/yt/core/misc/ref_counted.h>
-
#include <yt/yt/library/profiling/sensor.h>
+#include <library/cpp/yt/memory/ref_counted.h>
+
#include <util/generic/hash.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index b76b740345..c0866b962e 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -195,7 +195,7 @@ public:
UNSUPPORTED_METHOD(TFuture<TMaintenanceId>, AddMaintenance, (EMaintenanceComponent, const TString&, EMaintenanceType, const TString&, const TAddMaintenanceOptions&));
UNSUPPORTED_METHOD(TFuture<TMaintenanceCounts>, RemoveMaintenance, (EMaintenanceComponent, const TString&, const TMaintenanceFilter&, const TRemoveMaintenanceOptions&));
UNSUPPORTED_METHOD(TFuture<TDisableChunkLocationsResult>, DisableChunkLocations, (const TString&, const std::vector<TGuid>&, const TDisableChunkLocationsOptions&));
- UNSUPPORTED_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (const TString&, const std::vector<TGuid>&, const TDestroyChunkLocationsOptions&));
+ UNSUPPORTED_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (const TString&, bool, const std::vector<TGuid>&, const TDestroyChunkLocationsOptions&));
UNSUPPORTED_METHOD(TFuture<TResurrectChunkLocationsResult>, ResurrectChunkLocations, (const TString&, const std::vector<TGuid>&, const TResurrectChunkLocationsOptions&));
UNSUPPORTED_METHOD(TFuture<TRequestRestartResult>, RequestRestart, (const TString&, const TRequestRestartOptions&));
UNSUPPORTED_METHOD(TFuture<TPullRowsResult>, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
diff --git a/yt/yt/client/queue_client/public.h b/yt/yt/client/queue_client/public.h
index df1761c324..eb5cf3f289 100644
--- a/yt/yt/client/queue_client/public.h
+++ b/yt/yt/client/queue_client/public.h
@@ -2,7 +2,8 @@
#include <yt/yt/core/misc/common.h>
#include <yt/yt/core/misc/error_code.h>
-#include <yt/yt/core/misc/ref_counted.h>
+
+#include <library/cpp/yt/memory/ref_counted.h>
namespace NYT::NQueueClient {
diff --git a/yt/yt/client/scheduler/public.h b/yt/yt/client/scheduler/public.h
index 7de0092bbc..d6f3da28f9 100644
--- a/yt/yt/client/scheduler/public.h
+++ b/yt/yt/client/scheduler/public.h
@@ -128,6 +128,8 @@ DEFINE_ENUM(EAbortReason,
((Abandoned) ( 49))
// TODO(ignat): is it actually a scheduling type of abortion?
((JobSettlementTimedOut) ( 50))
+ ((NonexistentPoolTree) ( 51))
+ ((WrongSchedulingSegmentModule) ( 52))
((SchedulingFirst) (100))
((SchedulingTimeout) (101))
((SchedulingResourceOvercommit) (102))
diff --git a/yt/yt/client/table_client/logical_type.h b/yt/yt/client/table_client/logical_type.h
index 807b851e0c..686d449dc0 100644
--- a/yt/yt/client/table_client/logical_type.h
+++ b/yt/yt/client/table_client/logical_type.h
@@ -7,7 +7,7 @@
#include <yt/yt/core/yson/public.h>
#include <yt/yt/core/ytree/public.h>
-#include <yt/yt/core/misc/ref_counted.h>
+#include <library/cpp/yt/memory/ref_counted.h>
#include <library/cpp/yt/misc/enum.h>
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index 49c221d8ad..87d4752c03 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -553,6 +553,7 @@ public:
MOCK_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (
const TString& nodeAddress,
+ bool recoverUnlinkedDisks,
const std::vector<TGuid>& locationUuids,
const TDestroyChunkLocationsOptions& options), (override));
diff --git a/yt/yt/client/unittests/replication_progress_ut.cpp b/yt/yt/client/unittests/replication_progress_ut.cpp
index 0c00a02d69..e82b1e62a3 100644
--- a/yt/yt/client/unittests/replication_progress_ut.cpp
+++ b/yt/yt/client/unittests/replication_progress_ut.cpp
@@ -406,5 +406,136 @@ INSTANTIATE_TEST_SUITE_P(
////////////////////////////////////////////////////////////////////////////////
+class TReplicationProgressSerialization
+ : public ::testing::Test
+ , public ::testing::WithParamInterface<std::tuple<
+ const char*,
+ const char*>>
+{ };
+
+TEST_P(TReplicationProgressSerialization, Simple)
+{
+ const auto& params = GetParam();
+ auto progress = ConvertTo<TReplicationProgress>(TYsonStringBuf(std::get<0>(params)));
+ auto expected = TString(std::get<1>(params));
+
+ auto result = ToString(progress);
+
+ EXPECT_EQ(result, expected)
+ << "progresses: " << std::get<0>(params) << std::endl
+ << "expected: " << expected << std::endl
+ << "actual: " << result << std::endl;
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ TReplicationProgressSerialization,
+ TReplicationProgressSerialization,
+ ::testing::Values(
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1}];upper_key=[<type=max>#]}",
+ "{Segments: [<[], 1>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2}];upper_key=[<type=max>#]}",
+ "{Segments: [<[], 1>, <[0#1], 2>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}",
+ "{Segments: [<[], 1>, <[0#1], 2>, <[0#2], 3>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3};{lower_key=[3];timestamp=4};{lower_key=[4];timestamp=5};"
+ "{lower_key=[5];timestamp=6};{lower_key=[6];timestamp=7};{lower_key=[7];timestamp=8};{lower_key=[8];timestamp=9}];upper_key=[<type=max>#]}",
+ "{Segments: [<[], 1>, <[0#1], 2>, <[0#2], 3>, <[0#3], 4>, <[0#4], 5>, <[0#5], 6>, <[0#6], 7>, <[0#7], 8>, <[0#8], 9>], UpperKey: [0#<Max>]}")
+));
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TReplicationProgressProjectedSerialization
+ : public ::testing::Test
+ , public ::testing::WithParamInterface<std::tuple<
+ const char*,
+ const char*,
+ const char*,
+ const char*>>
+{ };
+
+TEST_P(TReplicationProgressProjectedSerialization, Simple)
+{
+ const auto& params = GetParam();
+ auto progress = ConvertTo<TReplicationProgress>(TYsonStringBuf(std::get<0>(params)));
+ auto from = ConvertTo<TUnversionedOwningRow>(TYsonStringBuf(std::get<1>(params)));
+ auto to = ConvertTo<TUnversionedOwningRow>(TYsonStringBuf(std::get<2>(params)));
+ auto expected = TString(std::get<3>(params));
+
+ TStringBuilder builder;
+ FormatValue(&builder, progress, {}, {{from, to}});
+ auto result = builder.Flush();
+
+ EXPECT_EQ(result, expected)
+ << "progresses: " << std::get<0>(params) << std::endl
+ << "from: " << std::get<1>(params) << std::endl
+ << "to: " << std::get<2>(params) << std::endl
+ << "expected: " << expected << std::endl
+ << "actual: " << result << std::endl;
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ TReplicationProgressProjectedSerialization,
+ TReplicationProgressProjectedSerialization,
+ ::testing::Values(
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1}];upper_key=[<type=max>#]}",
+ "[0]",
+ "[1]",
+ "{Segments: [<[], 1>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1}];upper_key=[<type=max>#]}",
+ "[0]",
+ "[<type=max>#]",
+ "{Segments: [<[], 1>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2}];upper_key=[<type=max>#]}",
+ "[0]",
+ "[1]",
+ "{Segments: [<[], 1>, <[0#1], 2>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2}];upper_key=[<type=max>#]}",
+ "[1]",
+ "[2]",
+ "{Segments: [<[], 1>, <[0#1], 2>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2}];upper_key=[<type=max>#]}",
+ "[1]",
+ "[<type=max>#]",
+ "{Segments: [<[], 1>, <[0#1], 2>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}",
+ "[]",
+ "[1]",
+ "{Segments: [<[], 1>, <[0#1], 2>, ...], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}",
+ "[1]",
+ "[2]",
+ "{Segments: [<[], 1>, <[0#1], 2>, <[0#2], 3>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}",
+ "[2]",
+ "[3]",
+ "{Segments: [<[], 1>, ..., <[0#2], 3>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3}];upper_key=[<type=max>#]}",
+ "[2]",
+ "[<type=max>#]",
+ "{Segments: [<[], 1>, ..., <[0#2], 3>], UpperKey: [0#<Max>]}"),
+ std::make_tuple(
+ "{segments=[{lower_key=[];timestamp=1};{lower_key=[1];timestamp=2};{lower_key=[2];timestamp=3};{lower_key=[3];timestamp=4};{lower_key=[4];timestamp=5};"
+ "{lower_key=[5];timestamp=6};{lower_key=[6];timestamp=7};{lower_key=[7];timestamp=8};{lower_key=[8];timestamp=9}];upper_key=[<type=max>#]}",
+ "[5]",
+ "[6]",
+ "{Segments: [<[], 1>, ..., <[0#5], 6>, <[0#6], 7>, ...], UpperKey: [0#<Max>]}")
+));
+
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace
} // namespace NYT::NChaosClient
diff --git a/yt/yt/core/actions/future.h b/yt/yt/core/actions/future.h
index 35f7ed0524..fc993062c8 100644
--- a/yt/yt/core/actions/future.h
+++ b/yt/yt/core/actions/future.h
@@ -472,7 +472,7 @@ public:
* If the value is set before the call to #handlered, then
* #handler is discarded.
*/
- bool OnCanceled(TCallback<void (const TError&)> handler) const;
+ bool OnCanceled(TCallback<void(const TError&)> handler) const;
//! Converts promise into future.
operator TFuture<T>() const;
diff --git a/yt/yt/core/concurrency/execution_stack.cpp b/yt/yt/core/concurrency/execution_stack.cpp
index 15e3b7ec57..3fcf1a11d4 100644
--- a/yt/yt/core/concurrency/execution_stack.cpp
+++ b/yt/yt/core/concurrency/execution_stack.cpp
@@ -1,8 +1,6 @@
#include "execution_stack.h"
#include "private.h"
-#include <yt/yt/core/misc/ref_tracked.h>
-
#if defined(_unix_)
# include <sys/mman.h>
# include <limits.h>
@@ -16,6 +14,7 @@
#include <yt/yt/core/misc/object_pool.h>
#include <library/cpp/yt/memory/ref.h>
+#include <library/cpp/yt/memory/ref_tracked.h>
#include <library/cpp/yt/misc/tls.h>
diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
index 2ebe055242..e5bcbc9775 100644
--- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
@@ -16,6 +16,8 @@
#include <yt/yt/library/profiling/sensor.h>
+#include <library/cpp/yt/containers/intrusive_linked_list.h>
+
#include <library/cpp/yt/memory/public.h>
#include <library/cpp/yt/misc/tls.h>
@@ -34,6 +36,7 @@ inline const NLogging::TLogger Logger("FairShareThreadPool");
namespace {
+DECLARE_REFCOUNTED_CLASS(TBucketMapping)
DECLARE_REFCOUNTED_CLASS(TTwoLevelFairShareQueue)
DECLARE_REFCOUNTED_CLASS(TBucket)
@@ -243,7 +246,7 @@ bool operator < (const TEnqueuedTime& lhs, const TEnqueuedTime& rhs)
////////////////////////////////////////////////////////////////////////////////
// Data for scheduling on the first level.
-struct TExecutionPool
+struct TExecutionPool final
: public THeapItemBase<TExecutionPool>
{
const TString PoolName;
@@ -256,8 +259,6 @@ struct TExecutionPool
const TEventTimer ExecTimeCounter;
const TEventTimer TotalTimeCounter;
const NProfiling::TTimeCounter CumulativeTimeCounter;
- // Execution pool is retained for some after last usage to flush profiling counters.
- TCpuInstant LastUsageTime = 0;
// Action count is used to decide whether to reset excess time or not.
size_t ActionCountInQueue = 0;
@@ -265,11 +266,14 @@ struct TExecutionPool
TCpuDuration NextUpdateWeightInstant = 0;
double InverseWeight = 1.0;
TCpuDuration ExcessTime = 0;
- int BucketRefs = 0;
TPriorityQueue<TBucket> ActiveBucketsHeap;
TCpuDuration LastBucketExcessTime = 0;
+ TIntrusiveLinkedListNode<TExecutionPool> LinkedListNode;
+ // Execution pool is retained for some after last usage to flush profiling counters.
+ TCpuInstant LastUsageTime = 0;
+
TExecutionPool(TString poolName, const TProfiler& profiler)
: PoolName(std::move(poolName))
, BucketCounter(profiler.Summary("/buckets"))
@@ -287,6 +291,8 @@ bool operator < (const TExecutionPool& lhs, const TExecutionPool& rhs)
return lhs.ExcessTime < rhs.ExcessTime;
}
+using TExecutionPoolPtr = ::NYT::TIntrusivePtr<TExecutionPool>;
+
////////////////////////////////////////////////////////////////////////////////
// Data for scheduling on the second level.
@@ -296,7 +302,7 @@ struct TBucketBase
const TString PoolName;
TRingQueue<TAction> ActionQueue;
- TExecutionPool* Pool = nullptr;
+ TExecutionPoolPtr Pool = nullptr;
TCpuDuration ExcessTime = 0;
@@ -321,7 +327,7 @@ class TBucket
, public TBucketBase
{
public:
- TBucket(TString bucketName, TString poolName, TTwoLevelFairShareQueuePtr parent)
+ TBucket(TString bucketName, TString poolName, TBucketMappingPtr parent)
: TBucketBase(std::move(bucketName), std::move(poolName))
, Parent_(std::move(parent))
{ }
@@ -363,50 +369,51 @@ public:
{ }
private:
- const TTwoLevelFairShareQueuePtr Parent_;
+ const TBucketMappingPtr Parent_;
};
DEFINE_REFCOUNTED_TYPE(TBucket)
////////////////////////////////////////////////////////////////////////////////
-DEFINE_ENUM(ERequest,
- (None)
- (EndExecute)
- (FetchNext)
-);
-
-class TTwoLevelFairShareQueue
+class TBucketMapping
: public TRefCounted
- , protected TNotifyManager
{
public:
- using TWaitTimeObserver = ITwoLevelFairShareThreadPool::TWaitTimeObserver;
+ struct TExecutionPoolToListNode
+ {
+ auto operator() (TExecutionPool* pool) const
+ {
+ return &pool->LinkedListNode;
+ }
+ };
- TTwoLevelFairShareQueue(
- TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
- const TString& threadNamePrefix,
- const TNewTwoLevelFairShareThreadPoolOptions& options)
- : TNotifyManager(std::move(callbackEventCount), GetThreadTags(threadNamePrefix), options.PollingPeriod)
- , ThreadNamePrefix_(threadNamePrefix)
- , Profiler_(TProfiler{"/fair_share_queue"}
- .WithHot())
- , CumulativeSchedulingTimeCounter_(Profiler_.TimeCounter("/time/scheduling_cumulative"))
- , PoolWeightProvider_(options.PoolWeightProvider)
- , PoolRetentionTime_(options.PoolRetentionTime)
- , VerboseLogging_(options.VerboseLogging)
+ using TPoolQueue = TIntrusiveLinkedList<TExecutionPool, TExecutionPoolToListNode>;
+
+ explicit TBucketMapping(TDuration poolRetentionTime)
+ : PoolRetentionTime_(poolRetentionTime)
{ }
- ~TTwoLevelFairShareQueue()
+ ~TBucketMapping()
{
- Shutdown();
- }
+ auto guard = Guard(MappingLock_);
- void Configure(int threadCount)
- {
- ThreadCount_.store(threadCount);
+ while (RetainPoolQueue_.GetSize() > 0) {
+ auto* frontPool = RetainPoolQueue_.GetFront();
+
+ auto poolIt = PoolMapping_.find(frontPool->PoolName);
+ YT_ASSERT(poolIt != PoolMapping_.end() && poolIt->second == frontPool);
+ PoolMapping_.erase(poolIt);
+
+ RetainPoolQueue_.PopFront();
+ NYT::DestroyRefCounted(frontPool);
+ }
}
+ virtual TProfiler GetPoolProfiler(const TString& poolName) = 0;
+
+ virtual void Invoke(TClosure callback, TBucket* bucket) = 0;
+
// GetInvoker is protected by mapping lock (can be sharded).
IInvokerPtr GetInvoker(const TString& poolName, const TString& bucketName)
{
@@ -419,6 +426,7 @@ public:
if (!bucket) {
bucket = New<TBucket>(bucketName, poolName, MakeStrong(this));
bucketIt->second = bucket.Get();
+ bucket->Pool = GetOrRegisterPool(bucket->PoolName);
}
return bucket;
@@ -427,25 +435,191 @@ public:
// GetInvoker is protected by mapping lock (can be sharded).
void RemoveBucket(TBucket* bucket)
{
- {
- auto guard = Guard(MappingLock_);
- auto bucketIt = BucketMapping_.find(std::make_pair(bucket->PoolName, bucket->BucketName));
+ auto guard = Guard(MappingLock_);
+ auto bucketIt = BucketMapping_.find(std::make_pair(bucket->PoolName, bucket->BucketName));
+
+ if (bucketIt != BucketMapping_.end() && bucketIt->second == bucket) {
+ BucketMapping_.erase(bucketIt);
+ }
+
+ // Detach under lock.
+ auto* poolDangerousPtr = bucket->Pool.Release();
- if (bucketIt != BucketMapping_.end() && bucketIt->second == bucket) {
- BucketMapping_.erase(bucketIt);
+ // Do not want use NewWithDeleter and keep pointer to TTwoLevelFairShareQueue in each execution pool.
+ if (NYT::GetRefCounter(poolDangerousPtr)->Unref(1)) {
+ auto poolsToRemove = DetachPool(poolDangerousPtr);
+ guard.Release();
+
+ while (poolsToRemove.GetSize() > 0) {
+ auto* frontPool = poolsToRemove.GetFront();
+ poolsToRemove.PopFront();
+ NYT::DestroyRefCounted(frontPool);
}
}
+ }
+
+ TExecutionPoolPtr GetOrRegisterPool(TString poolName)
+ {
+ VERIFY_SPINLOCK_AFFINITY(MappingLock_);
+
+ auto [mappingIt, inserted] = PoolMapping_.emplace(poolName, nullptr);
+ if (!inserted) {
+ YT_ASSERT(mappingIt->second->PoolName == poolName);
+
+ auto* pool = mappingIt->second;
+ // If RetainPoolQueue_ contains only one element its LinkedListNode will be null.
+ // Determine that pool is in RetainPoolQueue_ by checking its ref count.
+ if (NYT::GetRefCounter(pool)->GetRefCount() == 0) {
+ RetainPoolQueue_.Remove(pool);
+ pool->LinkedListNode = {};
+
+ YT_LOG_TRACE("Restoring pool (PoolName: %v)", pool->PoolName);
+ }
+
+ YT_LOG_TRACE("Reusing pool (PoolName: %v)", pool->PoolName);
- // Using non atomic Pool pointer is safe because it is set once and RemoveBucket cannot be
- // concurrently executed with ConsumeInvokeQueue.
- // Pool is nullptr when bucket was created but no actions were invoked.
- if (auto* pool = bucket->Pool) {
- UnlinkBucketQueue_.Enqueue(pool);
+ return pool;
+ } else {
+ YT_LOG_TRACE("Creating pool (PoolName: %v)", poolName);
+ auto pool = New<TExecutionPool>(poolName, GetPoolProfiler(poolName));
+ mappingIt->second = pool.Get();
+
+ return pool;
}
}
+ TPoolQueue DetachPool(TExecutionPool* pool)
+ {
+ VERIFY_SPINLOCK_AFFINITY(MappingLock_);
+
+ YT_LOG_TRACE("Removing pool (PoolName: %v)", pool->PoolName);
+
+ auto currentInstant = GetCpuInstant();
+ pool->LastUsageTime = currentInstant;
+
+ // Items in RetainPoolQueue_ are ordered by LastUsageTime.
+ // When pool is used again it is removed from RetainPoolQueue_.
+ RetainPoolQueue_.PushBack(pool);
+ return ProceedRetainQueue(currentInstant);
+ }
+
+ TPoolQueue ProceedRetainQueue(TCpuInstant currentInstant)
+ {
+ VERIFY_SPINLOCK_AFFINITY(MappingLock_);
+
+ YT_LOG_TRACE("ProceedRetainQueue (Size: %v)", RetainPoolQueue_.GetSize());
+
+ TPoolQueue poolsToRemove;
+
+ while (RetainPoolQueue_.GetSize() > 0) {
+ auto* frontPool = RetainPoolQueue_.GetFront();
+
+ auto lastUsageTime = frontPool->LastUsageTime;
+ if (CpuDurationToDuration(currentInstant - lastUsageTime) < PoolRetentionTime_) {
+ break;
+ }
+
+ YT_LOG_TRACE("Destroing pool (PoolName: %v)", frontPool->PoolName);
+
+ auto poolIt = PoolMapping_.find(frontPool->PoolName);
+ YT_ASSERT(poolIt != PoolMapping_.end() && poolIt->second == frontPool);
+ PoolMapping_.erase(poolIt);
+
+ RetainPoolQueue_.PopFront();
+ poolsToRemove.PushBack(frontPool);
+ }
+
+ return poolsToRemove;
+ }
+
+ void MaybeProceedRetainQueue(TCpuInstant currentInstant)
+ {
+ if (!MappingLock_.TryAcquire()) {
+ return;
+ }
+
+ auto finally = Finally([&] {
+ MappingLock_.Release();
+ });
+
+ auto poolsToRemove = ProceedRetainQueue(currentInstant);
+ MappingLock_.Release();
+ finally.Release();
+
+ while (poolsToRemove.GetSize() > 0) {
+ auto* frontPool = poolsToRemove.GetFront();
+ poolsToRemove.PopFront();
+ NYT::DestroyRefCounted(frontPool);
+ }
+ }
+
+private:
+ const TDuration PoolRetentionTime_;
+
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, MappingLock_);
+ THashMap<std::pair<TString, TString>, TBucket*> BucketMapping_;
+ THashMap<TString, TExecutionPool*> PoolMapping_;
+
+ TPoolQueue RetainPoolQueue_;
+};
+
+DEFINE_REFCOUNTED_TYPE(TBucketMapping)
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TBucket::Invoke(TClosure callback)
+{
+ Parent_->Invoke(std::move(callback), this);
+}
+
+TBucket::~TBucket()
+{
+ Parent_->RemoveBucket(this);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+DEFINE_ENUM(ERequest,
+ (None)
+ (EndExecute)
+ (FetchNext)
+);
+
+class TTwoLevelFairShareQueue
+ : protected TNotifyManager
+ , public TBucketMapping
+{
+public:
+ using TWaitTimeObserver = ITwoLevelFairShareThreadPool::TWaitTimeObserver;
+
+ TTwoLevelFairShareQueue(
+ TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
+ const TString& threadNamePrefix,
+ const TNewTwoLevelFairShareThreadPoolOptions& options)
+ : TNotifyManager(std::move(callbackEventCount), GetThreadTags(threadNamePrefix), options.PollingPeriod)
+ , TBucketMapping(options.PoolRetentionTime)
+ , ThreadNamePrefix_(threadNamePrefix)
+ , Profiler_(TProfiler{"/fair_share_queue"}
+ .WithHot())
+ , CumulativeSchedulingTimeCounter_(Profiler_
+ .WithTags(GetThreadTags(ThreadNamePrefix_))
+ .TimeCounter("/time/scheduling_cumulative"))
+ , PoolWeightProvider_(options.PoolWeightProvider)
+ , VerboseLogging_(options.VerboseLogging)
+ { }
+
+ ~TTwoLevelFairShareQueue()
+ {
+ Shutdown();
+ }
+
+ void Configure(int threadCount)
+ {
+ ThreadCount_.store(threadCount);
+ }
+
// Invoke is lock free.
- void Invoke(TClosure callback, TBucket* bucket)
+ void Invoke(TClosure callback, TBucket* bucket) override
{
if (Stopped_.load()) {
return;
@@ -573,14 +747,8 @@ private:
const TProfiler Profiler_;
const NProfiling::TTimeCounter CumulativeSchedulingTimeCounter_;
const IPoolWeightProviderPtr PoolWeightProvider_;
- const TDuration PoolRetentionTime_;
const bool VerboseLogging_;
- // TODO(lukyan): Sharded mapping.
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, MappingLock_);
- THashMap<std::pair<TString, TString>, TBucket*> BucketMapping_;
- TMpscStack<TExecutionPool*> UnlinkBucketQueue_;
-
std::atomic<bool> Stopped_ = false;
TMpscStack<TAction> InvokeQueue_;
char Padding0_[CacheLineSize - sizeof(TMpscStack<TAction>)];
@@ -591,7 +759,6 @@ private:
std::array<TThreadState, TThreadPoolBase::MaxThreadCount> ThreadStates_;
- THashMap<TString, std::unique_ptr<TExecutionPool>> PoolMapping_;
TPriorityQueue<TExecutionPool> ActivePoolsHeap_;
TCpuDuration LastPoolExcessTime_ = 0;
TPriorityQueue<TEnqueuedTime> WaitHeap_;
@@ -604,21 +771,9 @@ private:
std::atomic<bool> IsWaitTimeObserverSet_;
TWaitTimeObserver WaitTimeObserver_;
- TExecutionPool* GetOrRegisterPool(TString poolName)
+ TProfiler GetPoolProfiler(const TString& poolName) override
{
- VERIFY_SPINLOCK_AFFINITY(MainLock_);
-
- auto [mappingIt, inserted] = PoolMapping_.emplace(poolName, nullptr);
- if (!inserted) {
- YT_ASSERT(mappingIt->second->PoolName == poolName);
- } else {
- YT_LOG_TRACE("Creating pool (PoolName: %v)", poolName);
- mappingIt->second = std::make_unique<TExecutionPool>(
- poolName,
- Profiler_.WithTags(GetBucketTags(ThreadNamePrefix_, poolName)));
- }
-
- return mappingIt->second.get();
+ return Profiler_.WithTags(GetBucketTags(ThreadNamePrefix_, poolName));
}
Y_NO_INLINE void ConsumeInvokeQueue()
@@ -631,12 +786,10 @@ private:
InvokeQueue_.DequeueAll(true, [&] (auto& action) {
auto* bucket = action.BucketHolder.Get();
- if (bucket->Pool == nullptr) {
- bucket->Pool = GetOrRegisterPool(bucket->PoolName);
- bucket->Pool->BucketRefs++;
- }
+ auto* pool = bucket->Pool.Get();
+
+ YT_VERIFY(!pool->LinkedListNode.Next && !pool->LinkedListNode.Prev);
- auto* pool = bucket->Pool;
if (!pool->GetPositionInHeap()) {
// ExcessTime can be greater than last pool excess time
// if the pool is "currently executed" and reschedules action.
@@ -690,25 +843,6 @@ private:
});
}
- Y_NO_INLINE void ProcessUnlinkedBuckets(TCpuInstant currentInstant)
- {
- UnlinkBucketQueue_.FilterElements([&] (TExecutionPool* pool) {
- YT_ASSERT(pool->BucketRefs > 0);
- if (pool->BucketRefs == 1) {
- auto lastUsageTime = pool->LastUsageTime;
- if (CpuDurationToDuration(currentInstant - lastUsageTime) < PoolRetentionTime_) {
- return true;
- }
- auto poolIt = PoolMapping_.find(pool->PoolName);
- YT_ASSERT(poolIt != PoolMapping_.end() && poolIt->second.get() == pool);
- PoolMapping_.erase(poolIt);
- } else {
- --pool->BucketRefs;
- }
- return false;
- });
- }
-
void ServeBeginExecute(TThreadState* threadState, TCpuInstant currentInstant, TAction action)
{
VERIFY_SPINLOCK_AFFINITY(MainLock_);
@@ -722,7 +856,7 @@ private:
threadState->Action = std::move(action);
}
- void ServeEndExecute(TThreadState* threadState, TCpuInstant cpuInstant)
+ void ServeEndExecute(TThreadState* threadState, TCpuInstant /*cpuInstant*/)
{
VERIFY_SPINLOCK_AFFINITY(MainLock_);
@@ -741,8 +875,6 @@ private:
auto& pool = *bucket->Pool;
YT_ASSERT(pool.PoolName == bucket->PoolName);
- pool.LastUsageTime = cpuInstant;
-
// LastActionsInQueue is used to update SizeCounter outside lock.
threadState->LastActionsInQueue = --pool.ActionCountInQueue;
@@ -755,7 +887,7 @@ private:
{
VERIFY_SPINLOCK_AFFINITY(MainLock_);
- auto* pool = bucket->Pool;
+ auto* pool = bucket->Pool.Get();
if (PoolWeightProvider_ && pool->NextUpdateWeightInstant < currentInstant) {
pool->NextUpdateWeightInstant = currentInstant + DurationToCpuDuration(TDuration::Seconds(1));
@@ -887,8 +1019,6 @@ private:
ConsumeInvokeQueue();
- ProcessUnlinkedBuckets(currentInstant);
-
int fetchedActions = 0;
int otherActionCount = 0;
@@ -974,7 +1104,7 @@ private:
auto finally = Finally([&] {
auto bucketToUndef = std::move(threadState.BucketToUnref);
if (bucketToUndef) {
- auto* pool = bucketToUndef->Pool;
+ auto* pool = bucketToUndef->Pool.Get();
pool->SizeCounter.Record(threadState.LastActionsInQueue);
pool->DequeuedCounter.Increment(1);
pool->ExecTimeCounter.Record(threadState.TimeFromStart);
@@ -991,6 +1121,10 @@ private:
}
CumulativeSchedulingTimeCounter_.Add(CpuDurationToDuration(GetCpuInstant() - cpuInstant));
+
+ if (!fetchNext) {
+ MaybeProceedRetainQueue(cpuInstant);
+ }
});
auto& request = threadState.Request;
@@ -1043,18 +1177,6 @@ DEFINE_REFCOUNTED_TYPE(TTwoLevelFairShareQueue)
////////////////////////////////////////////////////////////////////////////////
-void TBucket::Invoke(TClosure callback)
-{
- Parent_->Invoke(std::move(callback), this);
-}
-
-TBucket::~TBucket()
-{
- Parent_->RemoveBucket(this);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
class TFairShareThread
: public TSchedulerThread
{
diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp
index f486003cc1..f93d507673 100644
--- a/yt/yt/core/concurrency/thread_pool_poller.cpp
+++ b/yt/yt/core/concurrency/thread_pool_poller.cpp
@@ -10,12 +10,13 @@
#include <yt/yt/core/misc/collection_helpers.h>
#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/misc/mpsc_stack.h>
-#include <yt/yt/core/misc/ref_tracked.h>
#include <yt/yt/core/profiling/tscp.h>
#include <library/cpp/yt/threading/notification_handle.h>
+#include <library/cpp/yt/memory/ref_tracked.h>
+
#include <util/system/thread.h>
#include <util/thread/lfqueue.h>
diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp
index f598f5190d..bdfe1d1724 100644
--- a/yt/yt/core/concurrency/throughput_throttler.cpp
+++ b/yt/yt/core/concurrency/throughput_throttler.cpp
@@ -477,7 +477,7 @@ IReconfigurableThroughputThrottlerPtr CreateNamedReconfigurableThroughputThrottl
////////////////////////////////////////////////////////////////////////////////
class TUnlimitedThroughputThrottler
- : public IThroughputThrottler
+ : public IReconfigurableThroughputThrottler
{
public:
explicit TUnlimitedThroughputThrottler(
@@ -552,17 +552,37 @@ public:
YT_UNIMPLEMENTED();
}
+ void Reconfigure(TThroughputThrottlerConfigPtr /*config*/) override
+ {
+ VERIFY_THREAD_AFFINITY_ANY();
+ }
+
+ void SetLimit(std::optional<double> /*limit*/) override
+ {
+ VERIFY_THREAD_AFFINITY_ANY();
+ }
+
+ TFuture<void> GetAvailableFuture() override
+ {
+ YT_UNIMPLEMENTED();
+ }
+
+ TThroughputThrottlerConfigPtr GetConfig() const override
+ {
+ YT_UNIMPLEMENTED();
+ }
+
private:
NProfiling::TCounter ValueCounter_;
NProfiling::TCounter ReleaseCounter_;
};
-IThroughputThrottlerPtr GetUnlimitedThrottler()
+IReconfigurableThroughputThrottlerPtr GetUnlimitedThrottler()
{
return LeakyRefCountedSingleton<TUnlimitedThroughputThrottler>();
}
-IThroughputThrottlerPtr CreateNamedUnlimitedThroughputThrottler(
+IReconfigurableThroughputThrottlerPtr CreateNamedUnlimitedThroughputThrottler(
const TString& name,
NProfiling::TProfiler profiler)
{
diff --git a/yt/yt/core/concurrency/throughput_throttler.h b/yt/yt/core/concurrency/throughput_throttler.h
index c4dd476a97..df03931b16 100644
--- a/yt/yt/core/concurrency/throughput_throttler.h
+++ b/yt/yt/core/concurrency/throughput_throttler.h
@@ -129,10 +129,10 @@ IReconfigurableThroughputThrottlerPtr CreateNamedReconfigurableThroughputThrottl
NProfiling::TProfiler profiler = {});
//! Returns a throttler that imposes no throughput limit.
-IThroughputThrottlerPtr GetUnlimitedThrottler();
+IReconfigurableThroughputThrottlerPtr GetUnlimitedThrottler();
//! Returns a throttler that imposes no throughput limit and profiles throughput.
-IThroughputThrottlerPtr CreateNamedUnlimitedThroughputThrottler(
+IReconfigurableThroughputThrottlerPtr CreateNamedUnlimitedThroughputThrottler(
const TString& name,
NProfiling::TProfiler profiler = {});
diff --git a/yt/yt/core/crypto/public.h b/yt/yt/core/crypto/public.h
index 2b524c79ef..8e658bd6cf 100644
--- a/yt/yt/core/crypto/public.h
+++ b/yt/yt/core/crypto/public.h
@@ -2,7 +2,7 @@
#include <yt/yt/core/misc/public.h>
-#include <yt/yt/core/misc/intrusive_ptr.h>
+#include <library/cpp/yt/memory/intrusive_ptr.h>
namespace NYT::NCrypto {
diff --git a/yt/yt/core/crypto/tls.cpp b/yt/yt/core/crypto/tls.cpp
index 26c89a0823..793b0571a1 100644
--- a/yt/yt/core/crypto/tls.cpp
+++ b/yt/yt/core/crypto/tls.cpp
@@ -279,7 +279,7 @@ public:
.Run();
}
- void SubscribePeerDisconnect(TCallback<void ()> cb) override
+ void SubscribePeerDisconnect(TCallback<void()> cb) override
{
return Underlying_->SubscribePeerDisconnect(std::move(cb));
}
diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp
index b95beb20fb..021f7fc9aa 100644
--- a/yt/yt/core/logging/log_manager.cpp
+++ b/yt/yt/core/logging/log_manager.cpp
@@ -137,11 +137,6 @@ public:
YT_ASSERT(rv >= static_cast<ssize_t>(sizeof(struct inotify_event)));
struct inotify_event* event = (struct inotify_event*)buffer;
- if (event->mask & IN_ATTRIB) {
- YT_LOG_TRACE(
- "Watch %v has triggered metadata change (IN_ATTRIB)",
- event->wd);
- }
if (event->mask & IN_DELETE_SELF) {
YT_LOG_TRACE(
"Watch %v has triggered a deletion (IN_DELETE_SELF)",
@@ -201,9 +196,10 @@ public:
void Run()
{
- Callback_();
- // Reinitialize watch to hook to the newly created file.
+ // Unregister before create a new file.
DropWatch();
+ Callback_();
+ // Register the newly created file.
CreateWatch();
}
@@ -215,7 +211,7 @@ private:
WD_ = inotify_add_watch(
FD_,
Path_.c_str(),
- IN_ATTRIB | IN_DELETE_SELF | IN_MOVE_SELF);
+ IN_DELETE_SELF | IN_MOVE_SELF);
if (WD_ < 0) {
YT_LOG_ERROR(TError::FromSystem(errno), "Error registering watch for %v",
@@ -892,6 +888,7 @@ private:
KeyToCachedWriter_.clear();
WDToNotificationWatch_.clear();
NotificationWatches_.clear();
+ InvalidNotificationWatches_.clear();
for (const auto& [name, writerConfig] : config->Writers) {
auto typedWriterConfig = ConvertTo<TLogWriterConfigPtr>(writerConfig);
@@ -910,12 +907,10 @@ private:
if (auto fileWriter = DynamicPointerCast<IFileLogWriter>(writer)) {
auto watch = CreateNotificationWatch(config, fileWriter);
- if (watch && watch->IsValid()) {
- // Watch can fail to initialize if the writer is disabled
- // e.g. due to the lack of space.
- EmplaceOrCrash(WDToNotificationWatch_, watch->GetWD(), watch.get());
+ if (watch) {
+ RegisterNotificatonWatch(watch.get());
+ NotificationWatches_.push_back(std::move(watch));
}
- NotificationWatches_.push_back(std::move(watch));
}
}
for (const auto& [_, category] : NameToCategory_) {
@@ -992,6 +987,17 @@ private:
}
}
+ void RegisterNotificatonWatch(TNotificationWatch* watch)
+ {
+ if (watch->IsValid()) {
+ // Watch can fail to initialize if the writer is disabled
+ // e.g. due to the lack of space.
+ EmplaceOrCrash(WDToNotificationWatch_, watch->GetWD(), watch);
+ } else {
+ InvalidNotificationWatches_.push_back(watch);
+ }
+ }
+
void WatchWriters()
{
VERIFY_THREAD_AFFINITY(LoggingThread);
@@ -1016,15 +1022,20 @@ private:
if (watch->GetWD() != currentWD) {
WDToNotificationWatch_.erase(it);
- if (watch->GetWD() >= 0) {
- // Watch can fail to initialize if the writer is disabled
- // e.g. due to the lack of space.
- EmplaceOrCrash(WDToNotificationWatch_, watch->GetWD(), watch);
- }
+ RegisterNotificatonWatch(watch);
}
previousWD = currentWD;
}
+ // Handle invalid watches, try to register they again.
+ {
+ std::vector<TNotificationWatch*> invalidNotificationWatches;
+ invalidNotificationWatches.swap(InvalidNotificationWatches_);
+ for (auto* watch : invalidNotificationWatches) {
+ watch->Run();
+ RegisterNotificatonWatch(watch);
+ }
+ }
}
void PushEvent(TLoggerQueueItem&& event)
@@ -1453,6 +1464,7 @@ private:
std::unique_ptr<TNotificationHandle> NotificationHandle_;
std::vector<std::unique_ptr<TNotificationWatch>> NotificationWatches_;
THashMap<int, TNotificationWatch*> WDToNotificationWatch_;
+ std::vector<TNotificationWatch*> InvalidNotificationWatches_;
THashMap<TString, TLoggingAnchor*> AnchorMap_;
std::atomic<TLoggingAnchor*> FirstAnchor_ = nullptr;
diff --git a/yt/yt/core/misc/atomic_ptr.h b/yt/yt/core/misc/atomic_ptr.h
index 89104ad7bb..4cd9375bbd 100644
--- a/yt/yt/core/misc/atomic_ptr.h
+++ b/yt/yt/core/misc/atomic_ptr.h
@@ -1,7 +1,8 @@
#pragma once
#include "hazard_ptr.h"
-#include "intrusive_ptr.h"
+
+#include <library/cpp/yt/memory/intrusive_ptr.h>
namespace NYT {
diff --git a/yt/yt/core/misc/fs.cpp b/yt/yt/core/misc/fs.cpp
index 440956f273..a6e0de19b6 100644
--- a/yt/yt/core/misc/fs.cpp
+++ b/yt/yt/core/misc/fs.cpp
@@ -2,7 +2,6 @@
#include "finally.h"
#include <yt/yt/core/logging/log.h>
-#include <yt/yt/core/misc/ref_counted.h>
#include <yt/yt/core/misc/proc.h>
diff --git a/yt/yt/core/misc/intrusive_ptr.h b/yt/yt/core/misc/intrusive_ptr.h
deleted file mode 100644
index 0a9b9ab14d..0000000000
--- a/yt/yt/core/misc/intrusive_ptr.h
+++ /dev/null
@@ -1 +0,0 @@
-#include <library/cpp/yt/memory/intrusive_ptr.h>
diff --git a/yt/yt/core/misc/ref_counted.h b/yt/yt/core/misc/ref_counted.h
deleted file mode 100644
index 9d5675f4ff..0000000000
--- a/yt/yt/core/misc/ref_counted.h
+++ /dev/null
@@ -1 +0,0 @@
-#include <library/cpp/yt/memory/ref_counted.h>
diff --git a/yt/yt/core/misc/ref_tracked.cpp b/yt/yt/core/misc/ref_tracked.cpp
index 3d7551a8f6..b3a7b0a159 100644
--- a/yt/yt/core/misc/ref_tracked.cpp
+++ b/yt/yt/core/misc/ref_tracked.cpp
@@ -1,6 +1,7 @@
-#include "ref_tracked.h"
#include "ref_counted_tracker.h"
+#include <library/cpp/yt/memory/ref_tracked.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/misc/ref_tracked.h b/yt/yt/core/misc/ref_tracked.h
deleted file mode 100644
index 4d94188f9a..0000000000
--- a/yt/yt/core/misc/ref_tracked.h
+++ /dev/null
@@ -1 +0,0 @@
-#include <library/cpp/yt/memory/ref_tracked.h>
diff --git a/yt/yt/core/misc/signal_registry.cpp b/yt/yt/core/misc/signal_registry.cpp
index 27fe3b250d..05b87590d5 100644
--- a/yt/yt/core/misc/signal_registry.cpp
+++ b/yt/yt/core/misc/signal_registry.cpp
@@ -4,13 +4,15 @@
#include <library/cpp/yt/system/thread_id.h>
+#include <util/generic/algorithm.h>
+
#include <signal.h>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
-std::vector<int> CrashSignals = {
+constexpr std::initializer_list<int> CrashSignals{
SIGSEGV,
SIGILL,
SIGFPE,
@@ -20,8 +22,6 @@ std::vector<int> CrashSignals = {
#endif
};
-////////////////////////////////////////////////////////////////////////////////
-
// This variable is used for protecting signal handlers for crash signals from
// dumping stuff while another thread is already doing that. Our policy is to let
// the first thread dump stuff and make other threads wait.
@@ -43,19 +43,22 @@ void TSignalRegistry::SetupSignal(int signal, int flags)
YT_VERIFY(signal != SIGALRM);
if (!OverrideNonDefaultSignalHandlers_) {
- struct sigaction oldact;
- YT_VERIFY(sigaction(signal, NULL, &oldact) == 0);
- if (reinterpret_cast<void*>(oldact.sa_sigaction) != SIG_DFL) {
+ struct sigaction sa;
+ YT_VERIFY(sigaction(signal, nullptr, &sa) == 0);
+ if (reinterpret_cast<void*>(sa.sa_sigaction) != SIG_DFL) {
return;
}
}
- struct sigaction sa;
- memset(&sa, 0, sizeof(sa));
- sigemptyset(&sa.sa_mask);
- sa.sa_flags = flags | SA_SIGINFO;
- sa.sa_sigaction = &Handle;
- YT_VERIFY(sigaction(signal, &sa, NULL) == 0);
+ {
+ struct sigaction sa;
+ memset(&sa, 0, sizeof(sa));
+ sigemptyset(&sa.sa_mask);
+ sa.sa_flags = flags | SA_SIGINFO | SA_ONSTACK;
+ sa.sa_sigaction = &Handle;
+ YT_VERIFY(sigaction(signal, &sa, nullptr) == 0);
+ }
+
Signals_[signal].SetUp = true;
});
#else
@@ -84,7 +87,7 @@ void TSignalRegistry::PushCallback(int signal, TSignalRegistry::TSignalHandler c
#ifdef _unix_
void TSignalRegistry::PushCallback(int signal, std::function<void(int)> callback)
{
- PushCallback(signal, [callback = std::move(callback)] (int signal, siginfo_t* /* siginfo */, void* /* ucontext */) {
+ PushCallback(signal, [callback = std::move(callback)] (int signal, siginfo_t* /*siginfo*/, void* /*ucontext*/) {
callback(signal);
});
}
@@ -93,11 +96,11 @@ void TSignalRegistry::PushCallback(int signal, std::function<void(int)> callback
void TSignalRegistry::PushCallback(int signal, std::function<void(void)> callback)
{
#ifdef _unix_
- PushCallback(signal, [callback = std::move(callback)] (int /* signal */, siginfo_t* /* siginfo */, void* /* ucontext */) {
+ PushCallback(signal, [callback = std::move(callback)] (int /*signal*/, siginfo_t* /*siginfo*/, void* /*ucontext*/) {
callback();
});
#else
- PushCallback(signal, [callback = std::move(callback)] (int /* signal */) {
+ PushCallback(signal, [callback = std::move(callback)] (int /*signal*/) {
callback();
});
#endif
@@ -107,11 +110,13 @@ void TSignalRegistry::PushDefaultSignalHandler(int signal)
{
PushCallback(signal, [] (int signal) {
#ifdef _unix_
- struct sigaction sa;
- memset(&sa, 0, sizeof(sa));
- sigemptyset(&sa.sa_mask);
- sa.sa_handler = SIG_DFL;
- YT_VERIFY(sigaction(signal, &sa, nullptr) == 0);
+ {
+ struct sigaction sa;
+ memset(&sa, 0, sizeof(sa));
+ sigemptyset(&sa.sa_mask);
+ sa.sa_handler = SIG_DFL;
+ YT_VERIFY(sigaction(signal, &sa, nullptr) == 0);
+ }
YT_VERIFY(raise(signal) == 0);
#else
@@ -130,13 +135,12 @@ void TSignalRegistry::Handle(int signal)
auto* self = Get();
if (self->EnableCrashSignalProtection_ &&
- std::find(CrashSignals.begin(), CrashSignals.end(), signal) != CrashSignals.end()) {
+ Find(CrashSignals, signal) != CrashSignals.end())
+ {
// For crash signals we try pretty hard to prevent simultaneous execution of
// several crash handlers.
-
auto currentThreadId = GetSequentialThreadId();
auto expectedCrashingThreadId = InvalidSequentialThreadId;
-
if (!CrashingThreadId.compare_exchange_strong(expectedCrashingThreadId, currentThreadId)) {
// We've already entered the signal handler. What should we do?
if (currentThreadId == expectedCrashingThreadId) {
diff --git a/yt/yt/core/misc/signal_registry.h b/yt/yt/core/misc/signal_registry.h
index c0fe057289..23dbdeb950 100644
--- a/yt/yt/core/misc/signal_registry.h
+++ b/yt/yt/core/misc/signal_registry.h
@@ -16,35 +16,35 @@ constexpr int AllCrashSignals = -1;
class TSignalRegistry
{
public:
- //! Flag enabling mechanism which protects multiple crash signal handlers from simultaneous
+ //! Enables a mechanism which protects multiple crash signal handlers from simultaneous
//! execution.
DEFINE_BYVAL_RW_PROPERTY(bool, EnableCrashSignalProtection, true);
- //! Flag preventing us to override user custom signal handlers.
+ //! Prevents from overriding user custom signal handlers.
DEFINE_BYVAL_RW_PROPERTY(bool, OverrideNonDefaultSignalHandlers, true);
+public:
#ifdef _unix_
using TSignalHandler = std::function<void(int, siginfo_t*, void*)>;
#else
using TSignalHandler = std::function<void(int)>;
#endif
-public:
static TSignalRegistry* Get();
- //! Setup our handler that invokes registered callbacks in order.
+ //! Sets up our handler that invokes registered callbacks in order.
//! Flags has same meaning as sa_flags in sigaction(2). Use this method if you need certain flags.
//! By default any signal touched by PushCallback(...) will be set up with default flags.
void SetupSignal(int signal, int flags = 0);
- //! Add simple callback which should be called for signal. Different signatures are supported for convenience.
+ //! Adds a simple callback which should be called for signal. Different signatures are supported for convenience.
void PushCallback(int signal, std::function<void(void)> callback);
#ifdef _unix_
void PushCallback(int signal, std::function<void(int)> callback);
#endif
void PushCallback(int signal, TSignalHandler callback);
- //! Add default signal handler which is called after invoking our custom handlers.
+ //! Adds the default signal handler which is called after invoking our custom handlers.
//! NB: this handler restores default signal handler as a side-effect. Use it only
//! when default handler terminates the program.
void PushDefaultSignalHandler(int signal);
diff --git a/yt/yt/core/misc/unittests/ref_counted_tracker_ut.cpp b/yt/yt/core/misc/unittests/ref_counted_tracker_ut.cpp
index 538f03989f..faf1352a5b 100644
--- a/yt/yt/core/misc/unittests/ref_counted_tracker_ut.cpp
+++ b/yt/yt/core/misc/unittests/ref_counted_tracker_ut.cpp
@@ -6,15 +6,15 @@
#include <yt/yt/core/misc/blob.h>
#include <yt/yt/core/misc/protobuf_helpers.h>
-#include <yt/yt/core/misc/ref_counted.h>
#include <yt/yt/core/misc/ref_counted_tracker.h>
-#include <yt/yt/core/misc/ref_tracked.h>
#include <yt/yt/core/actions/future.h>
#include <yt/yt/core/concurrency/action_queue.h>
#include <library/cpp/yt/memory/new.h>
+#include <library/cpp/yt/memory/ref_tracked.h>
+#include <library/cpp/yt/memory/ref_counted.h>
namespace NYT {
namespace {
diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h
index e52de86abb..8512e5bc7e 100644
--- a/yt/yt/core/net/connection.h
+++ b/yt/yt/core/net/connection.h
@@ -4,11 +4,12 @@
#include <yt/yt/core/concurrency/async_stream.h>
-#include <yt/yt/core/misc/ref_counted.h>
#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/net/address.h>
+#include <library/cpp/yt/memory/ref_counted.h>
+
namespace NYT::NNet {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/net/packet_connection.h b/yt/yt/core/net/packet_connection.h
index 36c21175c1..a4450df1e3 100644
--- a/yt/yt/core/net/packet_connection.h
+++ b/yt/yt/core/net/packet_connection.h
@@ -2,10 +2,10 @@
#include "public.h"
-#include <yt/yt/core/misc/ref_counted.h>
-
#include <yt/yt/core/net/address.h>
+#include <library/cpp/yt/memory/ref_counted.h>
+
namespace NYT::NNet {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/net/public.h b/yt/yt/core/net/public.h
index bb1b74c615..05d3ec03de 100644
--- a/yt/yt/core/net/public.h
+++ b/yt/yt/core/net/public.h
@@ -2,7 +2,7 @@
#include <yt/yt/core/misc/public.h>
-#include <yt/yt/core/misc/intrusive_ptr.h>
+#include <library/cpp/yt/memory/intrusive_ptr.h>
namespace NYT::NNet {
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp
index 14143a6585..ec29c3b88c 100644
--- a/yt/yt/core/rpc/config.cpp
+++ b/yt/yt/core/rpc/config.cpp
@@ -38,6 +38,20 @@ void TServiceCommonConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TServiceCommonDynamicConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("enable_per_user_profiling", &TThis::EnablePerUserProfiling)
+ .Default();
+ registrar.Parameter("histogram_timer_profiling", &TThis::HistogramTimerProfiling)
+ .Default();
+ registrar.Parameter("code_counting", &TThis::EnableErrorCodeCounting)
+ .Default();
+ registrar.Parameter("tracing_mode", &TThis::TracingMode)
+ .Default();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TServerConfig::Register(TRegistrar registrar)
{
registrar.Parameter("services", &TThis::Services)
@@ -46,6 +60,14 @@ void TServerConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TServerDynamicConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("services", &TThis::Services)
+ .Default();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TServiceConfig::Register(TRegistrar registrar)
{
registrar.Parameter("enable_per_user_profiling", &TThis::EnablePerUserProfiling)
diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h
index be8c1fdec5..9cdf971d18 100644
--- a/yt/yt/core/rpc/config.h
+++ b/yt/yt/core/rpc/config.h
@@ -90,6 +90,40 @@ DEFINE_REFCOUNTED_TYPE(TServerConfig)
////////////////////////////////////////////////////////////////////////////////
+// Common options shared between all services in one server.
+class TServiceCommonDynamicConfig
+ : public NYTree::TYsonStruct
+{
+public:
+ std::optional<bool> EnablePerUserProfiling;
+ std::optional<THistogramConfigPtr> HistogramTimerProfiling;
+ std::optional<bool> EnableErrorCodeCounting;
+ std::optional<ERequestTracingMode> TracingMode;
+
+ REGISTER_YSON_STRUCT(TServiceCommonDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TServiceCommonDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TServerDynamicConfig
+ : public TServiceCommonDynamicConfig
+{
+public:
+ THashMap<TString, NYTree::INodePtr> Services;
+
+ REGISTER_YSON_STRUCT(TServerDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TServerDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
class TServiceConfig
: public NYTree::TYsonStruct
{
diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h
index af88870ad9..5548fa0f22 100644
--- a/yt/yt/core/rpc/public.h
+++ b/yt/yt/core/rpc/public.h
@@ -107,6 +107,8 @@ DECLARE_REFCOUNTED_CLASS(THistogramExponentialBounds)
DECLARE_REFCOUNTED_CLASS(THistogramConfig)
DECLARE_REFCOUNTED_CLASS(TServerConfig)
DECLARE_REFCOUNTED_CLASS(TServiceCommonConfig)
+DECLARE_REFCOUNTED_CLASS(TServerDynamicConfig)
+DECLARE_REFCOUNTED_CLASS(TServiceCommonDynamicConfig)
DECLARE_REFCOUNTED_CLASS(TServiceConfig)
DECLARE_REFCOUNTED_CLASS(TMethodConfig)
DECLARE_REFCOUNTED_CLASS(TRetryingChannelConfig)
diff --git a/yt/yt/core/rpc/server.h b/yt/yt/core/rpc/server.h
index fe4c88a478..e48bf2ef5c 100644
--- a/yt/yt/core/rpc/server.h
+++ b/yt/yt/core/rpc/server.h
@@ -31,7 +31,9 @@ struct IServer
virtual IServicePtr GetServiceOrThrow(const TServiceId& serviceId) const = 0;
//! Reconfigures the server on-the-fly.
- virtual void Configure(TServerConfigPtr config) = 0;
+ virtual void Configure(const TServerConfigPtr& config) = 0;
+
+ virtual void OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) = 0;
//! Starts the server.
/*!
diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp
index 0f80a07b6b..82ba43afe4 100644
--- a/yt/yt/core/rpc/server_detail.cpp
+++ b/yt/yt/core/rpc/server_detail.cpp
@@ -768,12 +768,12 @@ void TServerBase::RegisterService(IServicePtr service)
auto guard = WriterGuard(ServicesLock_);
auto& serviceMap = RealmIdToServiceMap_[serviceId.RealmId];
YT_VERIFY(serviceMap.emplace(serviceId.ServiceName, service).second);
- if (Config_) {
- auto it = Config_->Services.find(serviceId.ServiceName);
- if (it != Config_->Services.end()) {
- service->Configure(Config_, it->second);
+ if (AppliedConfig_) {
+ auto it = AppliedConfig_->Services.find(serviceId.ServiceName);
+ if (it != AppliedConfig_->Services.end()) {
+ service->Configure(AppliedConfig_, it->second);
} else {
- service->Configure(Config_, nullptr);
+ service->Configure(AppliedConfig_, nullptr);
}
}
DoRegisterService(service);
@@ -866,26 +866,54 @@ IServicePtr TServerBase::GetServiceOrThrow(const TServiceId& serviceId) const
return serviceIt->second;
}
-void TServerBase::Configure(TServerConfigPtr config)
+void TServerBase::ApplyConfig()
{
- auto guard = WriterGuard(ServicesLock_);
+ VERIFY_SPINLOCK_AFFINITY(ServicesLock_);
- // Future services will be configured appropriately.
- Config_ = config;
+ auto newAppliedConfig = New<TServerConfig>();
+ newAppliedConfig->EnableErrorCodeCounting = DynamicConfig_->EnableErrorCodeCounting.value_or(StaticConfig_->EnableErrorCodeCounting);
+ newAppliedConfig->EnablePerUserProfiling = DynamicConfig_->EnablePerUserProfiling.value_or(StaticConfig_->EnablePerUserProfiling);
+ newAppliedConfig->HistogramTimerProfiling = DynamicConfig_->HistogramTimerProfiling.value_or(StaticConfig_->HistogramTimerProfiling);
+ newAppliedConfig->Services = StaticConfig_->Services;
+
+ for (const auto& [name, node] : DynamicConfig_->Services) {
+ newAppliedConfig->Services[name] = node;
+ }
+
+ AppliedConfig_ = newAppliedConfig;
// Apply configuration to all existing services.
for (const auto& [realmId, serviceMap] : RealmIdToServiceMap_) {
for (const auto& [serviceName, service] : serviceMap) {
- auto it = config->Services.find(serviceName);
- if (it != config->Services.end()) {
- service->Configure(config, it->second);
+ auto it = AppliedConfig_->Services.find(serviceName);
+ if (it != AppliedConfig_->Services.end()) {
+ service->Configure(AppliedConfig_, it->second);
} else {
- service->Configure(config, nullptr);
+ service->Configure(AppliedConfig_, nullptr);
}
}
}
}
+void TServerBase::Configure(const TServerConfigPtr& config)
+{
+ auto guard = WriterGuard(ServicesLock_);
+
+ // Future services will be configured appropriately.
+ StaticConfig_ = config;
+
+ ApplyConfig();
+}
+
+void TServerBase::OnDynamicConfigChanged(const TServerDynamicConfigPtr& config)
+{
+ auto guard = WriterGuard(ServicesLock_);
+
+ DynamicConfig_ = config;
+
+ ApplyConfig();
+}
+
void TServerBase::Start()
{
YT_VERIFY(!Started_);
diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h
index 0a2647b643..aceba1f14e 100644
--- a/yt/yt/core/rpc/server_detail.h
+++ b/yt/yt/core/rpc/server_detail.h
@@ -3,6 +3,7 @@
#include "authentication_identity.h"
#include "server.h"
#include "service.h"
+#include "config.h"
#include <yt/yt/core/logging/log.h>
@@ -269,7 +270,8 @@ public:
IServicePtr FindService(const TServiceId& serviceId) const override;
IServicePtr GetServiceOrThrow(const TServiceId& serviceId) const override;
- void Configure(TServerConfigPtr config) override;
+ void Configure(const TServerConfigPtr& config) override;
+ void OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) override;
void Start() override;
TFuture<void> Stop(bool graceful) override;
@@ -280,7 +282,9 @@ protected:
std::atomic<bool> Started_ = false;
YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ServicesLock_);
- TServerConfigPtr Config_;
+ TServerConfigPtr StaticConfig_;
+ TServerDynamicConfigPtr DynamicConfig_ = New<TServerDynamicConfig>();
+ TServerConfigPtr AppliedConfig_;
//! Service name to service.
using TServiceMap = THashMap<TString, IServicePtr>;
@@ -288,6 +292,8 @@ protected:
explicit TServerBase(NLogging::TLogger logger);
+ void ApplyConfig();
+
virtual void DoStart();
virtual TFuture<void> DoStop(bool graceful);
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index 2fd6f483ad..46567c3fcd 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -2451,6 +2451,7 @@ void TServiceBase::DoConfigure(
auto methodConfig = methodIt ? methodIt->second : New<TMethodConfig>();
const auto& descriptor = runtimeInfo->Descriptor;
+
runtimeInfo->Heavy.store(methodConfig->Heavy.value_or(descriptor.Options.Heavy));
runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit));
runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit));
diff --git a/yt/yt/core/threading/thread.cpp b/yt/yt/core/threading/thread.cpp
index cc8f9cfae7..4b04e154bd 100644
--- a/yt/yt/core/threading/thread.cpp
+++ b/yt/yt/core/threading/thread.cpp
@@ -8,10 +8,14 @@
#include <library/cpp/yt/misc/tls.h>
+#include <util/generic/size_literals.h>
+
#ifdef _linux_
#include <sched.h>
#endif
+#include <signal.h>
+
namespace NYT::NThreading {
////////////////////////////////////////////////////////////////////////////////
@@ -203,6 +207,7 @@ void TThread::ThreadMainTrampoline()
CurrentUniqueThreadId = UniqueThreadId_;
SetThreadPriority();
+ ConfigureSignalHandlerStack();
StartedEvent_.NotifyAll();
@@ -275,6 +280,30 @@ void TThread::SetThreadPriority()
#endif
}
+void TThread::ConfigureSignalHandlerStack()
+{
+#if !defined(_asan_enabled_) && !defined(_msan_enabled_) && \
+ (_XOPEN_SOURCE >= 500 || \
+ /* Since glibc 2.12: */ _POSIX_C_SOURCE >= 200809L || \
+ /* glibc <= 2.19: */ _BSD_SOURCE)
+ YT_THREAD_LOCAL(bool) Configured;
+ if (std::exchange(Configured, true)) {
+ return;
+ }
+
+ // The size of of the custom stack to be provided for signal handlers.
+ constexpr size_t SignalHandlerStackSize = 16_KB;
+ YT_THREAD_LOCAL(std::array<char, SignalHandlerStackSize>) Stack;
+
+ stack_t stack{
+ .ss_sp = GetTlsRef(Stack).data(),
+ .ss_flags = 0,
+ .ss_size = GetTlsRef(Stack).size(),
+ };
+ YT_VERIFY(sigaltstack(&stack, nullptr) == 0);
+#endif
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NThreading
diff --git a/yt/yt/core/threading/thread.h b/yt/yt/core/threading/thread.h
index bc54afdc07..2599f18db2 100644
--- a/yt/yt/core/threading/thread.h
+++ b/yt/yt/core/threading/thread.h
@@ -72,6 +72,7 @@ private:
::TThread UnderlyingThread_;
void SetThreadPriority();
+ void ConfigureSignalHandlerStack();
bool StartSlow();
diff --git a/yt/yt/core/yson/protobuf_interop.cpp b/yt/yt/core/yson/protobuf_interop.cpp
index a72a630f04..400d4a8f73 100644
--- a/yt/yt/core/yson/protobuf_interop.cpp
+++ b/yt/yt/core/yson/protobuf_interop.cpp
@@ -1007,22 +1007,20 @@ protected:
}
switch (config->Utf8Check) {
case EUtf8Check::Disable:
- break;
+ return;
case EUtf8Check::LogOnFail:
- YT_LOG_WARNING("Field %v accepts only valid UTF-8 sequence, but got (%Qv)",
+ YT_LOG_WARNING("String field got non UTF-8 value (Path: %v, Value: %v)",
YPathStack_.GetHumanReadablePath(),
data);
- break;
+ return;
case EUtf8Check::ThrowOnFail:
- THROW_ERROR_EXCEPTION("Field %v accepts only valid UTF-8 sequence, but got (%Qv)",
+ THROW_ERROR_EXCEPTION("String field got non UTF-8 value (Path: %v, Value: %v)",
YPathStack_.GetHumanReadablePath(),
data)
<< TErrorAttribute("ypath", YPathStack_.GetPath())
<< TErrorAttribute("proto_field", fieldFullName);
- break;
}
}
-
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp b/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp
index f995f3f459..87c15c9600 100644
--- a/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp
+++ b/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp
@@ -1033,7 +1033,7 @@ TEST(TYsonToProtobufTest, ValidUtf8StringCheck)
.EndMap();
};
if (option == EUtf8Check::ThrowOnFail) {
- EXPECT_THROW_WITH_SUBSTRING(check(), "valid UTF-8");
+ EXPECT_THROW_WITH_SUBSTRING(check(), "String field got non UTF-8 value");
} else {
EXPECT_NO_THROW(check());
}
@@ -1045,7 +1045,7 @@ TEST(TYsonToProtobufTest, ValidUtf8StringCheck)
TYsonWriter ysonWriter(&newYsonOutputStream, EYsonFormat::Pretty);
if (option == EUtf8Check::ThrowOnFail) {
EXPECT_THROW_WITH_SUBSTRING(
- WriteProtobufMessage(&ysonWriter, message), "valid UTF-8");
+ WriteProtobufMessage(&ysonWriter, message), "String field got non UTF-8 value");
} else {
EXPECT_NO_THROW(WriteProtobufMessage(&ysonWriter, message));
}
diff --git a/yt/yt/library/column_converters/column_converter.cpp b/yt/yt/library/column_converters/column_converter.cpp
index 7442ea0fd4..04239c0344 100644
--- a/yt/yt/library/column_converters/column_converter.cpp
+++ b/yt/yt/library/column_converters/column_converter.cpp
@@ -64,7 +64,7 @@ IColumnConverterPtr CreateColumnConvert(
TConvertedColumnRange TColumnConverters::ConvertRowsToColumns(
TRange<TUnversionedRow> rows,
- const std::vector<TColumnSchema>& columnSchema)
+ const THashMap<int, TColumnSchema>& columnSchema)
{
TConvertedColumnRange convertedColumnsRange;
if (rows.size() == 0) {
@@ -79,6 +79,10 @@ TConvertedColumnRange TColumnConverters::ConvertRowsToColumns(
for (const auto* item = firstRow.Begin(); item != firstRow.End(); ++item) {
IdsToIndexes_[item->Id] = std::ssize(ColumnIds_);
ColumnIds_.push_back(item->Id);
+ auto iterSchema = columnSchema.find(item->Id);
+ if (iterSchema == columnSchema.end()) {
+ THROW_ERROR_EXCEPTION("Column with Id %v has no schema", item->Id);
+ }
}
}
IsFirstBatch_ = false;
@@ -90,15 +94,18 @@ TConvertedColumnRange TColumnConverters::ConvertRowsToColumns(
TUnversionedRowValues rowValues(ColumnIds_.size(), nullptr);
for (const auto* item = row.Begin(); item != row.End(); ++item) {
auto iter = IdsToIndexes_.find(item->Id);
- YT_VERIFY(iter != IdsToIndexes_.end());
+ if(iter == IdsToIndexes_.end()) {
+ THROW_ERROR_EXCEPTION("Column with Id %v has no schema", item->Id);
+ }
rowValues[iter->second] = item;
}
rowsValues.push_back(std::move(rowValues));
}
for (int offset = 0; offset < std::ssize(ColumnIds_); offset++) {
- YT_VERIFY(ColumnIds_[offset] >= 0 && ColumnIds_[offset] < std::ssize(columnSchema));
- auto converter = CreateColumnConvert(columnSchema[ColumnIds_[offset]], ColumnIds_[offset], offset);
+ auto iterSchema = columnSchema.find(ColumnIds_[offset]);
+ YT_VERIFY(iterSchema != columnSchema.end());
+ auto converter = CreateColumnConvert(iterSchema->second, ColumnIds_[offset], offset);
auto columns = converter->Convert(rowsValues);
convertedColumnsRange.push_back(columns);
}
diff --git a/yt/yt/library/column_converters/column_converter.h b/yt/yt/library/column_converters/column_converter.h
index 46515d8eb4..12d9e2a04c 100644
--- a/yt/yt/library/column_converters/column_converter.h
+++ b/yt/yt/library/column_converters/column_converter.h
@@ -50,7 +50,7 @@ class TColumnConverters
public:
TConvertedColumnRange ConvertRowsToColumns(
TRange<NTableClient::TUnversionedRow> rows,
- const std::vector<NTableClient::TColumnSchema>& columnSchema);
+ const THashMap<int, NTableClient::TColumnSchema>& columnSchema);
private:
THashMap<int, int> IdsToIndexes_;
std::vector<int> ColumnIds_;
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp
index d83cb97358..452254f738 100644
--- a/yt/yt/library/formats/arrow_writer.cpp
+++ b/yt/yt/library/formats/arrow_writer.cpp
@@ -906,17 +906,27 @@ public:
{
YT_VERIFY(tableSchemas.size() > 0);
- auto tableSchema = tableSchemas[0];
-
- for (const auto& columnSchema : tableSchema->Columns()) {
- NameTable_->GetIdOrRegisterName(columnSchema.Name());
- }
-
- auto columnCount = NameTable_->GetSize();
- SchemaExistenceFlags_.resize(columnCount, true);
-
- for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
- ColumnSchemas_.push_back(GetColumnSchema(tableSchema, columnIndex));
+ ColumnConverters_.resize(tableSchemas.size());
+ TableNumbers_ = tableSchemas.size();
+ ColumnSchemas_.resize(tableSchemas.size());
+
+ for (int tableIndex = 0; tableIndex < std::ssize(tableSchemas); ++tableIndex) {
+ for (const auto& columnSchema : tableSchemas[tableIndex]->Columns()) {
+ auto columnId = NameTable_->GetIdOrRegisterName(columnSchema.Name());
+ ColumnSchemas_[tableIndex][columnId] = columnSchema;
+ }
+ if (CheckColumnInNameTable(GetRangeIndexColumnId())) {
+ ColumnSchemas_[tableIndex][GetRangeIndexColumnId()] = GetSystemColumnSchema(NameTable_->GetName(GetRangeIndexColumnId()), GetRangeIndexColumnId());
+ }
+ if (CheckColumnInNameTable(GetRowIndexColumnId())) {
+ ColumnSchemas_[tableIndex][GetRowIndexColumnId()] = GetSystemColumnSchema(NameTable_->GetName(GetRowIndexColumnId()), GetRowIndexColumnId());
+ }
+ if (CheckColumnInNameTable(GetTableIndexColumnId())) {
+ ColumnSchemas_[tableIndex][GetTableIndexColumnId()] = GetSystemColumnSchema(NameTable_->GetName(GetTableIndexColumnId()), GetTableIndexColumnId());
+ }
+ if (CheckColumnInNameTable(GetTabletIndexColumnId())) {
+ ColumnSchemas_[tableIndex][GetTabletIndexColumnId()] = GetSystemColumnSchema(NameTable_->GetName(GetTabletIndexColumnId()), GetTabletIndexColumnId());
+ }
}
}
@@ -935,20 +945,56 @@ private:
output->Write(&zero, sizeof(zero));
}
- void DoWrite(TRange<TUnversionedRow> rows) override
+ bool CheckColumnInNameTable(int columnIndex) const
{
- Reset();
-
- auto convertedColumns = ColumnConverters_.ConvertRowsToColumns(rows, ColumnSchemas_);
+ return columnIndex >= 0 && columnIndex < NameTable_->GetSize();
+ }
+ void WriteRowsForSingleTable(TRange<TUnversionedRow> rows, i32 tableIndex)
+ {
+ Reset();
+ auto convertedColumns = ColumnConverters_[tableIndex].ConvertRowsToColumns(rows, ColumnSchemas_[tableIndex]);
std::vector<const TBatchColumn*> rootColumns;
rootColumns.reserve( std::ssize(convertedColumns));
for (ssize_t columnIndex = 0; columnIndex < std::ssize(convertedColumns); columnIndex++) {
rootColumns.push_back(convertedColumns[columnIndex].RootColumn);
}
RowCount_ = rows.size();
- PrepareColumns(rootColumns);
- Encode();
+ PrepareColumns(rootColumns, tableIndex);
+ Encode(tableIndex);
+ }
+
+ void DoWrite(TRange<TUnversionedRow> rows) override
+ {
+ Reset();
+
+ ssize_t sameTableRangeBeginRowIndex = 0;
+ i32 tableIndex = 0;
+
+ for (ssize_t rowIndex = 0; rowIndex < std::ssize(rows); rowIndex++) {
+ i32 currentTableIndex = -1;
+ if(TableNumbers_ > 1) {
+ const auto& elems = rows[rowIndex].Elements();
+ for (ssize_t columnIndex = std::ssize(elems) - 1; columnIndex >= 0; --columnIndex) {
+ if (elems[columnIndex].Id == GetTableIndexColumnId()) {
+ currentTableIndex = elems[columnIndex].Data.Int64;
+ break;
+ }
+ }
+ } else {
+ currentTableIndex = 0;
+ }
+ YT_VERIFY(currentTableIndex < TableNumbers_ && currentTableIndex >= 0);
+ if (tableIndex != currentTableIndex && rowIndex != 0) {
+ auto currentRows = rows.Slice(sameTableRangeBeginRowIndex, rowIndex);
+ WriteRowsForSingleTable(currentRows, tableIndex);
+ sameTableRangeBeginRowIndex = rowIndex;
+ }
+ tableIndex = currentTableIndex;
+ }
+
+ auto currentRows = rows.Slice(sameTableRangeBeginRowIndex, rows.size());
+ WriteRowsForSingleTable(currentRows, tableIndex);
}
void DoWriteBatch(NTableClient::IUnversionedRowBatchPtr rowBatch) override
@@ -959,22 +1005,24 @@ private:
DoWrite(rowBatch->MaterializeRows());
} else {
YT_LOG_DEBUG("Encoding columnar batch (RowCount: %v)", rowBatch->GetRowCount());
+ YT_VERIFY(TableNumbers_ == 1);
Reset();
RowCount_ = rowBatch->GetRowCount();
- PrepareColumns(columnarBatch->MaterializeColumns());
- Encode();
+ PrepareColumns(columnarBatch->MaterializeColumns(), 0);
+ Encode(0);
}
}
- void Encode()
+ void Encode(i32 tableIndex)
{
auto output = GetOutputStream();
- if (IsSchemaMessageNeeded()) {
+ if (tableIndex != PrevTableIndex_ || IsSchemaMessageNeeded()) {
+ PrevTableIndex_ = tableIndex;
if (!IsFirstBatch_) {
RegisterEosMarker();
}
ResetArrowDictionaries();
- PrepareSchema();
+ PrepareSchema(tableIndex);
}
IsFirstBatch_ = false;
PrepareDictionaryBatches();
@@ -985,13 +1033,14 @@ private:
}
private:
+ i32 TableNumbers_ = 0;
bool IsFirstBatch_ = true;
+ i64 PrevTableIndex_ = 0;
i64 RowCount_ = 0;
std::vector<TTypedBatchColumn> TypedColumns_;
- std::vector<TColumnSchema> ColumnSchemas_;
+ std::vector<THashMap<int, TColumnSchema>> ColumnSchemas_;
std::vector<IUnversionedColumnarRowBatch::TDictionaryId> ArrowDictionaryIds_;
- std::vector<bool> SchemaExistenceFlags_;
- NColumnConverters::TColumnConverters ColumnConverters_;
+ std::vector<NColumnConverters::TColumnConverters> ColumnConverters_;
struct TMessage
{
@@ -1002,7 +1051,7 @@ private:
std::vector<TMessage> Messages_;
- bool CheckIfSystemColumnEnable(int columnIndex)
+ bool CheckIfSystemColumnEnable(int columnIndex) const
{
return ControlAttributesConfig_->EnableTableIndex && IsTableIndexColumnId(columnIndex) ||
ControlAttributesConfig_->EnableRangeIndex && IsRangeIndexColumnId(columnIndex) ||
@@ -1010,34 +1059,30 @@ private:
ControlAttributesConfig_->EnableTabletIndex && IsTabletIndexColumnId(columnIndex);
}
- TColumnSchema GetColumnSchema(NTableClient::TTableSchemaPtr& tableSchema, int columnIndex)
+ bool IsColumnNeedsToAdd(int columnIndex) const
{
- YT_VERIFY(columnIndex >= 0);
- SchemaExistenceFlags_[columnIndex] = true;
- auto name = NameTable_->GetName(columnIndex);
- auto columnSchema = tableSchema->FindColumn(name);
- if (!columnSchema) {
- if (IsSystemColumnId(columnIndex)) {
- if (CheckIfSystemColumnEnable(columnIndex)) {
- return TColumnSchema(TString(name), EValueType::Int64);
- }
- SchemaExistenceFlags_[columnIndex] = false;
- return TColumnSchema(TString(name), EValueType::Null);
- }
- THROW_ERROR_EXCEPTION("Column %Qv has no schema", name);
+ return !IsSystemColumnId(columnIndex)
+ || (CheckIfSystemColumnEnable(columnIndex) && !IsTableIndexColumnId(columnIndex));
+ }
+
+ TColumnSchema GetSystemColumnSchema(TStringBuf name, int columnIndex)
+ {
+ if (CheckIfSystemColumnEnable(columnIndex) && !IsTableIndexColumnId(columnIndex)) {
+ return TColumnSchema(TString(name), EValueType::Int64);
}
- return *columnSchema;
+ return TColumnSchema(TString(name), EValueType::Null);
}
- void PrepareColumns(const TRange<const TBatchColumn*>& batchColumns)
+ void PrepareColumns(const TRange<const TBatchColumn*>& batchColumns, int tableIndex)
{
TypedColumns_.reserve(batchColumns.Size());
for (const auto* column : batchColumns) {
- if (SchemaExistenceFlags_[column->Id]) {
- YT_VERIFY(column->Id >= 0 && column->Id < std::ssize(ColumnSchemas_));
+ if(IsColumnNeedsToAdd(column->Id)) {
+ auto iterSchema = ColumnSchemas_[tableIndex].find(column->Id);
+ YT_VERIFY(iterSchema != ColumnSchemas_[tableIndex].end());
TypedColumns_.push_back(TTypedBatchColumn{
column,
- ColumnSchemas_[column->Id].LogicalType()
+ iterSchema->second.LogicalType()
});
}
}
@@ -1093,7 +1138,7 @@ private:
std::move(bodyWriter)});
}
- void PrepareSchema()
+ void PrepareSchema(i32 tableIndex)
{
flatbuffers::FlatBufferBuilder flatbufBuilder;
@@ -1101,8 +1146,9 @@ private:
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fieldOffsets;
for (int columnIndex = 0; columnIndex < std::ssize(TypedColumns_); columnIndex++) {
const auto& typedColumn = TypedColumns_[columnIndex];
- YT_VERIFY(typedColumn.Column->Id >= 0 && typedColumn.Column->Id < std::ssize(ColumnSchemas_));
- auto columnSchema = ColumnSchemas_[typedColumn.Column->Id];
+ auto iterSchema = ColumnSchemas_[tableIndex].find(typedColumn.Column->Id);
+ YT_VERIFY(iterSchema != ColumnSchemas_[tableIndex].end());
+ auto columnSchema = iterSchema->second;
auto nameOffset = SerializeString(&flatbufBuilder, columnSchema.Name());
auto [typeType, typeOffset] = SerializeColumnType(&flatbufBuilder, columnSchema);
@@ -1130,10 +1176,22 @@ private:
auto fieldsOffset = flatbufBuilder.CreateVector(fieldOffsets);
+ std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> customMetadata;
+
+ if (TableNumbers_ > 1) {
+ auto keyValueOffsett = org::apache::arrow::flatbuf::CreateKeyValue(
+ flatbufBuilder,
+ flatbufBuilder.CreateString("TableId"),
+ flatbufBuilder.CreateString(std::to_string(tableIndex))
+ );
+ customMetadata.push_back(keyValueOffsett);
+ }
+
auto schemaOffset = org::apache::arrow::flatbuf::CreateSchema(
flatbufBuilder,
org::apache::arrow::flatbuf::Endianness_Little,
- fieldsOffset);
+ fieldsOffset,
+ flatbufBuilder.CreateVector(customMetadata));
auto messageOffset = org::apache::arrow::flatbuf::CreateMessage(
flatbufBuilder,
diff --git a/yt/yt/library/numeric/piecewise_linear_function.h b/yt/yt/library/numeric/piecewise_linear_function.h
index 254f4de0d7..f305307760 100644
--- a/yt/yt/library/numeric/piecewise_linear_function.h
+++ b/yt/yt/library/numeric/piecewise_linear_function.h
@@ -382,8 +382,6 @@ public:
public:
TLeftToRightTraverser(const TPiecewiseLinearFunction& function, int segmentIndex = 0);
- TLeftToRightTraverser(const TLeftToRightTraverser& other) = default;
-
// See: |TPiecewiseLinearFunction::LeftSegmentAt|.
// If |y > x|, |LeftSegmentAt(x)| cannot be called after |LeftSegmentAt(y)|.
// If |y >= x|, |LeftSegmentAt(x)| cannot be called after |SegmentAt(y)| or |RightSegmentAt(y)|.
diff --git a/yt/yt/library/profiling/solomon/public.h b/yt/yt/library/profiling/solomon/public.h
index 6942e8dba7..02bf5ad2c9 100644
--- a/yt/yt/library/profiling/solomon/public.h
+++ b/yt/yt/library/profiling/solomon/public.h
@@ -1,6 +1,6 @@
#pragma once
-#include <yt/yt/core/misc/ref_counted.h>
+#include <library/cpp/yt/memory/ref_counted.h>
namespace NYT::NProfiling {
diff --git a/yt/yt/library/profiling/solomon/sensor_set.h b/yt/yt/library/profiling/solomon/sensor_set.h
index fb71767bdd..a873254e06 100644
--- a/yt/yt/library/profiling/solomon/sensor_set.h
+++ b/yt/yt/library/profiling/solomon/sensor_set.h
@@ -9,11 +9,12 @@
#include <yt/yt/core/profiling/public.h>
-#include <yt/yt/core/misc/intrusive_ptr.h>
#include <yt/yt/core/misc/error.h>
#include <yt/yt/core/ytree/fluent.h>
+#include <library/cpp/yt/memory/intrusive_ptr.h>
+
namespace NYT::NProfiling {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/profiling/unittests/solomon_ut.cpp b/yt/yt/library/profiling/unittests/solomon_ut.cpp
index 2efaf3e3cc..ecfc27d6ff 100644
--- a/yt/yt/library/profiling/unittests/solomon_ut.cpp
+++ b/yt/yt/library/profiling/unittests/solomon_ut.cpp
@@ -1,4 +1,3 @@
-#include "yt/yt/core/misc/ref_counted.h"
#include <gtest/gtest.h>
#include <gmock/gmock.h>
@@ -9,6 +8,8 @@
#include <yt/yt/library/profiling/solomon/registry.h>
#include <yt/yt/library/profiling/solomon/remote.h>
+#include <library/cpp/yt/memory/ref_counted.h>
+
#include <util/string/join.h>
namespace NYT::NProfiling {
diff --git a/yt/yt/library/program/ya.make b/yt/yt/library/program/ya.make
index 1d044b5a8d..02da9f2e57 100644
--- a/yt/yt/library/program/ya.make
+++ b/yt/yt/library/program/ya.make
@@ -24,6 +24,7 @@ PEERDIR(
library/cpp/yt/mlock
library/cpp/yt/stockpile
library/cpp/yt/string
+ library/cpp/getopt/small
)
END()
diff --git a/yt/yt/library/syncmap/map.h b/yt/yt/library/syncmap/map.h
index 8e442accef..ed1e296f86 100644
--- a/yt/yt/library/syncmap/map.h
+++ b/yt/yt/library/syncmap/map.h
@@ -2,10 +2,11 @@
#include <yt/yt/core/misc/finally.h>
#include <yt/yt/core/misc/hazard_ptr.h>
-#include <yt/yt/core/misc/ref_counted.h>
#include <library/cpp/yt/threading/spin_lock.h>
+#include <library/cpp/yt/memory/ref_counted.h>
+
#include <util/generic/hash.h>
#include <util/generic/noncopyable.h>
diff --git a/yt/yt/library/tracing/public.h b/yt/yt/library/tracing/public.h
index ae473bb33b..1c20b9c7dc 100644
--- a/yt/yt/library/tracing/public.h
+++ b/yt/yt/library/tracing/public.h
@@ -1,6 +1,6 @@
#pragma once
-#include <yt/yt/core/misc/ref_counted.h>
+#include <library/cpp/yt/memory/ref_counted.h>
namespace NYT::NTracing {
diff --git a/yt/yt/library/tracing/tracer.h b/yt/yt/library/tracing/tracer.h
index 8b53efd997..bd2e180735 100644
--- a/yt/yt/library/tracing/tracer.h
+++ b/yt/yt/library/tracing/tracer.h
@@ -2,10 +2,10 @@
#include "public.h"
-#include <yt/yt/core/misc/ref_counted.h>
-
#include <yt/yt/core/tracing/public.h>
+#include <library/cpp/yt/memory/ref_counted.h>
+
namespace NYT::NTracing {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 297ee2ae7d..0e8b3fe47a 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -1930,6 +1930,7 @@ message TReqDestroyChunkLocations
{
required string node_address = 1;
repeated NYT.NProto.TGuid location_uuids = 2;
+ optional bool recover_unlinked_disks = 3 [default = false];
}
message TRspDestroyChunkLocations