diff options
author | AlexSm <alex@ydb.tech> | 2023-12-22 17:10:22 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-22 17:10:22 +0100 |
commit | 148f920350c60c0ca2d89b637a5aea9093eee450 (patch) | |
tree | 6314b1433dac833398c333731e83f0ad77e81a0b /yt | |
parent | 7116d46ae7c0259b5f9d489de263f8701e432b1c (diff) | |
download | ydb-148f920350c60c0ca2d89b637a5aea9093eee450.tar.gz |
Library import 2 (#639)
Diffstat (limited to 'yt')
75 files changed, 583 insertions, 266 deletions
diff --git a/yt/cpp/mapreduce/interface/operation.cpp b/yt/cpp/mapreduce/interface/operation.cpp index 0bb490b1ac..b3229117d9 100644 --- a/yt/cpp/mapreduce/interface/operation.cpp +++ b/yt/cpp/mapreduce/interface/operation.cpp @@ -337,11 +337,7 @@ void TJobOperationPreparer::FinallyValidate() const TApiUsageError error; error << "Output table schemas are missing: "; for (auto i : illegallyMissingSchemaIndices) { - error << "no. " << i; - if (auto path = Context_.GetInputPath(i)) { - error << "(" << *path << ")"; - } - error << "; "; + error << "no. " << i << " (" << Context_.GetOutputPath(i).GetOrElse("<unknown path>") << "); "; } ythrow std::move(error); } diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h index 82a2adbe74..c671dea611 100644 --- a/yt/yt/client/api/public.h +++ b/yt/yt/client/api/public.h @@ -6,6 +6,8 @@ #include <yt/yt/client/transaction_client/public.h> +#include <yt/yt/client/bundle_controller_client/public.h> + #include <yt/yt/library/auth/authentication_options.h> #include <yt/yt/core/misc/public.h> diff --git a/yt/yt/client/api/queue_client.h b/yt/yt/client/api/queue_client.h index a19aef2b90..c665386fcd 100644 --- a/yt/yt/client/api/queue_client.h +++ b/yt/yt/client/api/queue_client.h @@ -39,6 +39,10 @@ struct TPullRowsResult bool Versioned = true; }; +struct TAdvanceConsumerOptions + : public TTimeoutOptions +{ }; + struct TPullQueueOptions : public TSelectRowsOptions , public TFallbackReplicaOptions diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index 354c883ef5..3a28b830ca 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -84,6 +84,7 @@ public: DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AlterReplicationCard); // Queues + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AdvanceConsumer); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullQueue); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PullConsumer); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, RegisterQueueConsumer); diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index 6777b7109f..e3c621c8f1 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -485,6 +485,39 @@ void TTransaction::ModifyRows( } } +TFuture<void> TTransaction::AdvanceConsumer( + const NYPath::TRichYPath& consumerPath, + const NYPath::TRichYPath& queuePath, + int partitionIndex, + std::optional<i64> oldOffset, + i64 newOffset, + const TAdvanceConsumerOptions& options) +{ + ValidateTabletTransactionId(GetId()); + + THROW_ERROR_EXCEPTION_IF(newOffset < 0, "Queue consumer offset %v cannot be negative", newOffset); + + auto req = Proxy_.AdvanceConsumer(); + SetTimeoutOptions(*req, options); + + if (NTracing::IsCurrentTraceContextRecorded()) { + req->TracingTags().emplace_back("yt.consumer_path", ToString(consumerPath)); + req->TracingTags().emplace_back("yt.queue_path", ToString(queuePath)); + } + + ToProto(req->mutable_transaction_id(), GetId()); + + ToProto(req->mutable_consumer_path(), consumerPath); + ToProto(req->mutable_queue_path(), queuePath); + req->set_partition_index(partitionIndex); + if (oldOffset) { + req->set_old_offset(*oldOffset); + } + req->set_new_offset(newOffset); + + return req->Invoke().As<void>(); +} + TFuture<ITransactionPtr> TTransaction::StartTransaction( ETransactionType type, const TTransactionStartOptions& options) diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h index 200e9e7539..5286ea931b 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.h +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h @@ -73,6 +73,14 @@ public: TSharedRange<NApi::TRowModification> modifications, const NApi::TModifyRowsOptions& options) override; + TFuture<void> AdvanceConsumer( + const NYPath::TRichYPath& consumerPath, + const NYPath::TRichYPath& queuePath, + int partitionIndex, + std::optional<i64> oldOffset, + i64 newOffset, + const TAdvanceConsumerOptions& options) override; + // IClientBase implementation. TFuture<NApi::ITransactionPtr> StartTransaction( NTransactionClient::ETransactionType type, diff --git a/yt/yt/client/api/security_client.h b/yt/yt/client/api/security_client.h index ed76f72d26..2c979a147d 100644 --- a/yt/yt/client/api/security_client.h +++ b/yt/yt/client/api/security_client.h @@ -76,11 +76,21 @@ struct TIssueTokenOptions : public TTimeoutOptions { }; +struct TIssueTemporaryTokenOptions + : public TIssueTokenOptions +{ + TDuration ExpirationTimeout; +}; + struct TIssueTokenResult { TString Token; }; +struct TRefreshTemporaryTokenOptions + : public TTimeoutOptions +{ }; + struct TRevokeTokenOptions : public TTimeoutOptions { }; diff --git a/yt/yt/client/api/transaction.h b/yt/yt/client/api/transaction.h index 4677db4864..3fb39fa740 100644 --- a/yt/yt/client/api/transaction.h +++ b/yt/yt/client/api/transaction.h @@ -240,22 +240,32 @@ struct ITransaction // Consumers. - //! Advance the consumer's offset for the given partition, setting it to a new value. - //! - //! If oldOffset is specified, the current offset is read inside this transaction and compared with oldOffset. - //! If they are equal, the new offset is written, otherwise an exception is thrown. + // TODO(nadya73): Remove it: YT-20712 void AdvanceConsumer( const NYPath::TYPath& path, int partitionIndex, std::optional<i64> oldOffset, i64 newOffset); + // TODO(nadya73): Remove it: YT-20712 void AdvanceConsumer( const NYPath::TRichYPath& consumerPath, const NYPath::TRichYPath& queuePath, int partitionIndex, std::optional<i64> oldOffset, i64 newOffset); + + //! Advance the consumer's offset for the given partition with index #partitionIndex, setting it to #newOffset. + //! + //! If #oldOffset is specified, the current offset is read inside this transaction and compared with #oldOffset. + //! If they are equal, the new offset is written, otherwise an exception is thrown. + virtual TFuture<void> AdvanceConsumer( + const NYPath::TRichYPath& consumerPath, + const NYPath::TRichYPath& queuePath, + int partitionIndex, + std::optional<i64> oldOffset, + i64 newOffset, + const TAdvanceConsumerOptions& options) = 0; }; DEFINE_REFCOUNTED_TYPE(ITransaction) diff --git a/yt/yt/client/bundle_controller_client/bundle_controller_client.h b/yt/yt/client/bundle_controller_client/bundle_controller_client.h index 0341458955..30cf31c72e 100644 --- a/yt/yt/client/bundle_controller_client/bundle_controller_client.h +++ b/yt/yt/client/bundle_controller_client/bundle_controller_client.h @@ -1,5 +1,6 @@ #pragma once +#include "public.h" #include "bundle_controller_settings.h" #include <yt/yt/client/api/client_common.h> @@ -8,10 +9,6 @@ namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// -DECLARE_REFCOUNTED_STRUCT(TBundleConfigDescriptor) - -//////////////////////////////////////////////////////////////////////////////// - struct TGetBundleConfigOptions : public TTimeoutOptions { }; diff --git a/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp b/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp index 194b020c1d..12f044ca9c 100644 --- a/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp +++ b/yt/yt/client/bundle_controller_client/bundle_controller_settings.cpp @@ -64,10 +64,15 @@ bool TInstanceResources::operator==(const TInstanceResources& other) const namespace NProto { -// TODO(alexmipt): make ToProto for TCpuLimits, TMemoryLimits, TInstanceResources - //////////////////////////////////////////////////////////////////////////////// +void ToProto(NBundleController::NProto::TCpuLimits* protoCpuLimits, const NBundleControllerClient::TCpuLimitsPtr cpuLimits) +{ + protoCpuLimits->set_lookup_thread_pool_size(cpuLimits->LookupThreadPoolSize); + protoCpuLimits->set_query_thread_pool_size(cpuLimits->QueryThreadPoolSize); + protoCpuLimits->set_write_thread_pool_size(cpuLimits->WriteThreadPoolSize); +} + void FromProto(NBundleControllerClient::TCpuLimitsPtr cpuLimits, const NBundleController::NProto::TCpuLimits* protoCpuLimits) { cpuLimits->LookupThreadPoolSize = protoCpuLimits->lookup_thread_pool_size(); @@ -77,6 +82,20 @@ void FromProto(NBundleControllerClient::TCpuLimitsPtr cpuLimits, const NBundleCo //////////////////////////////////////////////////////////////////////////////// +void ToProto(NBundleController::NProto::TMemoryLimits* protoMemoryLimits, const NBundleControllerClient::TMemoryLimitsPtr memoryLimits) +{ + protoMemoryLimits->set_compressed_block_cache(memoryLimits->CompressedBlockCache.value_or(0)); + protoMemoryLimits->set_key_filter_block_cache(memoryLimits->KeyFilterBlockCache.value_or(0)); + protoMemoryLimits->set_lookup_row_cache(memoryLimits->LookupRowCache.value_or(0)); + + protoMemoryLimits->set_tablet_dynamic(memoryLimits->TabletDynamic.value_or(0)); + protoMemoryLimits->set_tablet_static(memoryLimits->TabletStatic.value_or(0)); + + protoMemoryLimits->set_uncompressed_block_cache(memoryLimits->UncompressedBlockCache.value_or(0)); + + protoMemoryLimits->set_versioned_chunk_meta(memoryLimits->VersionedChunkMeta.value_or(0)); +} + void FromProto(NBundleControllerClient::TMemoryLimitsPtr memoryLimits, const NBundleController::NProto::TMemoryLimits* protoMemoryLimits) { memoryLimits->CompressedBlockCache = protoMemoryLimits->compressed_block_cache(); @@ -93,6 +112,14 @@ void FromProto(NBundleControllerClient::TMemoryLimitsPtr memoryLimits, const NBu //////////////////////////////////////////////////////////////////////////////// +void ToProto(NBundleController::NProto::TInstanceResources* protoInstanceResources, const NBundleControllerClient::TInstanceResourcesPtr instanceResources) +{ + protoInstanceResources->set_memory(instanceResources->Memory); + protoInstanceResources->set_net(instanceResources->Net.value_or(0)); + protoInstanceResources->set_type(instanceResources->Type); + protoInstanceResources->set_vcpu(instanceResources->Vcpu); +} + void FromProto(NBundleControllerClient::TInstanceResourcesPtr instanceResources, const NBundleController::NProto::TInstanceResources* protoInstanceResources) { instanceResources->Memory = protoInstanceResources->memory(); diff --git a/yt/yt/client/bundle_controller_client/bundle_controller_settings.h b/yt/yt/client/bundle_controller_client/bundle_controller_settings.h index 8dcf241ae1..da57ead80a 100644 --- a/yt/yt/client/bundle_controller_client/bundle_controller_settings.h +++ b/yt/yt/client/bundle_controller_client/bundle_controller_settings.h @@ -1,5 +1,7 @@ #pragma once +#include "public.h" + #include <optional> #include <yt/yt/client/tablet_client/public.h> @@ -13,12 +15,6 @@ namespace NYT::NBundleControllerClient { //////////////////////////////////////////////////////////////////////////////// -DECLARE_REFCOUNTED_STRUCT(TCpuLimits) -DECLARE_REFCOUNTED_STRUCT(TMemoryLimits) -DECLARE_REFCOUNTED_STRUCT(TInstanceResources) - -//////////////////////////////////////////////////////////////////////////////// - struct TCpuLimits : public NYTree::TYsonStruct { diff --git a/yt/yt/client/bundle_controller_client/public.h b/yt/yt/client/bundle_controller_client/public.h new file mode 100644 index 0000000000..d73eac1539 --- /dev/null +++ b/yt/yt/client/bundle_controller_client/public.h @@ -0,0 +1,35 @@ +#pragma once + +#include <yt/yt/core/misc/public.h> + +#include <yt/yt/client/hydra/public.h> + +#include <yt/yt/client/object_client/public.h> + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +DECLARE_REFCOUNTED_STRUCT(TBundleConfigDescriptor) + +struct TBundleConfigDescriptor; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi + +namespace NYT::NBundleControllerClient { + +//////////////////////////////////////////////////////////////////////////////// + +DECLARE_REFCOUNTED_STRUCT(TCpuLimits) +DECLARE_REFCOUNTED_STRUCT(TMemoryLimits) +DECLARE_REFCOUNTED_STRUCT(TInstanceResources) + +struct TCpuLimits; +struct TMemoryLimits; +struct TInstanceResources; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NBundleControllerClient diff --git a/yt/yt/client/chunk_client/config.h b/yt/yt/client/chunk_client/config.h index 5a28cb0750..9f3488b003 100644 --- a/yt/yt/client/chunk_client/config.h +++ b/yt/yt/client/chunk_client/config.h @@ -4,7 +4,7 @@ #include <yt/yt/client/misc/config.h> -#include <yt/yt/core/ytree/yson_serializable.h> +#include <yt/yt/core/ytree/yson_struct.h> namespace NYT::NChunkClient { diff --git a/yt/yt/client/chunk_client/public.h b/yt/yt/client/chunk_client/public.h index 6cf877026e..1482ff2a58 100644 --- a/yt/yt/client/chunk_client/public.h +++ b/yt/yt/client/chunk_client/public.h @@ -79,6 +79,7 @@ YT_DEFINE_ERROR_ENUM( ((LocationCrashed) (750)) ((LocationDiskWaitingReplacement) (751)) ((ChunkMetaCacheFetchFailed) (752)) + ((LocationMediumIsMisconfigured) (753)) ); using TChunkId = NObjectClient::TObjectId; diff --git a/yt/yt/client/driver/chaos_commands.h b/yt/yt/client/driver/chaos_commands.h index 831991ec08..ac50bcf643 100644 --- a/yt/yt/client/driver/chaos_commands.h +++ b/yt/yt/client/driver/chaos_commands.h @@ -4,7 +4,7 @@ #include <yt/yt/client/chaos_client/replication_card.h> -#include <yt/yt/core/ytree/yson_serializable.h> +#include <yt/yt/core/ytree/yson_struct.h> namespace NYT::NDriver { diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index e2aaf35e7c..2d82e0d22c 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -233,7 +233,8 @@ void TAdvanceConsumerCommand::DoExecute(ICommandContextPtr context) { auto transaction = GetTransaction(context); - transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset); + WaitFor(transaction->AdvanceConsumer(ConsumerPath, QueuePath, PartitionIndex, OldOffset, NewOffset, /*options*/ {})) + .ThrowOnError(); if (ShouldCommitTransaction()) { WaitFor(transaction->Commit()) diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index f2606da4d6..3e2bf635ad 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -77,6 +77,14 @@ public: TSharedRange<TRowModification> modifications, const TModifyRowsOptions& options) override; + TFuture<void> AdvanceConsumer( + const NYPath::TRichYPath& consumerPath, + const NYPath::TRichYPath& queuePath, + int partitionIndex, + std::optional<i64> oldOffset, + i64 newOffset, + const TAdvanceConsumerOptions& options) override; + TFuture<TTransactionFlushResult> Flush() override; TFuture<void> Ping(const NApi::TTransactionPingOptions& options = {}) override; @@ -478,6 +486,7 @@ TRANSACTION_METHOD_IMPL(void, Abort, (const TTransactionAbortOptions&)); TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&)); TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookup, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&)); TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&)); +TRANSACTION_METHOD_IMPL(void, AdvanceConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceConsumerOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&)); TRANSACTION_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&)); diff --git a/yt/yt/client/journal_client/config.h b/yt/yt/client/journal_client/config.h index b89a248d2b..23e2d7150c 100644 --- a/yt/yt/client/journal_client/config.h +++ b/yt/yt/client/journal_client/config.h @@ -2,10 +2,10 @@ #include "public.h" -#include <yt/yt/core/ytree/yson_serializable.h> - #include <yt/yt/client/chunk_client/config.h> +#include <yt/yt/core/ytree/yson_struct.h> + namespace NYT::NJournalClient { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/queue_client/consumer_client.cpp b/yt/yt/client/queue_client/consumer_client.cpp index 2089ba21aa..8690cb6433 100644 --- a/yt/yt/client/queue_client/consumer_client.cpp +++ b/yt/yt/client/queue_client/consumer_client.cpp @@ -575,6 +575,11 @@ ISubConsumerClientPtr CreateSubConsumerClient( return CreateConsumerClient(client, consumerPath)->GetSubConsumerClient(queueRef); } +const TTableSchemaPtr& GetConsumerSchema() +{ + return YTConsumerTableSchema; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NQueueClient diff --git a/yt/yt/client/queue_client/consumer_client.h b/yt/yt/client/queue_client/consumer_client.h index 5c239828fa..80bfce0435 100644 --- a/yt/yt/client/queue_client/consumer_client.h +++ b/yt/yt/client/queue_client/consumer_client.h @@ -118,6 +118,8 @@ ISubConsumerClientPtr CreateSubConsumerClient( const NYPath::TYPath& consumerPath, NYPath::TRichYPath queuePath); +const NTableClient::TTableSchemaPtr& GetConsumerSchema(); + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NQueueClient diff --git a/yt/yt/client/table_client/helpers.cpp b/yt/yt/client/table_client/helpers.cpp index 66343bc32b..60767d1a68 100644 --- a/yt/yt/client/table_client/helpers.cpp +++ b/yt/yt/client/table_client/helpers.cpp @@ -5,6 +5,7 @@ #include <yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.pb.h> +#include <yt/yt/core/ytree/attributes.h> #include <yt/yt/core/ytree/convert.h> #include <yt/yt/core/net/address.h> diff --git a/yt/yt/client/table_client/unversioned_row.cpp b/yt/yt/client/table_client/unversioned_row.cpp index 7a28b1333f..cff031f054 100644 --- a/yt/yt/client/table_client/unversioned_row.cpp +++ b/yt/yt/client/table_client/unversioned_row.cpp @@ -16,7 +16,7 @@ #include <yt/yt/core/yson/consumer.h> -#include <yt/yt/core/ytree/helpers.h> +#include <yt/yt/core/ytree/attributes.h> #include <yt/yt/core/ytree/node.h> #include <yt/yt/core/ytree/convert.h> diff --git a/yt/yt/client/transaction_client/config.h b/yt/yt/client/transaction_client/config.h index c4114b72c5..9c8c69d6a6 100644 --- a/yt/yt/client/transaction_client/config.h +++ b/yt/yt/client/transaction_client/config.h @@ -4,7 +4,7 @@ #include <yt/yt/core/rpc/config.h> -#include <yt/yt/core/ytree/yson_serializable.h> +#include <yt/yt/core/ytree/yson_struct.h> namespace NYT::NTransactionClient { diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index 06c9c65148..d8380bdd61 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ b/yt/yt/client/unittests/mock/transaction.h @@ -195,6 +195,14 @@ public: MOCK_METHOD(void, SubscribeAborted, (const TAbortedHandler& callback), (override)); MOCK_METHOD(void, UnsubscribeAborted, (const TAbortedHandler& callback), (override)); + MOCK_METHOD(TFuture<void>, AdvanceConsumer, ( + const NYPath::TRichYPath& consumerPath, + const NYPath::TRichYPath& queuePath, + int partitionIndex, + std::optional<i64> oldOffset, + i64 newOffset, + const TAdvanceConsumerOptions& options), (override)); + MOCK_METHOD(void, ModifyRows, ( const NYPath::TYPath& path, NTableClient::TNameTablePtr nameTable, diff --git a/yt/yt/core/actions/unittests/future_ut.cpp b/yt/yt/core/actions/unittests/future_ut.cpp index a979e8477a..9b564e7481 100644 --- a/yt/yt/core/actions/unittests/future_ut.cpp +++ b/yt/yt/core/actions/unittests/future_ut.cpp @@ -7,6 +7,8 @@ #include <yt/yt/core/misc/ref_counted_tracker.h> #include <yt/yt/core/misc/mpsc_stack.h> +#include <yt/yt/core/ytree/attributes.h> + #include <util/system/thread.h> #include <thread> diff --git a/yt/yt/core/actions/unittests/new_with_offloaded_dtor_ut.cpp b/yt/yt/core/actions/unittests/new_with_offloaded_dtor_ut.cpp index 5c9a9ffab9..be7482808a 100644 --- a/yt/yt/core/actions/unittests/new_with_offloaded_dtor_ut.cpp +++ b/yt/yt/core/actions/unittests/new_with_offloaded_dtor_ut.cpp @@ -7,6 +7,8 @@ #include <yt/yt/core/misc/ref_counted_tracker.h> #include <yt/yt/core/misc/proc.h> +#include <yt/yt/core/logging/log.h> + namespace NYT { namespace { diff --git a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp index c96d1bc40c..5eb10dee42 100644 --- a/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp +++ b/yt/yt/core/compression/unittests/dictionary_compression_ut.cpp @@ -6,6 +6,8 @@ #include <yt/yt/core/misc/error.h> #include <yt/yt/core/misc/serialize.h> +#include <yt/yt/core/ytree/attributes.h> + #include <library/cpp/yt/memory/chunked_memory_pool.h> #include <util/random/fast.h> diff --git a/yt/yt/core/concurrency/thread_pool_poller.cpp b/yt/yt/core/concurrency/thread_pool_poller.cpp index 0b45173871..f486003cc1 100644 --- a/yt/yt/core/concurrency/thread_pool_poller.cpp +++ b/yt/yt/core/concurrency/thread_pool_poller.cpp @@ -7,6 +7,7 @@ #include "two_level_fair_share_thread_pool.h" #include "new_fair_share_thread_pool.h" +#include <yt/yt/core/misc/collection_helpers.h> #include <yt/yt/core/misc/proc.h> #include <yt/yt/core/misc/mpsc_stack.h> #include <yt/yt/core/misc/ref_tracked.h> diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp index b2c95d1dbd..132e0b1fbd 100644 --- a/yt/yt/core/http/server.cpp +++ b/yt/yt/core/http/server.cpp @@ -262,6 +262,11 @@ private: } })); + auto finally = Finally([&] { + auto count = ActiveConnections_.fetch_sub(1) - 1; + ConnectionsActive_.Update(count); + }); + if (Config_->NoDelay) { connection->SetNoDelay(); } @@ -274,11 +279,6 @@ private: void DoHandleConnection(const IConnectionPtr& connection, TGuid connectionId) { - auto finally = Finally([&] { - auto count = ActiveConnections_.fetch_sub(1) - 1; - ConnectionsActive_.Update(count); - }); - auto request = New<THttpInput>( connection, connection->RemoteAddress(), diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp index 62dba0253d..b95beb20fb 100644 --- a/yt/yt/core/logging/log_manager.cpp +++ b/yt/yt/core/logging/log_manager.cpp @@ -16,6 +16,7 @@ #include <yt/yt/core/concurrency/invoker_queue.h> #include <yt/yt/core/concurrency/thread_pool.h> +#include <yt/yt/core/misc/collection_helpers.h> #include <yt/yt/core/misc/fs.h> #include <yt/yt/core/misc/spsc_queue.h> #include <yt/yt/core/misc/mpsc_stack.h> @@ -32,7 +33,7 @@ #include <yt/yt/core/ytree/ypath_client.h> #include <yt/yt/core/ytree/ypath_service.h> -#include <yt/yt/core/ytree/yson_serializable.h> +#include <yt/yt/core/ytree/yson_struct.h> #include <yt/yt/core/ytree/convert.h> #include <yt/yt/library/profiling/producer.h> diff --git a/yt/yt/core/misc/cache_config.h b/yt/yt/core/misc/cache_config.h index 9ac7c9063c..24e6b2c946 100644 --- a/yt/yt/core/misc/cache_config.h +++ b/yt/yt/core/misc/cache_config.h @@ -2,7 +2,6 @@ #include "public.h" -#include <yt/yt/core/ytree/yson_serializable.h> #include <yt/yt/core/ytree/yson_struct.h> namespace NYT { diff --git a/yt/yt/core/misc/error.cpp b/yt/yt/core/misc/error.cpp index ce9a942841..511a9fb26c 100644 --- a/yt/yt/core/misc/error.cpp +++ b/yt/yt/core/misc/error.cpp @@ -16,6 +16,7 @@ #include <yt/yt/core/yson/tokenizer.h> +#include <yt/yt/core/ytree/attributes.h> #include <yt/yt/core/ytree/convert.h> #include <yt/yt/core/ytree/fluent.h> diff --git a/yt/yt/core/misc/error.h b/yt/yt/core/misc/error.h index be3a5d9ec7..9dba023408 100644 --- a/yt/yt/core/misc/error.h +++ b/yt/yt/core/misc/error.h @@ -6,7 +6,7 @@ #include <yt/yt/core/yson/string.h> -#include <yt/yt/core/ytree/attributes.h> +#include <yt/yt/core/ytree/public.h> #include <yt/yt/core/tracing/public.h> diff --git a/yt/yt/core/misc/proc.cpp b/yt/yt/core/misc/proc.cpp index 1b7aefe466..2fbd8ba1d6 100644 --- a/yt/yt/core/misc/proc.cpp +++ b/yt/yt/core/misc/proc.cpp @@ -314,6 +314,24 @@ std::vector<size_t> GetCurrentProcessThreadIds() #endif } +bool IsUserspaceThread(size_t tid) +{ +#ifdef __linux__ + TFileInput file(Format("/proc/%v/stat", tid)); + auto statFields = SplitString(file.ReadLine(), " "); + constexpr int StartStackIndex = 27; + if (statFields.size() < StartStackIndex) { + return false; + } + // This is just a heuristic. + auto startStack = FromString<ui64>(statFields[StartStackIndex]); + return startStack != 0; +#else + Y_UNUSED(tid); + return false; +#endif +} + void ChownChmodDirectory(const TString& path, const std::optional<uid_t>& userId, const std::optional<int>& permissions) { #ifdef _unix_ diff --git a/yt/yt/core/misc/proc.h b/yt/yt/core/misc/proc.h index 2d52d0d209..823b64397e 100644 --- a/yt/yt/core/misc/proc.h +++ b/yt/yt/core/misc/proc.h @@ -4,7 +4,7 @@ #include <yt/yt/core/misc/error.h> -#include <yt/yt/core/ytree/yson_serializable.h> +#include <util/system/file.h> #include <errno.h> @@ -110,6 +110,7 @@ ui64 GetProcessCumulativeMajorPageFaults(int pid = -1); size_t GetCurrentProcessId(); size_t GetCurrentThreadId(); std::vector<size_t> GetCurrentProcessThreadIds(); +bool IsUserspaceThread(size_t tid); void ChownChmodDirectory( const TString& path, diff --git a/yt/yt/core/misc/shutdown.cpp b/yt/yt/core/misc/shutdown.cpp index 3646df7150..25a772a2d6 100644 --- a/yt/yt/core/misc/shutdown.cpp +++ b/yt/yt/core/misc/shutdown.cpp @@ -4,6 +4,8 @@ #include <yt/yt/core/misc/proc.h> #include <yt/yt/core/misc/singleton.h> +#include <library/cpp/yt/cpu_clock/clock.h> + #include <library/cpp/yt/threading/fork_aware_spin_lock.h> #include <library/cpp/yt/threading/event_count.h> diff --git a/yt/yt/core/misc/unittests/error_ut.cpp b/yt/yt/core/misc/unittests/error_ut.cpp index f8dad10b3c..f9cbbe9e9e 100644 --- a/yt/yt/core/misc/unittests/error_ut.cpp +++ b/yt/yt/core/misc/unittests/error_ut.cpp @@ -5,6 +5,7 @@ #include <yt/yt/core/yson/string.h> +#include <yt/yt/core/ytree/attributes.h> #include <yt/yt/core/ytree/convert.h> #include <util/stream/str.h> diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index 38427b05b9..d0018dbbe8 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -33,7 +33,7 @@ namespace NYT::NNet { using namespace NConcurrency; -using namespace NProfiling; +// using namespace NProfiling; #ifdef _unix_ using TIOVecBasePtr = void*; diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index 14143a6585..ec29c3b88c 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -38,6 +38,20 @@ void TServiceCommonConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TServiceCommonDynamicConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("enable_per_user_profiling", &TThis::EnablePerUserProfiling) + .Default(); + registrar.Parameter("histogram_timer_profiling", &TThis::HistogramTimerProfiling) + .Default(); + registrar.Parameter("code_counting", &TThis::EnableErrorCodeCounting) + .Default(); + registrar.Parameter("tracing_mode", &TThis::TracingMode) + .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + void TServerConfig::Register(TRegistrar registrar) { registrar.Parameter("services", &TThis::Services) @@ -46,6 +60,14 @@ void TServerConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TServerDynamicConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("services", &TThis::Services) + .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + void TServiceConfig::Register(TRegistrar registrar) { registrar.Parameter("enable_per_user_profiling", &TThis::EnablePerUserProfiling) diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index be8c1fdec5..9cdf971d18 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -90,6 +90,40 @@ DEFINE_REFCOUNTED_TYPE(TServerConfig) //////////////////////////////////////////////////////////////////////////////// +// Common options shared between all services in one server. +class TServiceCommonDynamicConfig + : public NYTree::TYsonStruct +{ +public: + std::optional<bool> EnablePerUserProfiling; + std::optional<THistogramConfigPtr> HistogramTimerProfiling; + std::optional<bool> EnableErrorCodeCounting; + std::optional<ERequestTracingMode> TracingMode; + + REGISTER_YSON_STRUCT(TServiceCommonDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TServiceCommonDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + +class TServerDynamicConfig + : public TServiceCommonDynamicConfig +{ +public: + THashMap<TString, NYTree::INodePtr> Services; + + REGISTER_YSON_STRUCT(TServerDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TServerDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + class TServiceConfig : public NYTree::TYsonStruct { diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h index af88870ad9..5548fa0f22 100644 --- a/yt/yt/core/rpc/public.h +++ b/yt/yt/core/rpc/public.h @@ -107,6 +107,8 @@ DECLARE_REFCOUNTED_CLASS(THistogramExponentialBounds) DECLARE_REFCOUNTED_CLASS(THistogramConfig) DECLARE_REFCOUNTED_CLASS(TServerConfig) DECLARE_REFCOUNTED_CLASS(TServiceCommonConfig) +DECLARE_REFCOUNTED_CLASS(TServerDynamicConfig) +DECLARE_REFCOUNTED_CLASS(TServiceCommonDynamicConfig) DECLARE_REFCOUNTED_CLASS(TServiceConfig) DECLARE_REFCOUNTED_CLASS(TMethodConfig) DECLARE_REFCOUNTED_CLASS(TRetryingChannelConfig) diff --git a/yt/yt/core/rpc/retrying_channel.h b/yt/yt/core/rpc/retrying_channel.h index d8d25fb922..7de5f5f8ec 100644 --- a/yt/yt/core/rpc/retrying_channel.h +++ b/yt/yt/core/rpc/retrying_channel.h @@ -2,8 +2,6 @@ #include "public.h" -#include <yt/yt/core/ytree/yson_serializable.h> - namespace NYT::NRpc { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/server.h b/yt/yt/core/rpc/server.h index fe4c88a478..e48bf2ef5c 100644 --- a/yt/yt/core/rpc/server.h +++ b/yt/yt/core/rpc/server.h @@ -31,7 +31,9 @@ struct IServer virtual IServicePtr GetServiceOrThrow(const TServiceId& serviceId) const = 0; //! Reconfigures the server on-the-fly. - virtual void Configure(TServerConfigPtr config) = 0; + virtual void Configure(const TServerConfigPtr& config) = 0; + + virtual void OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) = 0; //! Starts the server. /*! diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp index 0f80a07b6b..82ba43afe4 100644 --- a/yt/yt/core/rpc/server_detail.cpp +++ b/yt/yt/core/rpc/server_detail.cpp @@ -768,12 +768,12 @@ void TServerBase::RegisterService(IServicePtr service) auto guard = WriterGuard(ServicesLock_); auto& serviceMap = RealmIdToServiceMap_[serviceId.RealmId]; YT_VERIFY(serviceMap.emplace(serviceId.ServiceName, service).second); - if (Config_) { - auto it = Config_->Services.find(serviceId.ServiceName); - if (it != Config_->Services.end()) { - service->Configure(Config_, it->second); + if (AppliedConfig_) { + auto it = AppliedConfig_->Services.find(serviceId.ServiceName); + if (it != AppliedConfig_->Services.end()) { + service->Configure(AppliedConfig_, it->second); } else { - service->Configure(Config_, nullptr); + service->Configure(AppliedConfig_, nullptr); } } DoRegisterService(service); @@ -866,26 +866,54 @@ IServicePtr TServerBase::GetServiceOrThrow(const TServiceId& serviceId) const return serviceIt->second; } -void TServerBase::Configure(TServerConfigPtr config) +void TServerBase::ApplyConfig() { - auto guard = WriterGuard(ServicesLock_); + VERIFY_SPINLOCK_AFFINITY(ServicesLock_); - // Future services will be configured appropriately. - Config_ = config; + auto newAppliedConfig = New<TServerConfig>(); + newAppliedConfig->EnableErrorCodeCounting = DynamicConfig_->EnableErrorCodeCounting.value_or(StaticConfig_->EnableErrorCodeCounting); + newAppliedConfig->EnablePerUserProfiling = DynamicConfig_->EnablePerUserProfiling.value_or(StaticConfig_->EnablePerUserProfiling); + newAppliedConfig->HistogramTimerProfiling = DynamicConfig_->HistogramTimerProfiling.value_or(StaticConfig_->HistogramTimerProfiling); + newAppliedConfig->Services = StaticConfig_->Services; + + for (const auto& [name, node] : DynamicConfig_->Services) { + newAppliedConfig->Services[name] = node; + } + + AppliedConfig_ = newAppliedConfig; // Apply configuration to all existing services. for (const auto& [realmId, serviceMap] : RealmIdToServiceMap_) { for (const auto& [serviceName, service] : serviceMap) { - auto it = config->Services.find(serviceName); - if (it != config->Services.end()) { - service->Configure(config, it->second); + auto it = AppliedConfig_->Services.find(serviceName); + if (it != AppliedConfig_->Services.end()) { + service->Configure(AppliedConfig_, it->second); } else { - service->Configure(config, nullptr); + service->Configure(AppliedConfig_, nullptr); } } } } +void TServerBase::Configure(const TServerConfigPtr& config) +{ + auto guard = WriterGuard(ServicesLock_); + + // Future services will be configured appropriately. + StaticConfig_ = config; + + ApplyConfig(); +} + +void TServerBase::OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) +{ + auto guard = WriterGuard(ServicesLock_); + + DynamicConfig_ = config; + + ApplyConfig(); +} + void TServerBase::Start() { YT_VERIFY(!Started_); diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h index 0a2647b643..c05e19216e 100644 --- a/yt/yt/core/rpc/server_detail.h +++ b/yt/yt/core/rpc/server_detail.h @@ -269,7 +269,8 @@ public: IServicePtr FindService(const TServiceId& serviceId) const override; IServicePtr GetServiceOrThrow(const TServiceId& serviceId) const override; - void Configure(TServerConfigPtr config) override; + void Configure(const TServerConfigPtr& config) override; + void OnDynamicConfigChanged(const TServerDynamicConfigPtr& config) override; void Start() override; TFuture<void> Stop(bool graceful) override; @@ -280,7 +281,9 @@ protected: std::atomic<bool> Started_ = false; YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ServicesLock_); - TServerConfigPtr Config_; + TServerConfigPtr StaticConfig_; + TServerDynamicConfigPtr DynamicConfig_ = New<TServerDynamicConfig>(); + TServerConfigPtr AppliedConfig_; //! Service name to service. using TServiceMap = THashMap<TString, IServicePtr>; @@ -288,6 +291,8 @@ protected: explicit TServerBase(NLogging::TLogger logger); + void ApplyConfig(); + virtual void DoStart(); virtual TFuture<void> DoStop(bool graceful); diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 2fd6f483ad..46567c3fcd 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -2451,6 +2451,7 @@ void TServiceBase::DoConfigure( auto methodConfig = methodIt ? methodIt->second : New<TMethodConfig>(); const auto& descriptor = runtimeInfo->Descriptor; + runtimeInfo->Heavy.store(methodConfig->Heavy.value_or(descriptor.Options.Heavy)); runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit)); runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit)); diff --git a/yt/yt/core/yson/protobuf_interop.cpp b/yt/yt/core/yson/protobuf_interop.cpp index 030cc763fc..60f4ee6e1a 100644 --- a/yt/yt/core/yson/protobuf_interop.cpp +++ b/yt/yt/core/yson/protobuf_interop.cpp @@ -182,7 +182,7 @@ void SetProtobufInteropConfig(TProtobufInteropDynamicConfigPtr config) GlobalProtobufInteropConfig()->Config.Store(std::move(config)); } -void GetSchema(const TProtobufEnumType* enumType, IYsonConsumer* consumer); +void WriteSchema(const TProtobufEnumType* enumType, IYsonConsumer* consumer); //////////////////////////////////////////////////////////////////////////////// @@ -501,17 +501,18 @@ public: return EnumYsonStorageType_; } - void GetSchema(IYsonConsumer* consumer) const + void WriteSchema(IYsonConsumer* consumer) const { if (IsYsonMap()) { - BuildYsonFluently(consumer).BeginMap() - .Item("type_name").Value("dict") - .Item("key").Do([this] (auto&& fluent) { - GetYsonMapKeyField()->GetSchema(fluent.GetConsumer()); - }) - .Item("value").Do([this] (auto&& fluent) { - GetYsonMapValueField()->GetSchema(fluent.GetConsumer()); - }) + BuildYsonFluently(consumer) + .BeginMap() + .Item("type_name").Value("dict") + .Item("key").Do([&] (auto fluent) { + GetYsonMapKeyField()->WriteSchema(fluent.GetConsumer()); + }) + .Item("value").Do([&] (auto fluent) { + GetYsonMapValueField()->WriteSchema(fluent.GetConsumer()); + }) .EndMap(); return; @@ -558,10 +559,10 @@ public: consumer->OnStringScalar("string"); break; case FieldDescriptor::TYPE_ENUM: - NYson::GetSchema(GetEnumType(), consumer); + NYson::WriteSchema(GetEnumType(), consumer); break; case FieldDescriptor::TYPE_MESSAGE: - NYson::GetSchema(GetMessageType(), consumer); + NYson::WriteSchema(GetMessageType(), consumer); break; default: break; @@ -690,20 +691,21 @@ public: } } - void GetSchema(IYsonConsumer* consumer) const + void WriteSchema(IYsonConsumer* consumer) const { BuildYsonFluently(consumer).BeginMap() .Item("type_name").Value("struct") - .Item("members").DoListFor(0, Underlying_->field_count(), [this] (auto&& fluent, int index) { + .Item("members").DoListFor(0, Underlying_->field_count(), [&] (auto fluent, int index) { auto* field = GetFieldByNumber(Underlying_->field(index)->number()); - fluent.Item().BeginMap() - .Item("name").Value(field->GetYsonName()) - .Item("type").Do([&] (auto&& fluent) { - field->GetSchema(fluent.GetConsumer()); - }) - .DoIf(!field->IsYsonMap() && !field->IsRepeated() && !field->IsOptional(), [&] (auto && fluent) { - fluent.Item("required").Value(true); - }) + fluent.Item() + .BeginMap() + .Item("name").Value(field->GetYsonName()) + .Item("type").Do([&] (auto fluent) { + field->WriteSchema(fluent.GetConsumer()); + }) + .DoIf(!field->IsYsonMap() && !field->IsRepeated() && !field->IsOptional(), [] (auto fluent) { + fluent.Item("required").Value(true); + }) .EndMap(); }) .EndMap(); @@ -821,14 +823,15 @@ public: return it == ValueToLiteral_.end() ? TStringBuf() : it->second; } - void GetSchema(IYsonConsumer* consumer) const + void WriteSchema(IYsonConsumer* consumer) const { - BuildYsonFluently(consumer).BeginMap() - .Item("type_name").Value("enum") - .Item("enum_name").Value(Underlying_->name()) - .Item("values").DoListFor(0, Underlying_->value_count(), [this] (auto&& fluent, int index) { - fluent.Item().Value(FindLiteralByValue(Underlying_->value(index)->number())); - }) + BuildYsonFluently(consumer) + .BeginMap() + .Item("type_name").Value("enum") + .Item("enum_name").Value(Underlying_->name()) + .Item("values").DoListFor(0, Underlying_->value_count(), [&] (auto fluent, int index) { + fluent.Item().Value(FindLiteralByValue(Underlying_->value(index)->number())); + }) .EndMap(); } @@ -3140,14 +3143,14 @@ TString YsonStringToProto( return serializedProto; } -void GetSchema(const TProtobufEnumType* type, IYsonConsumer* consumer) +void WriteSchema(const TProtobufEnumType* type, IYsonConsumer* consumer) { - type->GetSchema(consumer); + type->WriteSchema(consumer); } -void GetSchema(const TProtobufMessageType* type, IYsonConsumer* consumer) +void WriteSchema(const TProtobufMessageType* type, IYsonConsumer* consumer) { - type->GetSchema(consumer); + type->WriteSchema(consumer); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/yson/protobuf_interop.h b/yt/yt/core/yson/protobuf_interop.h index 5daf062698..5bfe3fd93b 100644 --- a/yt/yt/core/yson/protobuf_interop.h +++ b/yt/yt/core/yson/protobuf_interop.h @@ -280,9 +280,11 @@ void SetProtobufInteropConfig(TProtobufInteropDynamicConfigPtr config); //////////////////////////////////////////////////////////////////////////////// -//! Return type v3 schema for protobuf message type. +//! Returns type v3 schema for protobuf message type. //! Note: Recursive types (message has field with self type) are not supported. -void GetSchema(const TProtobufMessageType* type, IYsonConsumer* consumer); +void WriteSchema(const TProtobufMessageType* type, IYsonConsumer* consumer); + +//////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NYson diff --git a/yt/yt/core/yson/unittests/protobuf_yson_schema_ut.cpp b/yt/yt/core/yson/unittests/protobuf_yson_schema_ut.cpp index 103f00b01d..af58fb320a 100644 --- a/yt/yt/core/yson/unittests/protobuf_yson_schema_ut.cpp +++ b/yt/yt/core/yson/unittests/protobuf_yson_schema_ut.cpp @@ -14,13 +14,14 @@ namespace NYT::NYson { namespace { +//////////////////////////////////////////////////////////////////////////////// TEST(TProtobufYsonSchemaTest, GetMessageSchema) { TStringStream outputStream; TYsonWriter ysonWriter(&outputStream, EYsonFormat::Text); - GetSchema(ReflectProtobufMessageType<NProto::TTestSchemaMessage>(), &ysonWriter); + WriteSchema(ReflectProtobufMessageType<NProto::TTestSchemaMessage>(), &ysonWriter); TStringBuf expected = R"({ type_name="struct"; members=[ @@ -54,5 +55,7 @@ TEST(TProtobufYsonSchemaTest, GetMessageSchema) << "Actual: " << ConvertToYsonString(actualNode, EYsonFormat::Text, 4).AsStringBuf() << "\n\n"; } +//////////////////////////////////////////////////////////////////////////////// + } // namespace } // namespace NYT::NYson diff --git a/yt/yt/core/ytalloc/bindings.cpp b/yt/yt/core/ytalloc/bindings.cpp index 1f47fa7982..33eb077a5b 100644 --- a/yt/yt/core/ytalloc/bindings.cpp +++ b/yt/yt/core/ytalloc/bindings.cpp @@ -9,10 +9,10 @@ #include <yt/yt/core/misc/singleton.h> #include <yt/yt/core/misc/string_builder.h> -#include <yt/yt/core/ytree/yson_serializable.h> - #include <library/cpp/ytalloc/api/ytalloc.h> +#include <library/cpp/yt/yson_string/string.h> + #include <util/system/env.h> #include <cstdio> diff --git a/yt/yt/core/ytree/attributes-inl.h b/yt/yt/core/ytree/attributes-inl.h new file mode 100644 index 0000000000..dd367f5d24 --- /dev/null +++ b/yt/yt/core/ytree/attributes-inl.h @@ -0,0 +1,89 @@ +#ifndef ATTRIBUTES_INL_H_ +#error "Direct inclusion of this file is not allowed, include helpers.h" +// For the sake of sane code completion. +#include "attributes.h" +#endif + +// #include "attribute_consumer.h" +// #include "serialize.h" +#include "convert.h" + +namespace NYT::NYTree { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +T IAttributeDictionary::Get(TStringBuf key) const +{ + auto yson = GetYson(key); + try { + return ConvertTo<T>(yson); + } catch (const std::exception& ex) { + THROW_ERROR_EXCEPTION("Error parsing attribute %Qv", + key) + << ex; + } +} + +template <class T> +T IAttributeDictionary::GetAndRemove(const TString& key) +{ + auto result = Get<T>(key); + Remove(key); + return result; +} + +template <class T> +T IAttributeDictionary::Get(TStringBuf key, const T& defaultValue) const +{ + return Find<T>(key).value_or(defaultValue); +} + +template <class T> +T IAttributeDictionary::GetAndRemove(const TString& key, const T& defaultValue) +{ + auto result = Find<T>(key); + if (result) { + Remove(key); + return *result; + } else { + return defaultValue; + } +} + +template <class T> +typename TOptionalTraits<T>::TOptional IAttributeDictionary::Find(TStringBuf key) const +{ + auto yson = FindYson(key); + if (!yson) { + return typename TOptionalTraits<T>::TOptional(); + } + try { + return ConvertTo<T>(yson); + } catch (const std::exception& ex) { + THROW_ERROR_EXCEPTION("Error parsing attribute %Qv", + key) + << ex; + } +} + +template <class T> +typename TOptionalTraits<T>::TOptional IAttributeDictionary::FindAndRemove(const TString& key) +{ + auto result = Find<T>(key); + if (result) { + Remove(key); + } + return result; +} + +template <class T> +void IAttributeDictionary::Set(const TString& key, const T& value) +{ + auto yson = ConvertToYsonString(value, NYson::EYsonFormat::Binary); + SetYson(key, yson); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTree diff --git a/yt/yt/core/ytree/attributes.h b/yt/yt/core/ytree/attributes.h index bff7c8fd55..c703bfa529 100644 --- a/yt/yt/core/ytree/attributes.h +++ b/yt/yt/core/ytree/attributes.h @@ -100,3 +100,7 @@ DEFINE_REFCOUNTED_TYPE(IAttributeDictionary) } // namespace NYT::NYTree + +#define ATTRIBUTES_INL_H_ +#include "attributes-inl.h" +#undef ATTRIBUTES_INL_H_ diff --git a/yt/yt/core/ytree/convert-inl.h b/yt/yt/core/ytree/convert-inl.h index 0e0e1c925b..f32c92ea0b 100644 --- a/yt/yt/core/ytree/convert-inl.h +++ b/yt/yt/core/ytree/convert-inl.h @@ -4,6 +4,7 @@ #include "convert.h" #endif +#include "attribute_consumer.h" #include "default_building_consumer.h" #include "serialize.h" #include "tree_builder.h" diff --git a/yt/yt/core/ytree/exception_helpers.cpp b/yt/yt/core/ytree/exception_helpers.cpp index 8e27ceb8e0..5260ff91e5 100644 --- a/yt/yt/core/ytree/exception_helpers.cpp +++ b/yt/yt/core/ytree/exception_helpers.cpp @@ -1,4 +1,5 @@ #include "exception_helpers.h" +#include "attributes.h" #include "node.h" #include <yt/yt/core/misc/error.h> diff --git a/yt/yt/core/ytree/fluent.h b/yt/yt/core/ytree/fluent.h index f423fb17f5..6ef40dfc10 100644 --- a/yt/yt/core/ytree/fluent.h +++ b/yt/yt/core/ytree/fluent.h @@ -4,6 +4,7 @@ #include "tree_visitor.h" #include "tree_builder.h" #include "convert.h" +#include "attributes.h" #include "attribute_consumer.h" #include "helpers.h" diff --git a/yt/yt/core/ytree/helpers-inl.h b/yt/yt/core/ytree/helpers-inl.h index ea204ab82b..4f19dc1814 100644 --- a/yt/yt/core/ytree/helpers-inl.h +++ b/yt/yt/core/ytree/helpers-inl.h @@ -12,80 +12,6 @@ namespace NYT::NYTree { //////////////////////////////////////////////////////////////////////////////// -template <class T> -T IAttributeDictionary::Get(TStringBuf key) const -{ - auto yson = GetYson(key); - try { - return ConvertTo<T>(yson); - } catch (const std::exception& ex) { - THROW_ERROR_EXCEPTION("Error parsing attribute %Qv", - key) - << ex; - } -} - -template <class T> -T IAttributeDictionary::GetAndRemove(const TString& key) -{ - auto result = Get<T>(key); - Remove(key); - return result; -} - -template <class T> -T IAttributeDictionary::Get(TStringBuf key, const T& defaultValue) const -{ - return Find<T>(key).value_or(defaultValue); -} - -template <class T> -T IAttributeDictionary::GetAndRemove(const TString& key, const T& defaultValue) -{ - auto result = Find<T>(key); - if (result) { - Remove(key); - return *result; - } else { - return defaultValue; - } -} - -template <class T> -typename TOptionalTraits<T>::TOptional IAttributeDictionary::Find(TStringBuf key) const -{ - auto yson = FindYson(key); - if (!yson) { - return typename TOptionalTraits<T>::TOptional(); - } - try { - return ConvertTo<T>(yson); - } catch (const std::exception& ex) { - THROW_ERROR_EXCEPTION("Error parsing attribute %Qv", - key) - << ex; - } -} - -template <class T> -typename TOptionalTraits<T>::TOptional IAttributeDictionary::FindAndRemove(const TString& key) -{ - auto result = Find<T>(key); - if (result) { - Remove(key); - } - return result; -} - -template <class T> -void IAttributeDictionary::Set(const TString& key, const T& value) -{ - auto yson = ConvertToYsonString(value, NYson::EYsonFormat::Binary); - SetYson(key, yson); -} - -//////////////////////////////////////////////////////////////////////////////// - template <class T, class R> IYPathServicePtr IYPathService::FromMethod( R (std::remove_cv_t<T>::*method) () const, diff --git a/yt/yt/core/ytree/helpers.cpp b/yt/yt/core/ytree/helpers.cpp index cad1e1367f..cd3bb2ce7f 100644 --- a/yt/yt/core/ytree/helpers.cpp +++ b/yt/yt/core/ytree/helpers.cpp @@ -1,4 +1,5 @@ #include "helpers.h" +#include "attributes.h" #include "ypath_client.h" #include <yt/yt/core/misc/error.h> diff --git a/yt/yt/core/ytree/helpers.h b/yt/yt/core/ytree/helpers.h index 489785ea75..3320456f78 100644 --- a/yt/yt/core/ytree/helpers.h +++ b/yt/yt/core/ytree/helpers.h @@ -54,7 +54,7 @@ void ValidateYPathResolutionDepth(const NYPath::TYPath& path, int depth); //! Helps implementing IAttributeDictionary::ListPairs by delegating to //! IAttributeDictionary::ListKeys and IAttributeDictionary::FindYson for those not capable //! of providing a custom efficient implementation. -std::vector<IAttributeDictionary::TKeyValuePair> ListAttributesPairs(const IAttributeDictionary& attributes); +std::vector<std::pair<TString, NYson::TYsonString>> ListAttributesPairs(const IAttributeDictionary& attributes); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/ytree/unittests/yson_schema_ut.cpp b/yt/yt/core/ytree/unittests/yson_schema_ut.cpp index 3a9548eca6..65060e25e3 100644 --- a/yt/yt/core/ytree/unittests/yson_schema_ut.cpp +++ b/yt/yt/core/ytree/unittests/yson_schema_ut.cpp @@ -169,7 +169,7 @@ void CheckSchema(const TYsonStructPtr& ysonStruct, TStringBuf expected) auto* factory = GetEphemeralNodeFactory(); auto builder = CreateBuilderFromFactory(factory); builder->BeginTree(); - ysonStruct->GetSchema(builder.get()); + ysonStruct->WriteSchema(builder.get()); auto actualNode = builder->EndTree(); auto expectedNode = ConvertToNode(TYsonStringBuf(expected), factory); EXPECT_TRUE(AreNodesEqual(expectedNode, actualNode)) diff --git a/yt/yt/core/ytree/yson_schema-inl.h b/yt/yt/core/ytree/yson_schema-inl.h index 0aefa82d02..2b4b0e5a33 100644 --- a/yt/yt/core/ytree/yson_schema-inl.h +++ b/yt/yt/core/ytree/yson_schema-inl.h @@ -46,7 +46,7 @@ concept CIsAssociativeArray = CIsArray<T> && CIsMapping<T>; //////////////////////////////////////////////////////////////////////////////// #define DEFINE_SCHEMA_FOR_SIMPLE_TYPE(type, name) \ -inline void GetSchema(type, NYson::IYsonConsumer* consumer) \ +inline void WriteSchema(type, NYson::IYsonConsumer* consumer) \ { \ BuildYsonFluently(consumer) \ .Value(#name); \ @@ -79,83 +79,88 @@ DEFINE_SCHEMA_FOR_SIMPLE_TYPE(TGuid, guid) #undef DEFINE_SCHEMA_FOR_SIMPLE_TYPE template <CIsEnum T> -void GetSchema(const T&, NYson::IYsonConsumer* consumer) +void WriteSchema(const T&, NYson::IYsonConsumer* consumer) { - BuildYsonFluently(consumer).BeginMap() - .Item("type_name").Value("enum") - .Item("enum_name").Value(TEnumTraits<T>::GetTypeName()) - .Item("values").DoListFor( - TEnumTraits<T>::GetDomainNames(), [] (auto&& fluent, TStringBuf name) { - fluent.Item().Value(EncodeEnumValue(name)); - }) + BuildYsonFluently(consumer) + .BeginMap() + .Item("type_name").Value("enum") + .Item("enum_name").Value(TEnumTraits<T>::GetTypeName()) + .Item("values").DoListFor( + TEnumTraits<T>::GetDomainNames(), [] (auto fluent, TStringBuf name) { + fluent.Item().Value(EncodeEnumValue(name)); + }) .EndMap(); } template <CIsYsonStruct T> -void GetSchema(const NYT::TIntrusivePtr<T>& value, NYson::IYsonConsumer* consumer) +void WriteSchema(const NYT::TIntrusivePtr<T>& value, NYson::IYsonConsumer* consumer) { - BuildYsonFluently(consumer) .BeginMap() - .Item("type_name").Value("optional") - .Item("item").Do([&] (auto&& fluent) { - (value ? value : New<T>())->GetSchema(fluent.GetConsumer()); - }) + BuildYsonFluently(consumer) + .BeginMap() + .Item("type_name").Value("optional") + .Item("item").Do([&] (auto fluent) { + (value ? value : New<T>())->WriteSchema(fluent.GetConsumer()); + }) .EndMap(); } template <CIsYsonStruct T> -void GetSchema(const T& value, NYson::IYsonConsumer* consumer) +void WriteSchema(const T& value, NYson::IYsonConsumer* consumer) { - return value.GetSchema(consumer); + return value.WriteSchema(consumer); } template <CIsProtobufMessage T> -void GetSchema(const T&, NYson::IYsonConsumer* consumer) +void WriteSchema(const T&, NYson::IYsonConsumer* consumer) { - return NYson::GetSchema(NYson::ReflectProtobufMessageType<T>(), consumer); + return NYson::WriteSchema(NYson::ReflectProtobufMessageType<T>(), consumer); } template <CIsArray T> -void GetSchema(const T& value, NYson::IYsonConsumer* consumer) +void WriteSchema(const T& value, NYson::IYsonConsumer* consumer) { - BuildYsonFluently(consumer).BeginMap() - .Item("type_name").Value("list") - .Item("item").Do([&] (auto&& fluent) { - GetSchema( - std::begin(value) != std::end(value) - ? *std::begin(value) - : std::decay_t<decltype(*std::begin(value))>{}, - fluent.GetConsumer()); - }) + BuildYsonFluently(consumer) + .BeginMap() + .Item("type_name").Value("list") + .Item("item").Do([&] (auto fluent) { + WriteSchema( + std::begin(value) != std::end(value) + ? *std::begin(value) + : std::decay_t<decltype(*std::begin(value))>{}, + fluent.GetConsumer()); + }) .EndMap(); } template <CIsAssociativeArray T> -void GetSchema(const T& value, NYson::IYsonConsumer* consumer) +void WriteSchema(const T& value, NYson::IYsonConsumer* consumer) { - BuildYsonFluently(consumer).BeginMap() - .Item("type_name").Value("dict") - .Item("key").Do([&] (auto&& fluent) { - GetSchema(value.empty() ? typename T::key_type{} : value.begin()->first, fluent.GetConsumer()); - }) - .Item("value").Do([&] (auto&& fluent) { - GetSchema(value.empty() ? typename T::mapped_type{} : value.begin()->second, fluent.GetConsumer()); - }) + BuildYsonFluently(consumer) + .BeginMap() + .Item("type_name").Value("dict") + .Item("key").Do([&] (auto fluent) { + WriteSchema(value.empty() ? typename T::key_type{} : value.begin()->first, fluent.GetConsumer()); + }) + .Item("value").Do([&] (auto fluent) { + WriteSchema(value.empty() ? typename T::mapped_type{} : value.begin()->second, fluent.GetConsumer()); + }) .EndMap(); } template <CIsNullable T> -void GetSchema(const T& value, NYson::IYsonConsumer* consumer) +void WriteSchema(const T& value, NYson::IYsonConsumer* consumer) { - BuildYsonFluently(consumer) .BeginMap() - .Item("type_name").Value("optional") - .Item("item").Do([&] (auto&& fluent) { - GetSchema(value ? *value : std::decay_t<decltype(*value)>{}, fluent.GetConsumer()); - }) + BuildYsonFluently(consumer) + .BeginMap() + .Item("type_name").Value("optional") + .Item("item").Do([&] (auto fluent) { + WriteSchema(value ? *value : std::decay_t<decltype(*value)>{}, fluent.GetConsumer()); + }) .EndMap(); } template <class T> -void GetSchema(const T& value, NYson::IYsonConsumer* consumer) +void WriteSchema(const T& value, NYson::IYsonConsumer* consumer) { auto node = ConvertToNode(value); BuildYsonFluently(consumer).Value(FormatEnum(node->GetType())); diff --git a/yt/yt/core/ytree/yson_schema.h b/yt/yt/core/ytree/yson_schema.h index 2b1f3a38e6..a2e9f02b26 100644 --- a/yt/yt/core/ytree/yson_schema.h +++ b/yt/yt/core/ytree/yson_schema.h @@ -7,7 +7,7 @@ namespace NYT::NYTree::NPrivate { //////////////////////////////////////////////////////////////////////////////// template <typename T> -void GetSchema(const T& value, NYson::IYsonConsumer* consumer); +void WriteSchema(const T& value, NYson::IYsonConsumer* consumer); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/ytree/yson_struct.cpp b/yt/yt/core/ytree/yson_struct.cpp index 881414d40b..09b98aebf3 100644 --- a/yt/yt/core/ytree/yson_struct.cpp +++ b/yt/yt/core/ytree/yson_struct.cpp @@ -135,9 +135,9 @@ std::vector<TString> TYsonStructBase::GetAllParameterAliases(const TString& key) return result; } -void TYsonStructBase::GetSchema(IYsonConsumer* consumer) const +void TYsonStructBase::WriteSchema(IYsonConsumer* consumer) const { - return Meta_->GetSchema(this, consumer); + return Meta_->WriteSchema(this, consumer); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/ytree/yson_struct.h b/yt/yt/core/ytree/yson_struct.h index 18013c71fe..15acd2fa15 100644 --- a/yt/yt/core/ytree/yson_struct.h +++ b/yt/yt/core/ytree/yson_struct.h @@ -93,7 +93,7 @@ public: std::vector<TString> GetAllParameterAliases(const TString& key) const; - void GetSchema(NYson::IYsonConsumer* consumer) const; + void WriteSchema(NYson::IYsonConsumer* consumer) const; private: template <class TValue> diff --git a/yt/yt/core/ytree/yson_struct_detail-inl.h b/yt/yt/core/ytree/yson_struct_detail-inl.h index d6f507d37d..ac714065af 100644 --- a/yt/yt/core/ytree/yson_struct_detail-inl.h +++ b/yt/yt/core/ytree/yson_struct_detail-inl.h @@ -869,10 +869,10 @@ IMapNodePtr TYsonStructParameter<TValue>::GetRecursiveUnrecognized(const TYsonSt } template <class TValue> -void TYsonStructParameter<TValue>::GetSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const +void TYsonStructParameter<TValue>::WriteSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const { // TODO(bulatman) What about constraints: minimum, maximum, default and etc? - NPrivate::GetSchema(FieldAccessor_->GetValue(self), consumer); + NPrivate::WriteSchema(FieldAccessor_->GetValue(self), consumer); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/ytree/yson_struct_detail.cpp b/yt/yt/core/ytree/yson_struct_detail.cpp index e730ce9b09..c396e4d621 100644 --- a/yt/yt/core/ytree/yson_struct_detail.cpp +++ b/yt/yt/core/ytree/yson_struct_detail.cpp @@ -315,20 +315,21 @@ void TYsonStructMeta::SetUnrecognizedStrategy(EUnrecognizedStrategy strategy) MetaUnrecognizedStrategy_ = strategy; } -void TYsonStructMeta::GetSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const +void TYsonStructMeta::WriteSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const { BuildYsonFluently(consumer) .BeginMap() .Item("type_name").Value("struct") - .Item("members").DoListFor(Parameters_, [&] (auto&& fluent, auto&& pair) { - fluent.Item().BeginMap() - .Item("name").Value(pair.first) - .Item("type").Do([&] (auto&& fluent) { - pair.second->GetSchema(target, fluent.GetConsumer()); - }) - .DoIf(pair.second->IsRequired(), [] (auto&& fluent) { - fluent.Item("required").Value(true); - }) + .Item("members").DoListFor(Parameters_, [&] (auto fluent, const auto& pair) { + fluent.Item() + .BeginMap() + .Item("name").Value(pair.first) + .Item("type").Do([&] (auto fluent) { + pair.second->WriteSchema(target, fluent.GetConsumer()); + }) + .DoIf(pair.second->IsRequired(), [] (auto fluent) { + fluent.Item("required").Value(true); + }) .EndMap(); }) .EndMap(); diff --git a/yt/yt/core/ytree/yson_struct_detail.h b/yt/yt/core/ytree/yson_struct_detail.h index 27b9a6ba18..5a3effd704 100644 --- a/yt/yt/core/ytree/yson_struct_detail.h +++ b/yt/yt/core/ytree/yson_struct_detail.h @@ -61,7 +61,7 @@ struct IYsonStructParameter virtual const std::vector<TString>& GetAliases() const = 0; virtual IMapNodePtr GetRecursiveUnrecognized(const TYsonStructBase* self) const = 0; - virtual void GetSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const = 0; + virtual void WriteSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const = 0; }; //using IYsonStructParameterPtr = TIntrusivePtr<IYsonStructParameter>; @@ -101,7 +101,7 @@ struct IYsonStructMeta virtual void RegisterPostprocessor(std::function<void(TYsonStructBase*)> postprocessor) = 0; virtual void SetUnrecognizedStrategy(EUnrecognizedStrategy strategy) = 0; - virtual void GetSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const = 0; + virtual void WriteSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const = 0; virtual ~IYsonStructMeta() = default; }; @@ -144,7 +144,7 @@ public: void RegisterPostprocessor(std::function<void(TYsonStructBase*)> postprocessor) override; void SetUnrecognizedStrategy(EUnrecognizedStrategy strategy) override; - void GetSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const override; + void WriteSchema(const TYsonStructBase* target, NYson::IYsonConsumer* consumer) const override; void FinishInitialization(const std::type_info& structType); @@ -250,7 +250,7 @@ public: const std::vector<TString>& GetAliases() const override; IMapNodePtr GetRecursiveUnrecognized(const TYsonStructBase* self) const override; - void GetSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const override; + void WriteSchema(const TYsonStructBase* self, NYson::IYsonConsumer* consumer) const override; // Mark as optional. Field will be default-initialized if `init` is true, initialization is skipped otherwise. TYsonStructParameter& Optional(bool init = true); diff --git a/yt/yt/library/backtrace_introspector/introspect.cpp b/yt/yt/library/backtrace_introspector/introspect.cpp index 592c232f0f..a555a8a548 100644 --- a/yt/yt/library/backtrace_introspector/introspect.cpp +++ b/yt/yt/library/backtrace_introspector/introspect.cpp @@ -2,6 +2,7 @@ #include "private.h" +#include <yt/yt/core/misc/collection_helpers.h> #include <yt/yt/core/misc/finally.h> #include <yt/yt/core/misc/proc.h> diff --git a/yt/yt/library/backtrace_introspector/introspect_linux.cpp b/yt/yt/library/backtrace_introspector/introspect_linux.cpp index 3fc1a077f6..2bee8c0bd3 100644 --- a/yt/yt/library/backtrace_introspector/introspect_linux.cpp +++ b/yt/yt/library/backtrace_introspector/introspect_linux.cpp @@ -163,6 +163,12 @@ std::vector<TThreadIntrospectionInfo> IntrospectThreads() std::vector<TThreadIntrospectionInfo> infos; for (auto threadId : GetCurrentProcessThreadIds()) { + if (!IsUserspaceThread(threadId)) { + YT_LOG_DEBUG("Skipping a non-userspace thread (ThreadId: %v)", + threadId); + continue; + } + TSignalHandlerContext signalHandlerContext; if (::syscall(SYS_tkill, threadId, SIGUSR1) != 0) { YT_LOG_DEBUG(TError::FromSystem(), "Failed to signal to thread (ThreadId: %v)", diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp index 6874a45ff5..d83cb97358 100644 --- a/yt/yt/library/formats/arrow_writer.cpp +++ b/yt/yt/library/formats/arrow_writer.cpp @@ -45,6 +45,7 @@ struct TTypedBatchColumn //////////////////////////////////////////////////////////////////////////////// constexpr i64 ArrowAlignment = 8; +const TString AlignmentString(ArrowAlignment, 0); flatbuffers::Offset<flatbuffers::String> SerializeString( flatbuffers::FlatBufferBuilder* flatbufBuilder, @@ -1287,18 +1288,19 @@ private: auto metadataPtr = message.FlatbufBuilder->GetBufferPointer(); - ui32 metadataSz = AlignUp<i64>(metadataSize, ArrowAlignment); + ui32 metadataAlignSize = AlignUp<i64>(metadataSize, ArrowAlignment); - output->Write(&metadataSz, sizeof(ui32)); + output->Write(&metadataAlignSize, sizeof(ui32)); output->Write(metadataPtr, metadataSize); + output->Write(AlignmentString.Data(), metadataAlignSize - metadataSize); + // Body if (message.BodyWriter) { - TString current; - current.resize(message.BodySize); + TString current(AlignUp<i64>(message.BodySize, ArrowAlignment), 0); // Double copying. - message.BodyWriter(TMutableRef::FromString(current)); - output->Write(current.data(), message.BodySize); + message.BodyWriter(TMutableRef(current.begin(), current.begin() + message.BodySize)); + output->Write(current.data(), current.Size()); } else { YT_VERIFY(message.BodySize == 0); } diff --git a/yt/yt/library/profiling/solomon/exporter.h b/yt/yt/library/profiling/solomon/exporter.h index 6ec0ce7dc1..8aad2abcb1 100644 --- a/yt/yt/library/profiling/solomon/exporter.h +++ b/yt/yt/library/profiling/solomon/exporter.h @@ -4,20 +4,20 @@ #include "registry.h" #include "remote.h" -#include <yt/yt/core/concurrency/thread_pool.h> -#include <yt/yt/core/concurrency/async_rw_lock.h> - #include <yt/yt/core/actions/public.h> +#include <yt/yt/core/concurrency/async_rw_lock.h> +#include <yt/yt/core/concurrency/thread_pool.h> + #include <yt/yt/core/http/public.h> -#include <yt/yt/core/ytree/yson_serializable.h> #include <yt/yt/core/ytree/ypath_detail.h> +#include <yt/yt/core/ytree/yson_struct.h> -#include <library/cpp/monlib/encode/format.h> - -#include <yt/yt/library/profiling/sensor.h> #include <yt/yt/library/profiling/producer.h> +#include <yt/yt/library/profiling/sensor.h> + +#include <library/cpp/monlib/encode/format.h> namespace NYT::NProfiling { diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp index 288a90a3b1..304f9c761c 100644 --- a/yt/yt/library/program/config.cpp +++ b/yt/yt/library/program/config.cpp @@ -169,13 +169,6 @@ void WarnForUnrecognizedOptions( WarnForUnrecognizedOptionsImpl(logger, config->GetRecursiveUnrecognized()); } -void WarnForUnrecognizedOptions( - const NLogging::TLogger& logger, - const NYTree::TYsonSerializablePtr& config) -{ - WarnForUnrecognizedOptionsImpl(logger, config->GetUnrecognizedRecursively()); -} - void AbortOnUnrecognizedOptionsImpl( const NLogging::TLogger& logger, const IMapNodePtr& unrecognized) @@ -195,13 +188,6 @@ void AbortOnUnrecognizedOptions( AbortOnUnrecognizedOptionsImpl(logger, config->GetRecursiveUnrecognized()); } -void AbortOnUnrecognizedOptions( - const NLogging::TLogger& logger, - const NYTree::TYsonSerializablePtr& config) -{ - AbortOnUnrecognizedOptionsImpl(logger, config->GetUnrecognizedRecursively()); -} - //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h index b840950418..d4420b7278 100644 --- a/yt/yt/library/program/config.h +++ b/yt/yt/library/program/config.h @@ -2,7 +2,6 @@ #include "public.h" -#include <yt/yt/core/ytree/yson_serializable.h> #include <yt/yt/core/ytree/yson_struct.h> #include <yt/yt/core/ytalloc/config.h> diff --git a/yt/yt/library/program/program_config_mixin.h b/yt/yt/library/program/program_config_mixin.h index 80f681d06e..784c422477 100644 --- a/yt/yt/library/program/program_config_mixin.h +++ b/yt/yt/library/program/program_config_mixin.h @@ -2,10 +2,12 @@ #include "program.h" -#include <library/cpp/yt/string/enum.h> +#include <yt/yt/core/yson/writer.h> #include <yt/yt/core/ytree/convert.h> -#include <yt/yt/core/ytree/yson_serializable.h> +#include <yt/yt/core/ytree/yson_struct.h> + +#include <library/cpp/yt/string/enum.h> #include <util/stream/file.h> diff --git a/yt/yt/library/tracing/jaeger/tracer.cpp b/yt/yt/library/tracing/jaeger/tracer.cpp index 351ec787d4..fd3a409f6d 100644 --- a/yt/yt/library/tracing/jaeger/tracer.cpp +++ b/yt/yt/library/tracing/jaeger/tracer.cpp @@ -11,11 +11,13 @@ #include <yt/yt/core/concurrency/action_queue.h> #include <yt/yt/core/concurrency/periodic_executor.h> -#include <yt/yt/core/ytree/yson_serializable.h> #include <yt/yt/core/misc/protobuf_helpers.h> #include <yt/yt/core/misc/serialize.h> + #include <yt/yt/core/utilex/random.h> +#include <yt/yt/core/ytree/yson_struct.h> + #include <util/string/cast.h> #include <util/string/reverse.h> diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 8b0a758fd8..297ee2ae7d 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -601,6 +601,20 @@ message TRspSelectRows //////////////////////////////////////////////////////////////////////////////// +message TReqAdvanceConsumer +{ + optional NYT.NProto.TGuid transaction_id = 1; + optional string consumer_path = 2; + optional string queue_path = 3; + optional int32 partition_index = 4; + optional int64 old_offset = 5; + optional int64 new_offset = 6; +} + +message TRspAdvanceConsumer +{ +} + message TRowBatchReadOptions { optional int64 max_row_count = 1; |