aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorAlexSm <alex@ydb.tech>2023-12-22 17:10:22 +0100
committerGitHub <noreply@github.com>2023-12-22 17:10:22 +0100
commit148f920350c60c0ca2d89b637a5aea9093eee450 (patch)
tree6314b1433dac833398c333731e83f0ad77e81a0b /yt
parent7116d46ae7c0259b5f9d489de263f8701e432b1c (diff)
downloadydb-148f920350c60c0ca2d89b637a5aea9093eee450.tar.gz
Library import 2 (#639)
Diffstat (limited to 'yt')
-rw-r--r--yt/cpp/mapreduce/interface/operation.cpp6
-rw-r--r--yt/yt/client/api/public.h2
-rw-r--r--yt/yt/client/api/queue_client.h4
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h1
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp33
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.h8
-rw-r--r--yt/yt/client/api/security_client.h10
-rw-r--r--yt/yt/client/api/transaction.h18
-rw-r--r--yt/yt/client/bundle_controller_client/bundle_controller_client.h5
-rw-r--r--yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp31
-rw-r--r--yt/yt/client/bundle_controller_client/bundle_controller_settings.h8
-rw-r--r--yt/yt/client/bundle_controller_client/public.h35
-rw-r--r--yt/yt/client/chunk_client/config.h2
-rw-r--r--yt/yt/client/chunk_client/public.h1
-rw-r--r--yt/yt/client/driver/chaos_commands.h2
-rw-r--r--yt/yt/client/driver/queue_commands.cpp3
-rw-r--r--yt/yt/client/federated/client.cpp9
-rw-r--r--yt/yt/client/journal_client/config.h4
-rw-r--r--yt/yt/client/queue_client/consumer_client.cpp5
-rw-r--r--yt/yt/client/queue_client/consumer_client.h2
-rw-r--r--yt/yt/client/table_client/helpers.cpp1
-rw-r--r--yt/yt/client/table_client/unversioned_row.cpp2
-rw-r--r--yt/yt/client/transaction_client/config.h2
-rw-r--r--yt/yt/client/unittests/mock/transaction.h8
-rw-r--r--yt/yt/core/actions/unittests/future_ut.cpp2
-rw-r--r--yt/yt/core/actions/unittests/new_with_offloaded_dtor_ut.cpp2
-rw-r--r--yt/yt/core/compression/unittests/dictionary_compression_ut.cpp2
-rw-r--r--yt/yt/core/concurrency/thread_pool_poller.cpp1
-rw-r--r--yt/yt/core/http/server.cpp10
-rw-r--r--yt/yt/core/logging/log_manager.cpp3
-rw-r--r--yt/yt/core/misc/cache_config.h1
-rw-r--r--yt/yt/core/misc/error.cpp1
-rw-r--r--yt/yt/core/misc/error.h2
-rw-r--r--yt/yt/core/misc/proc.cpp18
-rw-r--r--yt/yt/core/misc/proc.h3
-rw-r--r--yt/yt/core/misc/shutdown.cpp2
-rw-r--r--yt/yt/core/misc/unittests/error_ut.cpp1
-rw-r--r--yt/yt/core/net/connection.cpp2
-rw-r--r--yt/yt/core/rpc/config.cpp22
-rw-r--r--yt/yt/core/rpc/config.h34
-rw-r--r--yt/yt/core/rpc/public.h2
-rw-r--r--yt/yt/core/rpc/retrying_channel.h2
-rw-r--r--yt/yt/core/rpc/server.h4
-rw-r--r--yt/yt/core/rpc/server_detail.cpp54
-rw-r--r--yt/yt/core/rpc/server_detail.h9
-rw-r--r--yt/yt/core/rpc/service_detail.cpp1
-rw-r--r--yt/yt/core/yson/protobuf_interop.cpp69
-rw-r--r--yt/yt/core/yson/protobuf_interop.h6
-rw-r--r--yt/yt/core/yson/unittests/protobuf_yson_schema_ut.cpp5
-rw-r--r--yt/yt/core/ytalloc/bindings.cpp4
-rw-r--r--yt/yt/core/ytree/attributes-inl.h89
-rw-r--r--yt/yt/core/ytree/attributes.h4
-rw-r--r--yt/yt/core/ytree/convert-inl.h1
-rw-r--r--yt/yt/core/ytree/exception_helpers.cpp1
-rw-r--r--yt/yt/core/ytree/fluent.h1
-rw-r--r--yt/yt/core/ytree/helpers-inl.h74
-rw-r--r--yt/yt/core/ytree/helpers.cpp1
-rw-r--r--yt/yt/core/ytree/helpers.h2
-rw-r--r--yt/yt/core/ytree/unittests/yson_schema_ut.cpp2
-rw-r--r--yt/yt/core/ytree/yson_schema-inl.h95
-rw-r--r--yt/yt/core/ytree/yson_schema.h2
-rw-r--r--yt/yt/core/ytree/yson_struct.cpp4
-rw-r--r--yt/yt/core/ytree/yson_struct.h2
-rw-r--r--yt/yt/core/ytree/yson_struct_detail-inl.h4
-rw-r--r--yt/yt/core/ytree/yson_struct_detail.cpp21
-rw-r--r--yt/yt/core/ytree/yson_struct_detail.h8
-rw-r--r--yt/yt/library/backtrace_introspector/introspect.cpp1
-rw-r--r--yt/yt/library/backtrace_introspector/introspect_linux.cpp6
-rw-r--r--yt/yt/library/formats/arrow_writer.cpp14
-rw-r--r--yt/yt/library/profiling/solomon/exporter.h14
-rw-r--r--yt/yt/library/program/config.cpp14
-rw-r--r--yt/yt/library/program/config.h1
-rw-r--r--yt/yt/library/program/program_config_mixin.h6
-rw-r--r--yt/yt/library/tracing/jaeger/tracer.cpp4
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto14
75 files changed, 583 insertions, 266 deletions
diff --git a/yt/cpp/mapreduce/interface/operation.cpp b/yt/cpp/mapreduce/interface/operation.cpp
index 0bb490b1ac..b3229117d9 100644
--- a/yt/cpp/mapreduce/interface/operation.cpp
+++ b/yt/cpp/mapreduce/interface/operation.cpp
@@ -337,11 +337,7 @@ void TJobOperationPreparer::FinallyValidate() const
TApiUsageError error;
error << "Output table schemas are missing: ";
for (auto i : illegallyMissingSchemaIndices) {
- error << "no. " << i;
- if (auto path = Context_.GetInputPath(i)) {
- error << "(" << *path << ")";
- }
- error << "; ";
+ error << "no. " << i << " (" << Context_.GetOutputPath(i).GetOrElse("<unknown path>") << "); ";
}
ythrow std::move(error);
}
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h
index 82a2adbe74..c671dea611 100644
--- a/yt/yt/client/api/public.h
+++ b/yt/yt/client/api/public.h
@@ -6,6 +6,8 @@
#include <yt/yt/client/transaction_client/public.h>
+#include <yt/yt/client/bundle_controller_client/public.h>
+
#include <yt/yt/library/auth/authentication_options.h>
#include <yt/yt/core/misc/public.h>
diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h
index a19aef2b90..c665386fcd 100644
--- a/yt/yt/client/api/queue_client.h
+++ b/yt/yt/client/api/queue_client.h
@@ -39,6 +39,10 @@ struct TPullRowsResult
bool Versioned = true;
};
+struct TAdvanceConsumerOptions
+ : public TTimeoutOptions
+{ };
+
struct TPullQueueOptions
: public TSelectRowsOptions
, public TFallbackReplicaOptions
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index 354c883ef5..3a28b830ca 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -84,6 +84,7 @@ public:
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AlterReplicationCard);
// Queues
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AdvanceConsumer);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullQueue);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullConsumer);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RegisterQueueConsumer);
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index 6777b7109f..e3c621c8f1 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -485,6 +485,39 @@ void TTransaction::ModifyRows(
}
}
+TFuture<void> TTransaction::AdvanceConsumer(
+ const NYPath::TRichYPath& consumerPath,
+ const NYPath::TRichYPath& queuePath,
+ int partitionIndex,
+ std::optional<i64> oldOffset,
+ i64 newOffset,
+ const TAdvanceConsumerOptions& options)
+{
+ ValidateTabletTransactionId(GetId());
+
+ THROW_ERROR_EXCEPTION_IF(newOffset < 0, "Queue consumer offset %v cannot be negative", newOffset);
+
+ auto req = Proxy_.AdvanceConsumer();
+ SetTimeoutOptions(*req, options);
+
+ if (NTracing::IsCurrentTraceContextRecorded()) {
+ req->TracingTags().emplace_back("yt.consumer_path", ToString(consumerPath));
+ req->TracingTags().emplace_back("yt.queue_path", ToString(queuePath));
+ }
+
+ ToProto(req->mutable_transaction_id(), GetId());
+
+ ToProto(req->mutable_consumer_path(), consumerPath);
+ ToProto(req->mutable_queue_path(), queuePath);
+ req->set_partition_index(partitionIndex);
+ if (oldOffset) {
+ req->set_old_offset(*oldOffset);
+ }
+ req->set_new_offset(newOffset);
+
+ return req->Invoke().As<void>();
+}
+
TFuture<ITransactionPtr> TTransaction::StartTransaction(
ETransactionType type,
const TTransactionStartOptions& options)
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h
index 200e9e7539..5286ea931b 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.h
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h
@@ -73,6 +73,14 @@ public:
TSharedRange<NApi::TRowModification> modifications,
const NApi::TModifyRowsOptions& options) override;
+ TFuture<void> AdvanceConsumer(
+ const NYPath::TRichYPath& consumerPath,
+ const NYPath::TRichYPath& queuePath,
+ int partitionIndex,
+ std::optional<i64> oldOffset,
+ i64 newOffset,
+ const TAdvanceConsumerOptions& options) override;
+
// IClientBase implementation.
TFuture<NApi::ITransactionPtr> StartTransaction(
NTransactionClient::ETransactionType type,
diff --git a/yt/yt/client/api/security_client.h b/yt/yt/client/api/security_client.h
index ed76f72d26..2c979a147d 100644
--- a/yt/yt/client/api/security_client.h
+++ b/yt/yt/client/api/security_client.h
@@ -76,11 +76,21 @@ struct TIssueTokenOptions
: public TTimeoutOptions
{ };
+struct TIssueTemporaryTokenOptions
+ : public TIssueTokenOptions
+{
+ TDuration ExpirationTimeout;
+};
+
struct TIssueTokenResult
{
TString Token;
};
+struct TRefreshTemporaryTokenOptions
+ : public TTimeoutOptions
+{ };
+
struct TRevokeTokenOptions
: public TTimeoutOptions
{ };
diff --git a/yt/yt/client/api/transaction.h b/yt/yt/client/api/transaction.h
index 4677db4864..3fb39fa740 100644
--- a/yt/yt/client/api/transaction.h
+++ b/yt/yt/client/api/transaction.h
@@ -240,22 +240,32 @@ struct ITransaction
// Consumers.
- //! Advance the consumer's offset for the given partition, setting it to a new value.
- //!
- //! If oldOffset is specified, the current offset is read inside this transaction and compared with oldOffset.
- //! If they are equal, the new offset is written, otherwise an exception is thrown.
+ // TODO(nadya73): Remove it: YT-20712
void AdvanceConsumer(
const NYPath::TYPath& path,
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset);
+ // TODO(nadya73): Remove it: YT-20712
void AdvanceConsumer(
const NYPath::TRichYPath& consumerPath,
const NYPath::TRichYPath& queuePath,
int partitionIndex,
std::optional<i64> oldOffset,
i64 newOffset);
+
+ //! Advance the consumer's offset for the given partition with index #partitionIndex, setting it to #newOffset.
+ //!
+ //! If #oldOffset is specified, the current offset is read inside this transaction and compared with #oldOffset.
+ //! If they are equal, the new offset is written, otherwise an exception is thrown.
+ virtual TFuture<void> AdvanceConsumer(
+ const NYPath::TRichYPath& consumerPath,
+ const NYPath::TRichYPath& queuePath,
+ int partitionIndex,
+ std::optional<i64> oldOffset,
+ i64 newOffset,
+ const TAdvanceConsumerOptions& options) = 0;
};
DEFINE_REFCOUNTED_TYPE(ITransaction)
diff --git a/yt/yt/client/bundle_controller_client/bundle_controller_client.h b/yt/yt/client/bundle_controller_client/bundle_controller_client.h
index 0341458955..30cf31c72e 100644
--- a/yt/yt/client/bundle_controller_client/bundle_controller_client.h
+++ b/yt/yt/client/bundle_controller_client/bundle_controller_client.h
@@ -1,5 +1,6 @@
#pragma once
+#include "public.h"
#include "bundle_controller_settings.h"
#include <yt/yt/client/api/client_common.h>
@@ -8,10 +9,6 @@ namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
-DECLARE_REFCOUNTED_STRUCT(TBundleConfigDescriptor)
-
-////////////////////////////////////////////////////////////////////////////////
-
struct TGetBundleConfigOptions
: public TTimeoutOptions
{ };
diff --git a/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp b/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp
index 194b020c1d..12f044ca9c 100644
--- a/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp
+++ b/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp
@@ -64,10 +64,15 @@ bool TInstanceResources::operator==(const TInstanceResources& other) const
namespace NProto {
-// TODO(alexmipt): make ToProto for TCpuLimits, TMemoryLimits, TInstanceResources
-
////////////////////////////////////////////////////////////////////////////////
+void ToProto(NBundleController::NProto::TCpuLimits* protoCpuLimits, const NBundleControllerClient::TCpuLimitsPtr cpuLimits)
+{
+ protoCpuLimits->set_lookup_thread_pool_size(cpuLimits->LookupThreadPoolSize);
+ protoCpuLimits->set_query_thread_pool_size(cpuLimits->QueryThreadPoolSize);
+ protoCpuLimits->set_write_thread_pool_size(cpuLimits->WriteThreadPoolSize);
+}
+
void FromProto(NBundleControllerClient::TCpuLimitsPtr cpuLimits, const NBundleController::NProto::TCpuLimits* protoCpuLimits)
{
cpuLimits->LookupThreadPoolSize = protoCpuLimits->lookup_thread_pool_size();
@@ -77,6 +82,20 @@ void FromProto(NBundleControllerClient::TCpuLimitsPtr cpuLimits, const NBundleCo
////////////////////////////////////////////////////////////////////////////////
+void ToProto(NBundleController::NProto::TMemoryLimits* protoMemoryLimits, const NBundleControllerClient::TMemoryLimitsPtr memoryLimits)
+{
+ protoMemoryLimits->set_compressed_block_cache(memoryLimits->CompressedBlockCache.value_or(0));
+ protoMemoryLimits->set_key_filter_block_cache(memoryLimits->KeyFilterBlockCache.value_or(0));
+ protoMemoryLimits->set_lookup_row_cache(memoryLimits->LookupRowCache.value_or(0));
+
+ protoMemoryLimits->set_tablet_dynamic(memoryLimits->TabletDynamic.value_or(0));
+ protoMemoryLimits->set_tablet_static(memoryLimits->TabletStatic.value_or(0));
+
+ protoMemoryLimits->set_uncompressed_block_cache(memoryLimits->UncompressedBlockCache.value_or(0));
+
+ protoMemoryLimits->set_versioned_chunk_meta(memoryLimits->VersionedChunkMeta.value_or(0));
+}
+
void FromProto(NBundleControllerClient::TMemoryLimitsPtr memoryLimits, const NBundleController::NProto::TMemoryLimits* protoMemoryLimits)
{
memoryLimits->CompressedBlockCache = protoMemoryLimits->compressed_block_cache();
@@ -93,6 +112,14 @@ void FromProto(NBundleControllerClient::TMemoryLimitsPtr memoryLimits, const NBu
////////////////////////////////////////////////////////////////////////////////
+void ToProto(NBundleController::NProto::TInstanceResources* protoInstanceResources, const NBundleControllerClient::TInstanceResourcesPtr instanceResources)
+{
+ protoInstanceResources->set_memory(instanceResources->Memory);
+ protoInstanceResources->set_net(instanceResources->Net.value_or(0));
+ protoInstanceResources->set_type(instanceResources->Type);
+ protoInstanceResources->set_vcpu(instanceResources->Vcpu);
+}
+
void FromProto(NBundleControllerClient::TInstanceResourcesPtr instanceResources, const NBundleController::NProto::TInstanceResources* protoInstanceResources)
{
instanceResources->Memory = protoInstanceResources->memory();
diff --git a/yt/yt/client/bundle_controller_client/bundle_controller_settings.h b/yt/yt/client/bundle_controller_client/bundle_controller_settings.h
index 8dcf241ae1..da57ead80a 100644
--- a/yt/yt/client/bundle_controller_client/bundle_controller_settings.h
+++ b/yt/yt/client/bundle_controller_client/bundle_controller_settings.h
@@ -1,5 +1,7 @@
#pragma once
+#include "public.h"
+
#include <optional>
#include <yt/yt/client/tablet_client/public.h>
@@ -13,12 +15,6 @@ namespace NYT::NBundleControllerClient {
////////////////////////////////////////////////////////////////////////////////
-DECLARE_REFCOUNTED_STRUCT(TCpuLimits)
-DECLARE_REFCOUNTED_STRUCT(TMemoryLimits)
-DECLARE_REFCOUNTED_STRUCT(TInstanceResources)
-
-////////////////////////////////////////////////////////////////////////////////
-
struct TCpuLimits
: public NYTree::TYsonStruct
{
diff --git a/yt/yt/client/bundle_controller_client/public.h b/yt/yt/client/bundle_controller_client/public.h
new file mode 100644
index 0000000000..d73eac1539
--- /dev/null
+++ b/yt/yt/client/bundle_controller_client/public.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include <yt/yt/core/misc/public.h>
+
+#include <yt/yt/client/hydra/public.h>
+
+#include <yt/yt/client/object_client/public.h>
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+DECLARE_REFCOUNTED_STRUCT(TBundleConfigDescriptor)
+
+struct TBundleConfigDescriptor;
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
+
+namespace NYT::NBundleControllerClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+DECLARE_REFCOUNTED_STRUCT(TCpuLimits)
+DECLARE_REFCOUNTED_STRUCT(TMemoryLimits)
+DECLARE_REFCOUNTED_STRUCT(TInstanceResources)
+
+struct TCpuLimits;
+struct TMemoryLimits;
+struct TInstanceResources;
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NBundleControllerClient
diff --git a/yt/yt/client/chunk_client/config.h b/yt/yt/client/chunk_client/config.h
index 5a28cb0750..9f3488b003 100644
--- a/yt/yt/client/chunk_client/config.h
+++ b/yt/yt/client/chunk_client/config.h
@@ -4,7 +4,7 @@
#include <yt/yt/client/misc/config.h>
-#include <yt/yt/core/ytree/yson_serializable.h>
+#include <yt/yt/core/ytree/yson_struct.h>
namespace NYT::NChunkClient {
diff --git a/yt/yt/client/chunk_client/public.h b/yt/yt/client/chunk_client/public.h
index 6cf877026e..1482ff2a58 100644
--- a/yt/yt/client/chunk_client/public.h
+++ b/yt/yt/client/chunk_client/public.h
@@ -79,6 +79,7 @@ YT_DEFINE_ERROR_ENUM(
((LocationCrashed) (750))
((LocationDiskWaitingReplacement) (751))
((ChunkMetaCacheFetchFailed) (752))
+ ((LocationMediumIsMisconfigured) (753))
);
using TChunkId = NObjectClient::TObjectId;
diff --git a/yt/yt/client/driver/chaos_commands.h b/yt/yt/client/driver/chaos_commands.h
index 831991ec08..ac50bcf643 100644
--- a/yt/yt/client/driver/chaos_commands.h
+++ b/yt/yt/client/driver/chaos_commands.h
@@ -4,7 +4,7 @@
#include <yt/yt/client/chaos_client/replication_card.h>
-#include <yt/yt/core/ytree/yson_serializable.h>
+#include <yt/yt/core/ytree/yson_struct.h>
namespace NYT::NDriver {
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index e2aaf35e7c..2d82e0d22c 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -233,7 +233,8 @@ void TAdvanceConsumerCommand::DoExecute(ICommandContextPtr context)
{
auto transaction = GetTransaction(context);
- transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset);
+ WaitFor(transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset, /*options*/ {}))
+ .ThrowOnError();
if (ShouldCommitTransaction()) {
WaitFor(transaction->Commit())
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index f2606da4d6..3e2bf635ad 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -77,6 +77,14 @@ public:
TSharedRange<TRowModification> modifications,
const TModifyRowsOptions& options) override;
+ TFuture<void> AdvanceConsumer(
+ const NYPath::TRichYPath& consumerPath,
+ const NYPath::TRichYPath& queuePath,
+ int partitionIndex,
+ std::optional<i64> oldOffset,
+ i64 newOffset,
+ const TAdvanceConsumerOptions& options) override;
+
TFuture<TTransactionFlushResult> Flush() override;
TFuture<void> Ping(const NApi::TTransactionPingOptions& options = {}) override;
@@ -478,6 +486,7 @@ TRANSACTION_METHOD_IMPL(void, Abort, (const TTransactionAbortOptions&));
TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
+TRANSACTION_METHOD_IMPL(void, AdvanceConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceConsumerOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
TRANSACTION_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&));
diff --git a/yt/yt/client/journal_client/config.h b/yt/yt/client/journal_client/config.h
index b89a248d2b..23e2d7150c 100644
--- a/yt/yt/client/journal_client/config.h
+++ b/yt/yt/client/journal_client/config.h
@@ -2,10 +2,10 @@
#include "public.h"
-#include <yt/yt/core/ytree/yson_serializable.h>
-
#include <yt/yt/client/chunk_client/config.h>
+#include <yt/yt/core/ytree/yson_struct.h>
+
namespace NYT::NJournalClient {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp
index 2089ba21aa..8690cb6433 100644
--- a/yt/yt/client/queue_client/consumer_client.cpp
+++ b/yt/yt/client/queue_client/consumer_client.cpp
@@ -575,6 +575,11 @@ ISubConsumerClientPtr CreateSubConsumerClient(
return CreateConsumerClient(client, consumerPath)->GetSubConsumerClient(queueRef);
}
+const TTableSchemaPtr& GetConsumerSchema()
+{
+ return YTConsumerTableSchema;
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/queue_client/consumer_client.h b/yt/yt/client/queue_client/consumer_client.h
index 5c239828fa..80bfce0435 100644
--- a/yt/yt/client/queue_client/consumer_client.h
+++ b/yt/yt/client/queue_client/consumer_client.h
@@ -118,6 +118,8 @@ ISubConsumerClientPtr CreateSubConsumerClient(
const NYPath::TYPath& consumerPath,
NYPath::TRichYPath queuePath);
+const NTableClient::TTableSchemaPtr& GetConsumerSchema();
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NQueueClient
diff --git a/yt/yt/client/table_client/helpers.cpp b/yt/yt/client/table_client/helpers.cpp
index 66343bc32b..60767d1a68 100644
--- a/yt/yt/client/table_client/helpers.cpp
+++ b/yt/yt/client/table_client/helpers.cpp
@@ -5,6 +5,7 @@
#include <yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.pb.h>
+#include <yt/yt/core/ytree/attributes.h>
#include <yt/yt/core/ytree/convert.h>
#include <yt/yt/core/net/address.h>
diff --git a/yt/yt/client/table_client/unversioned_row.cpp b/yt/yt/client/table_client/unversioned_row.cpp
index 7a28b1333f..cff031f054 100644
--- a/yt/yt/client/table_client/unversioned_row.cpp
+++ b/yt/yt/client/table_client/unversioned_row.cpp
@@ -16,7 +16,7 @@
#include <yt/yt/core/yson/consumer.h>
-#include <yt/yt/core/ytree/helpers.h>
+#include <yt/yt/core/ytree/attributes.h>
#include <yt/yt/core/ytree/node.h>
#include <yt/yt/core/ytree/convert.h>
diff --git a/yt/yt/client/transaction_client/config.h b/yt/yt/client/transaction_client/config.h
index c4114b72c5..9c8c69d6a6 100644
--- a/yt/yt/client/transaction_client/config.h
+++ b/yt/yt/client/transaction_client/config.h
@@ -4,7 +4,7 @@
#include <yt/yt/core/rpc/config.h>
-#include <yt/yt/core/ytree/yson_serializable.h>
+#include <yt/yt/core/ytree/yson_struct.h>
namespace NYT::NTransactionClient {
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index 06c9c65148..d8380bdd61 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -195,6 +195,14 @@ public:
MOCK_METHOD(void, SubscribeAborted, (const TAbortedHandler& callback), (override));
MOCK_METHOD(void, UnsubscribeAborted, (const TAbortedHandler& callback), (override));
+ MOCK_METHOD(TFuture<void>, AdvanceConsumer, (
+ const NYPath::TRichYPath& consumerPath,
+ const NYPath::TRichYPath& queuePath,
+ int partitionIndex,
+ std::optional<i64> oldOffset,
+ i64 newOffset,
+ const TAdvanceConsumerOptions& options), (override));
+
MOCK_METHOD(void, ModifyRows, (
const NYPath::TYPath& path,
NTableClient::TNameTablePtr nameTable,
diff --git a/yt/yt/core/actions/unittests/future_ut.cpp b/yt/yt/core/actions/unittests/future_ut.cpp
index a979e8477a..9b564e7481 100644
--- a/yt/yt/core/actions/unittests/future_ut.cpp
+++ b/yt/yt/core/actions/unittests/future_ut.cpp
@@ -7,6 +7,8 @@
#include <yt/yt/core/misc/ref_counted_tracker.h>
#include <yt/yt/core/misc/mpsc_stack.h>
+#include <yt/yt/core/ytree/attributes.h>
+
#include <util/system/thread.h>
#include <thread>
diff --git a/yt/yt/core/actions/unittests/new_with_offloaded_dtor_ut.cpp b/yt/yt/core/actions/unittests/new_with_offloaded_dtor_ut.cpp
index 5c9a9ffab9..be7482808a 100644
--- a/yt/yt/core/actions/unittests/new_with_offloaded_dtor_ut.cpp
+++ b/yt/yt/core/actions/unittests/new_with_offloaded_dtor_ut.cpp
@@ -7,6 +7,8 @@
#include <yt/yt/core/misc/ref_counted_tracker.h>
#include <yt/yt/core/misc/proc.h>
+#include <yt/yt/core/logging/log.h>
+
namespace NYT {
namespace {
diff --git a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp
index c96d1bc40c..5eb10dee42 100644
--- a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp
+++ b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp
@@ -6,6 +6,8 @@
#include <yt/yt/core/misc/error.h>
#include <yt/yt/core/misc/serialize.h>
+#include <yt/yt/core/ytree/attributes.h>
+
#include <library/cpp/yt/memory/chunked_memory_pool.h>
#include <util/random/fast.h>
diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp
index 0b45173871..f486003cc1 100644
--- a/yt/yt/core/concurrency/thread_pool_poller.cpp
+++ b/yt/yt/core/concurrency/thread_pool_poller.cpp
@@ -7,6 +7,7 @@
#include "two_level_fair_share_thread_pool.h"
#include "new_fair_share_thread_pool.h"
+#include <yt/yt/core/misc/collection_helpers.h>
#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/misc/mpsc_stack.h>
#include <yt/yt/core/misc/ref_tracked.h>
diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp
index b2c95d1dbd..132e0b1fbd 100644
--- a/yt/yt/core/http/server.cpp
+++ b/yt/yt/core/http/server.cpp
@@ -262,6 +262,11 @@ private:
}
}));
+ auto finally = Finally([&] {
+ auto count = ActiveConnections_.fetch_sub(1) - 1;
+ ConnectionsActive_.Update(count);
+ });
+
if (Config_->NoDelay) {
connection->SetNoDelay();
}
@@ -274,11 +279,6 @@ private:
void DoHandleConnection(const IConnectionPtr& connection, TGuid connectionId)
{
- auto finally = Finally([&] {
- auto count = ActiveConnections_.fetch_sub(1) - 1;
- ConnectionsActive_.Update(count);
- });
-
auto request = New<THttpInput>(
connection,
connection->RemoteAddress(),
diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp
index 62dba0253d..b95beb20fb 100644
--- a/yt/yt/core/logging/log_manager.cpp
+++ b/yt/yt/core/logging/log_manager.cpp
@@ -16,6 +16,7 @@
#include <yt/yt/core/concurrency/invoker_queue.h>
#include <yt/yt/core/concurrency/thread_pool.h>
+#include <yt/yt/core/misc/collection_helpers.h>
#include <yt/yt/core/misc/fs.h>
#include <yt/yt/core/misc/spsc_queue.h>
#include <yt/yt/core/misc/mpsc_stack.h>
@@ -32,7 +33,7 @@
#include <yt/yt/core/ytree/ypath_client.h>
#include <yt/yt/core/ytree/ypath_service.h>
-#include <yt/yt/core/ytree/yson_serializable.h>
+#include <yt/yt/core/ytree/yson_struct.h>
#include <yt/yt/core/ytree/convert.h>
#include <yt/yt/library/profiling/producer.h>
diff --git a/yt/yt/core/misc/cache_config.h b/yt/yt/core/misc/cache_config.h
index 9ac7c9063c..24e6b2c946 100644
--- a/yt/yt/core/misc/cache_config.h
+++ b/yt/yt/core/misc/cache_config.h
@@ -2,7 +2,6 @@
#include "public.h"
-#include <yt/yt/core/ytree/yson_serializable.h>
#include <yt/yt/core/ytree/yson_struct.h>
namespace NYT {
diff --git a/yt/yt/core/misc/error.cpp b/yt/yt/core/misc/error.cpp
index ce9a942841..511a9fb26c 100644
--- a/yt/yt/core/misc/error.cpp
+++ b/yt/yt/core/misc/error.cpp
@@ -16,6 +16,7 @@
#include <yt/yt/core/yson/tokenizer.h>
+#include <yt/yt/core/ytree/attributes.h>
#include <yt/yt/core/ytree/convert.h>
#include <yt/yt/core/ytree/fluent.h>
diff --git a/yt/yt/core/misc/error.h b/yt/yt/core/misc/error.h
index be3a5d9ec7..9dba023408 100644
--- a/yt/yt/core/misc/error.h
+++ b/yt/yt/core/misc/error.h
@@ -6,7 +6,7 @@
#include <yt/yt/core/yson/string.h>
-#include <yt/yt/core/ytree/attributes.h>
+#include <yt/yt/core/ytree/public.h>
#include <yt/yt/core/tracing/public.h>
diff --git a/yt/yt/core/misc/proc.cpp b/yt/yt/core/misc/proc.cpp
index 1b7aefe466..2fbd8ba1d6 100644
--- a/yt/yt/core/misc/proc.cpp
+++ b/yt/yt/core/misc/proc.cpp
@@ -314,6 +314,24 @@ std::vector<size_t> GetCurrentProcessThreadIds()
#endif
}
+bool IsUserspaceThread(size_t tid)
+{
+#ifdef __linux__
+ TFileInput file(Format("/proc/%v/stat", tid));
+ auto statFields = SplitString(file.ReadLine(), " ");
+ constexpr int StartStackIndex = 27;
+ if (statFields.size() < StartStackIndex) {
+ return false;
+ }
+ // This is just a heuristic.
+ auto startStack = FromString<ui64>(statFields[StartStackIndex]);
+ return startStack != 0;
+#else
+ Y_UNUSED(tid);
+ return false;
+#endif
+}
+
void ChownChmodDirectory(const TString& path, const std::optional<uid_t>& userId, const std::optional<int>& permissions)
{
#ifdef _unix_
diff --git a/yt/yt/core/misc/proc.h b/yt/yt/core/misc/proc.h
index 2d52d0d209..823b64397e 100644
--- a/yt/yt/core/misc/proc.h
+++ b/yt/yt/core/misc/proc.h
@@ -4,7 +4,7 @@
#include <yt/yt/core/misc/error.h>
-#include <yt/yt/core/ytree/yson_serializable.h>
+#include <util/system/file.h>
#include <errno.h>
@@ -110,6 +110,7 @@ ui64 GetProcessCumulativeMajorPageFaults(int pid = -1);
size_t GetCurrentProcessId();
size_t GetCurrentThreadId();
std::vector<size_t> GetCurrentProcessThreadIds();
+bool IsUserspaceThread(size_t tid);
void ChownChmodDirectory(
const TString& path,
diff --git a/yt/yt/core/misc/shutdown.cpp b/yt/yt/core/misc/shutdown.cpp
index 3646df7150..25a772a2d6 100644
--- a/yt/yt/core/misc/shutdown.cpp
+++ b/yt/yt/core/misc/shutdown.cpp
@@ -4,6 +4,8 @@
#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/misc/singleton.h>
+#include <library/cpp/yt/cpu_clock/clock.h>
+
#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
#include <library/cpp/yt/threading/event_count.h>
diff --git a/yt/yt/core/misc/unittests/error_ut.cpp b/yt/yt/core/misc/unittests/error_ut.cpp
index f8dad10b3c..f9cbbe9e9e 100644
--- a/yt/yt/core/misc/unittests/error_ut.cpp
+++ b/yt/yt/core/misc/unittests/error_ut.cpp
@@ -5,6 +5,7 @@
#include <yt/yt/core/yson/string.h>
+#include <yt/yt/core/ytree/attributes.h>
#include <yt/yt/core/ytree/convert.h>
#include <util/stream/str.h>
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp
index 38427b05b9..d0018dbbe8 100644
--- a/yt/yt/core/net/connection.cpp
+++ b/yt/yt/core/net/connection.cpp
@@ -33,7 +33,7 @@
namespace NYT::NNet {
using namespace NConcurrency;
-using namespace NProfiling;
+// using namespace NProfiling;
#ifdef _unix_
using TIOVecBasePtr = void*;
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp
index 14143a6585..ec29c3b88c 100644
--- a/yt/yt/core/rpc/config.cpp
+++ b/yt/yt/core/rpc/config.cpp
@@ -38,6 +38,20 @@ void TServiceCommonConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TServiceCommonDynamicConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("enable_per_user_profiling", &TThis::EnablePerUserProfiling)
+ .Default();
+ registrar.Parameter("histogram_timer_profiling", &TThis::HistogramTimerProfiling)
+ .Default();
+ registrar.Parameter("code_counting", &TThis::EnableErrorCodeCounting)
+ .Default();
+ registrar.Parameter("tracing_mode", &TThis::TracingMode)
+ .Default();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TServerConfig::Register(TRegistrar registrar)
{
registrar.Parameter("services", &TThis::Services)
@@ -46,6 +60,14 @@ void TServerConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TServerDynamicConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("services", &TThis::Services)
+ .Default();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TServiceConfig::Register(TRegistrar registrar)
{
registrar.Parameter("enable_per_user_profiling", &TThis::EnablePerUserProfiling)
diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h
index be8c1fdec5..9cdf971d18 100644
--- a/yt/yt/core/rpc/config.h
+++ b/yt/yt/core/rpc/config.h
@@ -90,6 +90,40 @@ DEFINE_REFCOUNTED_TYPE(TServerConfig)
////////////////////////////////////////////////////////////////////////////////
+// Common options shared between all services in one server.
+class TServiceCommonDynamicConfig
+ : public NYTree::TYsonStruct
+{
+public:
+ std::optional<bool> EnablePerUserProfiling;
+ std::optional<THistogramConfigPtr> HistogramTimerProfiling;
+ std::optional<bool> EnableErrorCodeCounting;
+ std::optional<ERequestTracingMode> TracingMode;
+
+ REGISTER_YSON_STRUCT(TServiceCommonDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TServiceCommonDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TServerDynamicConfig
+ : public TServiceCommonDynamicConfig
+{
+public:
+ THashMap<TString, NYTree::INodePtr> Services;
+
+ REGISTER_YSON_STRUCT(TServerDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TServerDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
class TServiceConfig
: public NYTree::TYsonStruct
{
diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h
index af88870ad9..5548fa0f22 100644
--- a/yt/yt/core/rpc/public.h
+++ b/yt/yt/core/rpc/public.h
@@ -107,6 +107,8 @@ DECLARE_REFCOUNTED_CLASS(THistogramExponentialBounds)
DECLARE_REFCOUNTED_CLASS(THistogramConfig)
DECLARE_REFCOUNTED_CLASS(TServerConfig)
DECLARE_REFCOUNTED_CLASS(TServiceCommonConfig)
+DECLARE_REFCOUNTED_CLASS(TServerDynamicConfig)
+DECLARE_REFCOUNTED_CLASS(TServiceCommonDynamicConfig)
DECLARE_REFCOUNTED_CLASS(TServiceConfig)
DECLARE_REFCOUNTED_CLASS(TMethodConfig)
DECLARE_REFCOUNTED_CLASS(TRetryingChannelConfig)
diff --git a/yt/yt/core/rpc/retrying_channel.h b/yt/yt/core/rpc/retrying_channel.h
index d8d25fb922..7de5f5f8ec 100644
--- a/yt/yt/core/rpc/retrying_channel.h
+++ b/yt/yt/core/rpc/retrying_channel.h
@@ -2,8 +2,6 @@
#include "public.h"
-#include <yt/yt/core/ytree/yson_serializable.h>
-
namespace NYT::NRpc {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/server.h b/yt/yt/core/rpc/server.h
index fe4c88a478..e48bf2ef5c 100644
--- a/yt/yt/core/rpc/server.h
+++ b/yt/yt/core/rpc/server.h
@@ -31,7 +31,9 @@ struct IServer
virtual IServicePtr GetServiceOrThrow(const TServiceId& serviceId) const = 0;
//! Reconfigures the server on-the-fly.
- virtual void Configure(TServerConfigPtr config) = 0;
+ virtual void Configure(const TServerConfigPtr& config) = 0;
+
+ virtual void OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) = 0;
//! Starts the server.
/*!
diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp
index 0f80a07b6b..82ba43afe4 100644
--- a/yt/yt/core/rpc/server_detail.cpp
+++ b/yt/yt/core/rpc/server_detail.cpp
@@ -768,12 +768,12 @@ void TServerBase::RegisterService(IServicePtr service)
auto guard = WriterGuard(ServicesLock_);
auto& serviceMap = RealmIdToServiceMap_[serviceId.RealmId];
YT_VERIFY(serviceMap.emplace(serviceId.ServiceName, service).second);
- if (Config_) {
- auto it = Config_->Services.find(serviceId.ServiceName);
- if (it != Config_->Services.end()) {
- service->Configure(Config_, it->second);
+ if (AppliedConfig_) {
+ auto it = AppliedConfig_->Services.find(serviceId.ServiceName);
+ if (it != AppliedConfig_->Services.end()) {
+ service->Configure(AppliedConfig_, it->second);
} else {
- service->Configure(Config_, nullptr);
+ service->Configure(AppliedConfig_, nullptr);
}
}
DoRegisterService(service);
@@ -866,26 +866,54 @@ IServicePtr TServerBase::GetServiceOrThrow(const TServiceId& serviceId) const
return serviceIt->second;
}
-void TServerBase::Configure(TServerConfigPtr config)
+void TServerBase::ApplyConfig()
{
- auto guard = WriterGuard(ServicesLock_);
+ VERIFY_SPINLOCK_AFFINITY(ServicesLock_);
- // Future services will be configured appropriately.
- Config_ = config;
+ auto newAppliedConfig = New<TServerConfig>();
+ newAppliedConfig->EnableErrorCodeCounting = DynamicConfig_->EnableErrorCodeCounting.value_or(StaticConfig_->EnableErrorCodeCounting);
+ newAppliedConfig->EnablePerUserProfiling = DynamicConfig_->EnablePerUserProfiling.value_or(StaticConfig_->EnablePerUserProfiling);
+ newAppliedConfig->HistogramTimerProfiling = DynamicConfig_->HistogramTimerProfiling.value_or(StaticConfig_->HistogramTimerProfiling);
+ newAppliedConfig->Services = StaticConfig_->Services;
+
+ for (const auto& [name, node] : DynamicConfig_->Services) {
+ newAppliedConfig->Services[name] = node;
+ }
+
+ AppliedConfig_ = newAppliedConfig;
// Apply configuration to all existing services.
for (const auto& [realmId, serviceMap] : RealmIdToServiceMap_) {
for (const auto& [serviceName, service] : serviceMap) {
- auto it = config->Services.find(serviceName);
- if (it != config->Services.end()) {
- service->Configure(config, it->second);
+ auto it = AppliedConfig_->Services.find(serviceName);
+ if (it != AppliedConfig_->Services.end()) {
+ service->Configure(AppliedConfig_, it->second);
} else {
- service->Configure(config, nullptr);
+ service->Configure(AppliedConfig_, nullptr);
}
}
}
}
+void TServerBase::Configure(const TServerConfigPtr& config)
+{
+ auto guard = WriterGuard(ServicesLock_);
+
+ // Future services will be configured appropriately.
+ StaticConfig_ = config;
+
+ ApplyConfig();
+}
+
+void TServerBase::OnDynamicConfigChanged(const TServerDynamicConfigPtr& config)
+{
+ auto guard = WriterGuard(ServicesLock_);
+
+ DynamicConfig_ = config;
+
+ ApplyConfig();
+}
+
void TServerBase::Start()
{
YT_VERIFY(!Started_);
diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h
index 0a2647b643..c05e19216e 100644
--- a/yt/yt/core/rpc/server_detail.h
+++ b/yt/yt/core/rpc/server_detail.h
@@ -269,7 +269,8 @@ public:
IServicePtr FindService(const TServiceId& serviceId) const override;
IServicePtr GetServiceOrThrow(const TServiceId& serviceId) const override;
- void Configure(TServerConfigPtr config) override;
+ void Configure(const TServerConfigPtr& config) override;
+ void OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) override;
void Start() override;
TFuture<void> Stop(bool graceful) override;
@@ -280,7 +281,9 @@ protected:
std::atomic<bool> Started_ = false;
YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ServicesLock_);
- TServerConfigPtr Config_;
+ TServerConfigPtr StaticConfig_;
+ TServerDynamicConfigPtr DynamicConfig_ = New<TServerDynamicConfig>();
+ TServerConfigPtr AppliedConfig_;
//! Service name to service.
using TServiceMap = THashMap<TString, IServicePtr>;
@@ -288,6 +291,8 @@ protected:
explicit TServerBase(NLogging::TLogger logger);
+ void ApplyConfig();
+
virtual void DoStart();
virtual TFuture<void> DoStop(bool graceful);
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index 2fd6f483ad..46567c3fcd 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -2451,6 +2451,7 @@ void TServiceBase::DoConfigure(
auto methodConfig = methodIt ? methodIt->second : New<TMethodConfig>();
const auto& descriptor = runtimeInfo->Descriptor;
+
runtimeInfo->Heavy.store(methodConfig->Heavy.value_or(descriptor.Options.Heavy));
runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit));
runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit));
diff --git a/yt/yt/core/yson/protobuf_interop.cpp b/yt/yt/core/yson/protobuf_interop.cpp
index 030cc763fc..60f4ee6e1a 100644
--- a/yt/yt/core/yson/protobuf_interop.cpp
+++ b/yt/yt/core/yson/protobuf_interop.cpp
@@ -182,7 +182,7 @@ void SetProtobufInteropConfig(TProtobufInteropDynamicConfigPtr config)
GlobalProtobufInteropConfig()->Config.Store(std::move(config));
}
-void GetSchema(const TProtobufEnumType* enumType, IYsonConsumer* consumer);
+void WriteSchema(const TProtobufEnumType* enumType, IYsonConsumer* consumer);
////////////////////////////////////////////////////////////////////////////////
@@ -501,17 +501,18 @@ public:
return EnumYsonStorageType_;
}
- void GetSchema(IYsonConsumer* consumer) const
+ void WriteSchema(IYsonConsumer* consumer) const
{
if (IsYsonMap()) {
- BuildYsonFluently(consumer).BeginMap()
- .Item("type_name").Value("dict")
- .Item("key").Do([this] (auto&& fluent) {
- GetYsonMapKeyField()->GetSchema(fluent.GetConsumer());
- })
- .Item("value").Do([this] (auto&& fluent) {
- GetYsonMapValueField()->GetSchema(fluent.GetConsumer());
- })
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("type_name").Value("dict")
+ .Item("key").Do([&] (auto fluent) {
+ GetYsonMapKeyField()->WriteSchema(fluent.GetConsumer());
+ })
+ .Item("value").Do([&] (auto fluent) {
+ GetYsonMapValueField()->WriteSchema(fluent.GetConsumer());
+ })
.EndMap();
return;
@@ -558,10 +559,10 @@ public:
consumer->OnStringScalar("string");
break;
case FieldDescriptor::TYPE_ENUM:
- NYson::GetSchema(GetEnumType(), consumer);
+ NYson::WriteSchema(GetEnumType(), consumer);
break;
case FieldDescriptor::TYPE_MESSAGE:
- NYson::GetSchema(GetMessageType(), consumer);
+ NYson::WriteSchema(GetMessageType(), consumer);
break;
default:
break;
@@ -690,20 +691,21 @@ public:
}
}
- void GetSchema(IYsonConsumer* consumer) const
+ void WriteSchema(IYsonConsumer* consumer) const
{
BuildYsonFluently(consumer).BeginMap()
.Item("type_name").Value("struct")
- .Item("members").DoListFor(0, Underlying_->field_count(), [this] (auto&& fluent, int index) {
+ .Item("members").DoListFor(0, Underlying_->field_count(), [&] (auto fluent, int index) {
auto* field = GetFieldByNumber(Underlying_->field(index)->number());
- fluent.Item().BeginMap()
- .Item("name").Value(field->GetYsonName())
- .Item("type").Do([&] (auto&& fluent) {
- field->GetSchema(fluent.GetConsumer());
- })
- .DoIf(!field->IsYsonMap() && !field->IsRepeated() && !field->IsOptional(), [&] (auto && fluent) {
- fluent.Item("required").Value(true);
- })
+ fluent.Item()
+ .BeginMap()
+ .Item("name").Value(field->GetYsonName())
+ .Item("type").Do([&] (auto fluent) {
+ field->WriteSchema(fluent.GetConsumer());
+ })
+ .DoIf(!field->IsYsonMap() && !field->IsRepeated() && !field->IsOptional(), [] (auto fluent) {
+ fluent.Item("required").Value(true);
+ })
.EndMap();
})
.EndMap();
@@ -821,14 +823,15 @@ public:
return it == ValueToLiteral_.end() ? TStringBuf() : it->second;
}
- void GetSchema(IYsonConsumer* consumer) const
+ void WriteSchema(IYsonConsumer* consumer) const
{
- BuildYsonFluently(consumer).BeginMap()
- .Item("type_name").Value("enum")
- .Item("enum_name").Value(Underlying_->name())
- .Item("values").DoListFor(0, Underlying_->value_count(), [this] (auto&& fluent, int index) {
- fluent.Item().Value(FindLiteralByValue(Underlying_->value(index)->number()));
- })
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("type_name").Value("enum")
+ .Item("enum_name").Value(Underlying_->name())
+ .Item("values").DoListFor(0, Underlying_->value_count(), [&] (auto fluent, int index) {
+ fluent.Item().Value(FindLiteralByValue(Underlying_->value(index)->number()));
+ })
.EndMap();
}
@@ -3140,14 +3143,14 @@ TString YsonStringToProto(
return serializedProto;
}
-void GetSchema(const TProtobufEnumType* type, IYsonConsumer* consumer)
+void WriteSchema(const TProtobufEnumType* type, IYsonConsumer* consumer)
{
- type->GetSchema(consumer);
+ type->WriteSchema(consumer);
}
-void GetSchema(const TProtobufMessageType* type, IYsonConsumer* consumer)
+void WriteSchema(const TProtobufMessageType* type, IYsonConsumer* consumer)
{
- type->GetSchema(consumer);
+ type->WriteSchema(consumer);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/yson/protobuf_interop.h b/yt/yt/core/yson/protobuf_interop.h
index 5daf062698..5bfe3fd93b 100644
--- a/yt/yt/core/yson/protobuf_interop.h
+++ b/yt/yt/core/yson/protobuf_interop.h
@@ -280,9 +280,11 @@ void SetProtobufInteropConfig(TProtobufInteropDynamicConfigPtr config);
////////////////////////////////////////////////////////////////////////////////
-//! Return type v3 schema for protobuf message type.
+//! Returns type v3 schema for protobuf message type.
//! Note: Recursive types (message has field with self type) are not supported.
-void GetSchema(const TProtobufMessageType* type, IYsonConsumer* consumer);
+void WriteSchema(const TProtobufMessageType* type, IYsonConsumer* consumer);
+
+////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NYson
diff --git a/yt/yt/core/yson/unittests/protobuf_yson_schema_ut.cpp b/yt/yt/core/yson/unittests/protobuf_yson_schema_ut.cpp
index 103f00b01d..af58fb320a 100644
--- a/yt/yt/core/yson/unittests/protobuf_yson_schema_ut.cpp
+++ b/yt/yt/core/yson/unittests/protobuf_yson_schema_ut.cpp
@@ -14,13 +14,14 @@
namespace NYT::NYson {
namespace {
+////////////////////////////////////////////////////////////////////////////////
TEST(TProtobufYsonSchemaTest, GetMessageSchema)
{
TStringStream outputStream;
TYsonWriter ysonWriter(&outputStream, EYsonFormat::Text);
- GetSchema(ReflectProtobufMessageType<NProto::TTestSchemaMessage>(), &ysonWriter);
+ WriteSchema(ReflectProtobufMessageType<NProto::TTestSchemaMessage>(), &ysonWriter);
TStringBuf expected = R"({
type_name="struct";
members=[
@@ -54,5 +55,7 @@ TEST(TProtobufYsonSchemaTest, GetMessageSchema)
<< "Actual: " << ConvertToYsonString(actualNode, EYsonFormat::Text, 4).AsStringBuf() << "\n\n";
}
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace
} // namespace NYT::NYson
diff --git a/yt/yt/core/ytalloc/bindings.cpp b/yt/yt/core/ytalloc/bindings.cpp
index 1f47fa7982..33eb077a5b 100644
--- a/yt/yt/core/ytalloc/bindings.cpp
+++ b/yt/yt/core/ytalloc/bindings.cpp
@@ -9,10 +9,10 @@
#include <yt/yt/core/misc/singleton.h>
#include <yt/yt/core/misc/string_builder.h>
-#include <yt/yt/core/ytree/yson_serializable.h>
-
#include <library/cpp/ytalloc/api/ytalloc.h>
+#include <library/cpp/yt/yson_string/string.h>
+
#include <util/system/env.h>
#include <cstdio>
diff --git a/yt/yt/core/ytree/attributes-inl.h b/yt/yt/core/ytree/attributes-inl.h
new file mode 100644
index 0000000000..dd367f5d24
--- /dev/null
+++ b/yt/yt/core/ytree/attributes-inl.h
@@ -0,0 +1,89 @@
+#ifndef ATTRIBUTES_INL_H_
+#error "Direct inclusion of this file is not allowed, include helpers.h"
+// For the sake of sane code completion.
+#include "attributes.h"
+#endif
+
+// #include "attribute_consumer.h"
+// #include "serialize.h"
+#include "convert.h"
+
+namespace NYT::NYTree {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+T IAttributeDictionary::Get(TStringBuf key) const
+{
+ auto yson = GetYson(key);
+ try {
+ return ConvertTo<T>(yson);
+ } catch (const std::exception& ex) {
+ THROW_ERROR_EXCEPTION("Error parsing attribute %Qv",
+ key)
+ << ex;
+ }
+}
+
+template <class T>
+T IAttributeDictionary::GetAndRemove(const TString& key)
+{
+ auto result = Get<T>(key);
+ Remove(key);
+ return result;
+}
+
+template <class T>
+T IAttributeDictionary::Get(TStringBuf key, const T& defaultValue) const
+{
+ return Find<T>(key).value_or(defaultValue);
+}
+
+template <class T>
+T IAttributeDictionary::GetAndRemove(const TString& key, const T& defaultValue)
+{
+ auto result = Find<T>(key);
+ if (result) {
+ Remove(key);
+ return *result;
+ } else {
+ return defaultValue;
+ }
+}
+
+template <class T>
+typename TOptionalTraits<T>::TOptional IAttributeDictionary::Find(TStringBuf key) const
+{
+ auto yson = FindYson(key);
+ if (!yson) {
+ return typename TOptionalTraits<T>::TOptional();
+ }
+ try {
+ return ConvertTo<T>(yson);
+ } catch (const std::exception& ex) {
+ THROW_ERROR_EXCEPTION("Error parsing attribute %Qv",
+ key)
+ << ex;
+ }
+}
+
+template <class T>
+typename TOptionalTraits<T>::TOptional IAttributeDictionary::FindAndRemove(const TString& key)
+{
+ auto result = Find<T>(key);
+ if (result) {
+ Remove(key);
+ }
+ return result;
+}
+
+template <class T>
+void IAttributeDictionary::Set(const TString& key, const T& value)
+{
+ auto yson = ConvertToYsonString(value, NYson::EYsonFormat::Binary);
+ SetYson(key, yson);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTree
diff --git a/yt/yt/core/ytree/attributes.h b/yt/yt/core/ytree/attributes.h
index bff7c8fd55..c703bfa529 100644
--- a/yt/yt/core/ytree/attributes.h
+++ b/yt/yt/core/ytree/attributes.h
@@ -100,3 +100,7 @@ DEFINE_REFCOUNTED_TYPE(IAttributeDictionary)
} // namespace NYT::NYTree
+
+#define ATTRIBUTES_INL_H_
+#include "attributes-inl.h"
+#undef ATTRIBUTES_INL_H_
diff --git a/yt/yt/core/ytree/convert-inl.h b/yt/yt/core/ytree/convert-inl.h
index 0e0e1c925b..f32c92ea0b 100644
--- a/yt/yt/core/ytree/convert-inl.h
+++ b/yt/yt/core/ytree/convert-inl.h
@@ -4,6 +4,7 @@
#include "convert.h"
#endif
+#include "attribute_consumer.h"
#include "default_building_consumer.h"
#include "serialize.h"
#include "tree_builder.h"
diff --git a/yt/yt/core/ytree/exception_helpers.cpp b/yt/yt/core/ytree/exception_helpers.cpp
index 8e27ceb8e0..5260ff91e5 100644
--- a/yt/yt/core/ytree/exception_helpers.cpp
+++ b/yt/yt/core/ytree/exception_helpers.cpp
@@ -1,4 +1,5 @@
#include "exception_helpers.h"
+#include "attributes.h"
#include "node.h"
#include <yt/yt/core/misc/error.h>
diff --git a/yt/yt/core/ytree/fluent.h b/yt/yt/core/ytree/fluent.h
index f423fb17f5..6ef40dfc10 100644
--- a/yt/yt/core/ytree/fluent.h
+++ b/yt/yt/core/ytree/fluent.h
@@ -4,6 +4,7 @@
#include "tree_visitor.h"
#include "tree_builder.h"
#include "convert.h"
+#include "attributes.h"
#include "attribute_consumer.h"
#include "helpers.h"
diff --git a/yt/yt/core/ytree/helpers-inl.h b/yt/yt/core/ytree/helpers-inl.h
index ea204ab82b..4f19dc1814 100644
--- a/yt/yt/core/ytree/helpers-inl.h
+++ b/yt/yt/core/ytree/helpers-inl.h
@@ -12,80 +12,6 @@ namespace NYT::NYTree {
////////////////////////////////////////////////////////////////////////////////
-template <class T>
-T IAttributeDictionary::Get(TStringBuf key) const
-{
- auto yson = GetYson(key);
- try {
- return ConvertTo<T>(yson);
- } catch (const std::exception& ex) {
- THROW_ERROR_EXCEPTION("Error parsing attribute %Qv",
- key)
- << ex;
- }
-}
-
-template <class T>
-T IAttributeDictionary::GetAndRemove(const TString& key)
-{
- auto result = Get<T>(key);
- Remove(key);
- return result;
-}
-
-template <class T>
-T IAttributeDictionary::Get(TStringBuf key, const T& defaultValue) const
-{
- return Find<T>(key).value_or(defaultValue);
-}
-
-template <class T>
-T IAttributeDictionary::GetAndRemove(const TString& key, const T& defaultValue)
-{
- auto result = Find<T>(key);
- if (result) {
- Remove(key);
- return *result;
- } else {
- return defaultValue;
- }
-}
-
-template <class T>
-typename TOptionalTraits<T>::TOptional IAttributeDictionary::Find(TStringBuf key) const
-{
- auto yson = FindYson(key);
- if (!yson) {
- return typename TOptionalTraits<T>::TOptional();
- }
- try {
- return ConvertTo<T>(yson);
- } catch (const std::exception& ex) {
- THROW_ERROR_EXCEPTION("Error parsing attribute %Qv",
- key)
- << ex;
- }
-}
-
-template <class T>
-typename TOptionalTraits<T>::TOptional IAttributeDictionary::FindAndRemove(const TString& key)
-{
- auto result = Find<T>(key);
- if (result) {
- Remove(key);
- }
- return result;
-}
-
-template <class T>
-void IAttributeDictionary::Set(const TString& key, const T& value)
-{
- auto yson = ConvertToYsonString(value, NYson::EYsonFormat::Binary);
- SetYson(key, yson);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
template <class T, class R>
IYPathServicePtr IYPathService::FromMethod(
R (std::remove_cv_t<T>::*method) () const,
diff --git a/yt/yt/core/ytree/helpers.cpp b/yt/yt/core/ytree/helpers.cpp
index cad1e1367f..cd3bb2ce7f 100644
--- a/yt/yt/core/ytree/helpers.cpp
+++ b/yt/yt/core/ytree/helpers.cpp
@@ -1,4 +1,5 @@
#include "helpers.h"
+#include "attributes.h"
#include "ypath_client.h"
#include <yt/yt/core/misc/error.h>
diff --git a/yt/yt/core/ytree/helpers.h b/yt/yt/core/ytree/helpers.h
index 489785ea75..3320456f78 100644
--- a/yt/yt/core/ytree/helpers.h
+++ b/yt/yt/core/ytree/helpers.h
@@ -54,7 +54,7 @@ void ValidateYPathResolutionDepth(const NYPath::TYPath& path, int depth);
//! Helps implementing IAttributeDictionary::ListPairs by delegating to
//! IAttributeDictionary::ListKeys and IAttributeDictionary::FindYson for those not capable
//! of providing a custom efficient implementation.
-std::vector<IAttributeDictionary::TKeyValuePair> ListAttributesPairs(const IAttributeDictionary& attributes);
+std::vector<std::pair<TString, NYson::TYsonString>> ListAttributesPairs(const IAttributeDictionary& attributes);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/ytree/unittests/yson_schema_ut.cpp b/yt/yt/core/ytree/unittests/yson_schema_ut.cpp
index 3a9548eca6..65060e25e3 100644
--- a/yt/yt/core/ytree/unittests/yson_schema_ut.cpp
+++ b/yt/yt/core/ytree/unittests/yson_schema_ut.cpp
@@ -169,7 +169,7 @@ void CheckSchema(const TYsonStructPtr& ysonStruct, TStringBuf expected)
auto* factory = GetEphemeralNodeFactory();
auto builder = CreateBuilderFromFactory(factory);
builder->BeginTree();
- ysonStruct->GetSchema(builder.get());
+ ysonStruct->WriteSchema(builder.get());
auto actualNode = builder->EndTree();
auto expectedNode = ConvertToNode(TYsonStringBuf(expected), factory);
EXPECT_TRUE(AreNodesEqual(expectedNode, actualNode))
diff --git a/yt/yt/core/ytree/yson_schema-inl.h b/yt/yt/core/ytree/yson_schema-inl.h
index 0aefa82d02..2b4b0e5a33 100644
--- a/yt/yt/core/ytree/yson_schema-inl.h
+++ b/yt/yt/core/ytree/yson_schema-inl.h
@@ -46,7 +46,7 @@ concept CIsAssociativeArray = CIsArray<T> && CIsMapping<T>;
////////////////////////////////////////////////////////////////////////////////
#define DEFINE_SCHEMA_FOR_SIMPLE_TYPE(type, name) \
-inline void GetSchema(type, NYson::IYsonConsumer* consumer) \
+inline void WriteSchema(type, NYson::IYsonConsumer* consumer) \
{ \
BuildYsonFluently(consumer) \
.Value(#name); \
@@ -79,83 +79,88 @@ DEFINE_SCHEMA_FOR_SIMPLE_TYPE(TGuid, guid)
#undef DEFINE_SCHEMA_FOR_SIMPLE_TYPE
template <CIsEnum T>
-void GetSchema(const T&, NYson::IYsonConsumer* consumer)
+void WriteSchema(const T&, NYson::IYsonConsumer* consumer)
{
- BuildYsonFluently(consumer).BeginMap()
- .Item("type_name").Value("enum")
- .Item("enum_name").Value(TEnumTraits<T>::GetTypeName())
- .Item("values").DoListFor(
- TEnumTraits<T>::GetDomainNames(), [] (auto&& fluent, TStringBuf name) {
- fluent.Item().Value(EncodeEnumValue(name));
- })
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("type_name").Value("enum")
+ .Item("enum_name").Value(TEnumTraits<T>::GetTypeName())
+ .Item("values").DoListFor(
+ TEnumTraits<T>::GetDomainNames(), [] (auto fluent, TStringBuf name) {
+ fluent.Item().Value(EncodeEnumValue(name));
+ })
.EndMap();
}
template <CIsYsonStruct T>
-void GetSchema(const NYT::TIntrusivePtr<T>& value, NYson::IYsonConsumer* consumer)
+void WriteSchema(const NYT::TIntrusivePtr<T>& value, NYson::IYsonConsumer* consumer)
{
- BuildYsonFluently(consumer) .BeginMap()
- .Item("type_name").Value("optional")
- .Item("item").Do([&] (auto&& fluent) {
- (value ? value : New<T>())->GetSchema(fluent.GetConsumer());
- })
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("type_name").Value("optional")
+ .Item("item").Do([&] (auto fluent) {
+ (value ? value : New<T>())->WriteSchema(fluent.GetConsumer());
+ })
.EndMap();
}
template <CIsYsonStruct T>
-void GetSchema(const T& value, NYson::IYsonConsumer* consumer)
+void WriteSchema(const T& value, NYson::IYsonConsumer* consumer)
{
- return value.GetSchema(consumer);
+ return value.WriteSchema(consumer);
}
template <CIsProtobufMessage T>
-void GetSchema(const T&, NYson::IYsonConsumer* consumer)
+void WriteSchema(const T&, NYson::IYsonConsumer* consumer)
{
- return NYson::GetSchema(NYson::ReflectProtobufMessageType<T>(), consumer);
+ return NYson::WriteSchema(NYson::ReflectProtobufMessageType<T>(), consumer);
}
template <CIsArray T>
-void GetSchema(const T& value, NYson::IYsonConsumer* consumer)
+void WriteSchema(const T& value, NYson::IYsonConsumer* consumer)
{
- BuildYsonFluently(consumer).BeginMap()
- .Item("type_name").Value("list")
- .Item("item").Do([&] (auto&& fluent) {
- GetSchema(
- std::begin(value) != std::end(value)
- ? *std::begin(value)
- : std::decay_t<decltype(*std::begin(value))>{},
- fluent.GetConsumer());
- })
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("type_name").Value("list")
+ .Item("item").Do([&] (auto fluent) {
+ WriteSchema(
+ std::begin(value) != std::end(value)
+ ? *std::begin(value)
+ : std::decay_t<decltype(*std::begin(value))>{},
+ fluent.GetConsumer());
+ })
.EndMap();
}
template <CIsAssociativeArray T>
-void GetSchema(const T& value, NYson::IYsonConsumer* consumer)
+void WriteSchema(const T& value, NYson::IYsonConsumer* consumer)
{
- BuildYsonFluently(consumer).BeginMap()
- .Item("type_name").Value("dict")
- .Item("key").Do([&] (auto&& fluent) {
- GetSchema(value.empty() ? typename T::key_type{} : value.begin()->first, fluent.GetConsumer());
- })
- .Item("value").Do([&] (auto&& fluent) {
- GetSchema(value.empty() ? typename T::mapped_type{} : value.begin()->second, fluent.GetConsumer());
- })
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("type_name").Value("dict")
+ .Item("key").Do([&] (auto fluent) {
+ WriteSchema(value.empty() ? typename T::key_type{} : value.begin()->first, fluent.GetConsumer());
+ })
+ .Item("value").Do([&] (auto fluent) {
+ WriteSchema(value.empty() ? typename T::mapped_type{} : value.begin()->second, fluent.GetConsumer());
+ })
.EndMap();
}
template <CIsNullable T>
-void GetSchema(const T& value, NYson::IYsonConsumer* consumer)
+void WriteSchema(const T& value, NYson::IYsonConsumer* consumer)
{
- BuildYsonFluently(consumer) .BeginMap()
- .Item("type_name").Value("optional")
- .Item("item").Do([&] (auto&& fluent) {
- GetSchema(value ? *value : std::decay_t<decltype(*value)>{}, fluent.GetConsumer());
- })
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("type_name").Value("optional")
+ .Item("item").Do([&] (auto fluent) {
+ WriteSchema(value ? *value : std::decay_t<decltype(*value)>{}, fluent.GetConsumer());
+ })
.EndMap();
}
template <class T>
-void GetSchema(const T& value, NYson::IYsonConsumer* consumer)
+void WriteSchema(const T& value, NYson::IYsonConsumer* consumer)
{
auto node = ConvertToNode(value);
BuildYsonFluently(consumer).Value(FormatEnum(node->GetType()));
diff --git a/yt/yt/core/ytree/yson_schema.h b/yt/yt/core/ytree/yson_schema.h
index 2b1f3a38e6..a2e9f02b26 100644
--- a/yt/yt/core/ytree/yson_schema.h
+++ b/yt/yt/core/ytree/yson_schema.h
@@ -7,7 +7,7 @@ namespace NYT::NYTree::NPrivate {
////////////////////////////////////////////////////////////////////////////////
template <typename T>
-void GetSchema(const T& value, NYson::IYsonConsumer* consumer);
+void WriteSchema(const T& value, NYson::IYsonConsumer* consumer);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/ytree/yson_struct.cpp b/yt/yt/core/ytree/yson_struct.cpp
index 881414d40b..09b98aebf3 100644
--- a/yt/yt/core/ytree/yson_struct.cpp
+++ b/yt/yt/core/ytree/yson_struct.cpp
@@ -135,9 +135,9 @@ std::vector<TString> TYsonStructBase::GetAllParameterAliases(const TString& key)
return result;
}
-void TYsonStructBase::GetSchema(IYsonConsumer* consumer) const
+void TYsonStructBase::WriteSchema(IYsonConsumer* consumer) const
{
- return Meta_->GetSchema(this, consumer);
+ return Meta_->WriteSchema(this, consumer);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/ytree/yson_struct.h b/yt/yt/core/ytree/yson_struct.h
index 18013c71fe..15acd2fa15 100644
--- a/yt/yt/core/ytree/yson_struct.h
+++ b/yt/yt/core/ytree/yson_struct.h
@@ -93,7 +93,7 @@ public:
std::vector<TString> GetAllParameterAliases(const TString& key) const;
- void GetSchema(NYson::IYsonConsumer* consumer) const;
+ void WriteSchema(NYson::IYsonConsumer* consumer) const;
private:
template <class TValue>
diff --git a/yt/yt/core/ytree/yson_struct_detail-inl.h b/yt/yt/core/ytree/yson_struct_detail-inl.h
index d6f507d37d..ac714065af 100644
--- a/yt/yt/core/ytree/yson_struct_detail-inl.h
+++ b/yt/yt/core/ytree/yson_struct_detail-inl.h
@@ -869,10 +869,10 @@ IMapNodePtr TYsonStructParameter<TValue>::GetRecursiveUnrecognized(const TYsonSt
}
template <class TValue>
-void TYsonStructParameter<TValue>::GetSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const
+void TYsonStructParameter<TValue>::WriteSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const
{
// TODO(bulatman) What about constraints: minimum, maximum, default and etc?
- NPrivate::GetSchema(FieldAccessor_->GetValue(self), consumer);
+ NPrivate::WriteSchema(FieldAccessor_->GetValue(self), consumer);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/ytree/yson_struct_detail.cpp b/yt/yt/core/ytree/yson_struct_detail.cpp
index e730ce9b09..c396e4d621 100644
--- a/yt/yt/core/ytree/yson_struct_detail.cpp
+++ b/yt/yt/core/ytree/yson_struct_detail.cpp
@@ -315,20 +315,21 @@ void TYsonStructMeta::SetUnrecognizedStrategy(EUnrecognizedStrategy strategy)
MetaUnrecognizedStrategy_ = strategy;
}
-void TYsonStructMeta::GetSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const
+void TYsonStructMeta::WriteSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const
{
BuildYsonFluently(consumer)
.BeginMap()
.Item("type_name").Value("struct")
- .Item("members").DoListFor(Parameters_, [&] (auto&& fluent, auto&& pair) {
- fluent.Item().BeginMap()
- .Item("name").Value(pair.first)
- .Item("type").Do([&] (auto&& fluent) {
- pair.second->GetSchema(target, fluent.GetConsumer());
- })
- .DoIf(pair.second->IsRequired(), [] (auto&& fluent) {
- fluent.Item("required").Value(true);
- })
+ .Item("members").DoListFor(Parameters_, [&] (auto fluent, const auto& pair) {
+ fluent.Item()
+ .BeginMap()
+ .Item("name").Value(pair.first)
+ .Item("type").Do([&] (auto fluent) {
+ pair.second->WriteSchema(target, fluent.GetConsumer());
+ })
+ .DoIf(pair.second->IsRequired(), [] (auto fluent) {
+ fluent.Item("required").Value(true);
+ })
.EndMap();
})
.EndMap();
diff --git a/yt/yt/core/ytree/yson_struct_detail.h b/yt/yt/core/ytree/yson_struct_detail.h
index 27b9a6ba18..5a3effd704 100644
--- a/yt/yt/core/ytree/yson_struct_detail.h
+++ b/yt/yt/core/ytree/yson_struct_detail.h
@@ -61,7 +61,7 @@ struct IYsonStructParameter
virtual const std::vector<TString>& GetAliases() const = 0;
virtual IMapNodePtr GetRecursiveUnrecognized(const TYsonStructBase* self) const = 0;
- virtual void GetSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const = 0;
+ virtual void WriteSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const = 0;
};
//using IYsonStructParameterPtr = TIntrusivePtr<IYsonStructParameter>;
@@ -101,7 +101,7 @@ struct IYsonStructMeta
virtual void RegisterPostprocessor(std::function<void(TYsonStructBase*)> postprocessor) = 0;
virtual void SetUnrecognizedStrategy(EUnrecognizedStrategy strategy) = 0;
- virtual void GetSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const = 0;
+ virtual void WriteSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const = 0;
virtual ~IYsonStructMeta() = default;
};
@@ -144,7 +144,7 @@ public:
void RegisterPostprocessor(std::function<void(TYsonStructBase*)> postprocessor) override;
void SetUnrecognizedStrategy(EUnrecognizedStrategy strategy) override;
- void GetSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const override;
+ void WriteSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const override;
void FinishInitialization(const std::type_info& structType);
@@ -250,7 +250,7 @@ public:
const std::vector<TString>& GetAliases() const override;
IMapNodePtr GetRecursiveUnrecognized(const TYsonStructBase* self) const override;
- void GetSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const override;
+ void WriteSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const override;
// Mark as optional. Field will be default-initialized if `init` is true, initialization is skipped otherwise.
TYsonStructParameter& Optional(bool init = true);
diff --git a/yt/yt/library/backtrace_introspector/introspect.cpp b/yt/yt/library/backtrace_introspector/introspect.cpp
index 592c232f0f..a555a8a548 100644
--- a/yt/yt/library/backtrace_introspector/introspect.cpp
+++ b/yt/yt/library/backtrace_introspector/introspect.cpp
@@ -2,6 +2,7 @@
#include "private.h"
+#include <yt/yt/core/misc/collection_helpers.h>
#include <yt/yt/core/misc/finally.h>
#include <yt/yt/core/misc/proc.h>
diff --git a/yt/yt/library/backtrace_introspector/introspect_linux.cpp b/yt/yt/library/backtrace_introspector/introspect_linux.cpp
index 3fc1a077f6..2bee8c0bd3 100644
--- a/yt/yt/library/backtrace_introspector/introspect_linux.cpp
+++ b/yt/yt/library/backtrace_introspector/introspect_linux.cpp
@@ -163,6 +163,12 @@ std::vector<TThreadIntrospectionInfo> IntrospectThreads()
std::vector<TThreadIntrospectionInfo> infos;
for (auto threadId : GetCurrentProcessThreadIds()) {
+ if (!IsUserspaceThread(threadId)) {
+ YT_LOG_DEBUG("Skipping a non-userspace thread (ThreadId: %v)",
+ threadId);
+ continue;
+ }
+
TSignalHandlerContext signalHandlerContext;
if (::syscall(SYS_tkill, threadId, SIGUSR1) != 0) {
YT_LOG_DEBUG(TError::FromSystem(), "Failed to signal to thread (ThreadId: %v)",
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp
index 6874a45ff5..d83cb97358 100644
--- a/yt/yt/library/formats/arrow_writer.cpp
+++ b/yt/yt/library/formats/arrow_writer.cpp
@@ -45,6 +45,7 @@ struct TTypedBatchColumn
////////////////////////////////////////////////////////////////////////////////
constexpr i64 ArrowAlignment = 8;
+const TString AlignmentString(ArrowAlignment, 0);
flatbuffers::Offset<flatbuffers::String> SerializeString(
flatbuffers::FlatBufferBuilder* flatbufBuilder,
@@ -1287,18 +1288,19 @@ private:
auto metadataPtr = message.FlatbufBuilder->GetBufferPointer();
- ui32 metadataSz = AlignUp<i64>(metadataSize, ArrowAlignment);
+ ui32 metadataAlignSize = AlignUp<i64>(metadataSize, ArrowAlignment);
- output->Write(&metadataSz, sizeof(ui32));
+ output->Write(&metadataAlignSize, sizeof(ui32));
output->Write(metadataPtr, metadataSize);
+ output->Write(AlignmentString.Data(), metadataAlignSize - metadataSize);
+
// Body
if (message.BodyWriter) {
- TString current;
- current.resize(message.BodySize);
+ TString current(AlignUp<i64>(message.BodySize, ArrowAlignment), 0);
// Double copying.
- message.BodyWriter(TMutableRef::FromString(current));
- output->Write(current.data(), message.BodySize);
+ message.BodyWriter(TMutableRef(current.begin(), current.begin() + message.BodySize));
+ output->Write(current.data(), current.Size());
} else {
YT_VERIFY(message.BodySize == 0);
}
diff --git a/yt/yt/library/profiling/solomon/exporter.h b/yt/yt/library/profiling/solomon/exporter.h
index 6ec0ce7dc1..8aad2abcb1 100644
--- a/yt/yt/library/profiling/solomon/exporter.h
+++ b/yt/yt/library/profiling/solomon/exporter.h
@@ -4,20 +4,20 @@
#include "registry.h"
#include "remote.h"
-#include <yt/yt/core/concurrency/thread_pool.h>
-#include <yt/yt/core/concurrency/async_rw_lock.h>
-
#include <yt/yt/core/actions/public.h>
+#include <yt/yt/core/concurrency/async_rw_lock.h>
+#include <yt/yt/core/concurrency/thread_pool.h>
+
#include <yt/yt/core/http/public.h>
-#include <yt/yt/core/ytree/yson_serializable.h>
#include <yt/yt/core/ytree/ypath_detail.h>
+#include <yt/yt/core/ytree/yson_struct.h>
-#include <library/cpp/monlib/encode/format.h>
-
-#include <yt/yt/library/profiling/sensor.h>
#include <yt/yt/library/profiling/producer.h>
+#include <yt/yt/library/profiling/sensor.h>
+
+#include <library/cpp/monlib/encode/format.h>
namespace NYT::NProfiling {
diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp
index 288a90a3b1..304f9c761c 100644
--- a/yt/yt/library/program/config.cpp
+++ b/yt/yt/library/program/config.cpp
@@ -169,13 +169,6 @@ void WarnForUnrecognizedOptions(
WarnForUnrecognizedOptionsImpl(logger, config->GetRecursiveUnrecognized());
}
-void WarnForUnrecognizedOptions(
- const NLogging::TLogger& logger,
- const NYTree::TYsonSerializablePtr& config)
-{
- WarnForUnrecognizedOptionsImpl(logger, config->GetUnrecognizedRecursively());
-}
-
void AbortOnUnrecognizedOptionsImpl(
const NLogging::TLogger& logger,
const IMapNodePtr& unrecognized)
@@ -195,13 +188,6 @@ void AbortOnUnrecognizedOptions(
AbortOnUnrecognizedOptionsImpl(logger, config->GetRecursiveUnrecognized());
}
-void AbortOnUnrecognizedOptions(
- const NLogging::TLogger& logger,
- const NYTree::TYsonSerializablePtr& config)
-{
- AbortOnUnrecognizedOptionsImpl(logger, config->GetUnrecognizedRecursively());
-}
-
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h
index b840950418..d4420b7278 100644
--- a/yt/yt/library/program/config.h
+++ b/yt/yt/library/program/config.h
@@ -2,7 +2,6 @@
#include "public.h"
-#include <yt/yt/core/ytree/yson_serializable.h>
#include <yt/yt/core/ytree/yson_struct.h>
#include <yt/yt/core/ytalloc/config.h>
diff --git a/yt/yt/library/program/program_config_mixin.h b/yt/yt/library/program/program_config_mixin.h
index 80f681d06e..784c422477 100644
--- a/yt/yt/library/program/program_config_mixin.h
+++ b/yt/yt/library/program/program_config_mixin.h
@@ -2,10 +2,12 @@
#include "program.h"
-#include <library/cpp/yt/string/enum.h>
+#include <yt/yt/core/yson/writer.h>
#include <yt/yt/core/ytree/convert.h>
-#include <yt/yt/core/ytree/yson_serializable.h>
+#include <yt/yt/core/ytree/yson_struct.h>
+
+#include <library/cpp/yt/string/enum.h>
#include <util/stream/file.h>
diff --git a/yt/yt/library/tracing/jaeger/tracer.cpp b/yt/yt/library/tracing/jaeger/tracer.cpp
index 351ec787d4..fd3a409f6d 100644
--- a/yt/yt/library/tracing/jaeger/tracer.cpp
+++ b/yt/yt/library/tracing/jaeger/tracer.cpp
@@ -11,11 +11,13 @@
#include <yt/yt/core/concurrency/action_queue.h>
#include <yt/yt/core/concurrency/periodic_executor.h>
-#include <yt/yt/core/ytree/yson_serializable.h>
#include <yt/yt/core/misc/protobuf_helpers.h>
#include <yt/yt/core/misc/serialize.h>
+
#include <yt/yt/core/utilex/random.h>
+#include <yt/yt/core/ytree/yson_struct.h>
+
#include <util/string/cast.h>
#include <util/string/reverse.h>
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 8b0a758fd8..297ee2ae7d 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -601,6 +601,20 @@ message TRspSelectRows
////////////////////////////////////////////////////////////////////////////////
+message TReqAdvanceConsumer
+{
+ optional NYT.NProto.TGuid transaction_id = 1;
+ optional string consumer_path = 2;
+ optional string queue_path = 3;
+ optional int32 partition_index = 4;
+ optional int64 old_offset = 5;
+ optional int64 new_offset = 6;
+}
+
+message TRspAdvanceConsumer
+{
+}
+
message TRowBatchReadOptions
{
optional int64 max_row_count = 1;