diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-10-16 12:11:24 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-10-16 12:11:24 +0000 |
commit | 40811e93f3fdf9342a9295369994012420fac548 (patch) | |
tree | a8d85e094a9c21e10aa250f537c101fc2016a049 /yt | |
parent | 30ebe5357bb143648c6be4d151ecd4944af81ada (diff) | |
parent | 28a0c4a9f297064538a018c512cd9bbd00a1a35d (diff) | |
download | ydb-40811e93f3fdf9342a9295369994012420fac548.tar.gz |
Merge branch 'rightlib' into mergelibs-241016-1210
Diffstat (limited to 'yt')
104 files changed, 2867 insertions, 785 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 9e2885bbe0..9dab176bde 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -1243,6 +1243,14 @@ IFileReaderPtr TClient::GetJobStderr( return NRawClient::GetJobStderr(Context_, operationId, jobId, options); } +std::vector<TJobTraceEvent> TClient::GetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options) +{ + CheckShutdown(); + return NRawClient::GetJobTrace(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options); +} + TNode::TListType TClient::SkyShareTable( const std::vector<TYPath>& tablePaths, const TSkyShareTableOptions& options) diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h index 5de00285ef..32c458316d 100644 --- a/yt/cpp/mapreduce/client/client.h +++ b/yt/cpp/mapreduce/client/client.h @@ -449,6 +449,10 @@ public: const TJobId& jobId, const TGetJobStderrOptions& options = TGetJobStderrOptions()) override; + std::vector<TJobTraceEvent> GetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options = TGetJobTraceOptions()) override; + TNode::TListType SkyShareTable( const std::vector<TYPath>& tablePaths, const TSkyShareTableOptions& options = TSkyShareTableOptions()) override; diff --git a/yt/cpp/mapreduce/interface/client.h b/yt/cpp/mapreduce/interface/client.h index 56efa3c23c..3032025140 100644 --- a/yt/cpp/mapreduce/interface/client.h +++ b/yt/cpp/mapreduce/interface/client.h @@ -493,6 +493,18 @@ public: const TGetJobStderrOptions& options = TGetJobStderrOptions()) = 0; /// + /// @brief Get trace of a job. + /// + /// @ref NYT::TErrorResponse exception is thrown if it is missing. + /// + /// @note YT doesn't store all job traces. + /// + /// @see [YT doc](https://ytsaurus.tech/docs/en/api/commands.html#get_job_trace) + virtual std::vector<TJobTraceEvent> GetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options = TGetJobTraceOptions()) = 0; + + /// /// @brief Create one or several rbtorrents for files in a blob table. /// /// If specified, one torrent is created for each value of `KeyColumns` option. diff --git a/yt/cpp/mapreduce/interface/fwd.h b/yt/cpp/mapreduce/interface/fwd.h index 0434c03d8b..485b45129a 100644 --- a/yt/cpp/mapreduce/interface/fwd.h +++ b/yt/cpp/mapreduce/interface/fwd.h @@ -157,6 +157,7 @@ namespace NYT { using TTabletCellId = TGUID; using TReplicaId = TGUID; using TJobId = TGUID; + using TJobTraceId = TGUID; using TYPath = TString; using TLocalFilePath = TString; @@ -370,6 +371,8 @@ namespace NYT { struct TListJobsOptions; + struct TGetJobTraceOptions; + struct IOperationClient; enum class EFinishedJobState : int; diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h index 9a85049886..f2de3ea3bd 100644 --- a/yt/cpp/mapreduce/interface/operation.h +++ b/yt/cpp/mapreduce/interface/operation.h @@ -3048,6 +3048,70 @@ struct TGetFailedJobInfoOptions //////////////////////////////////////////////////////////////////////////////// /// +/// @brief Options for @ref NYT::IClient::GetJobTrace. +struct TGetJobTraceOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TGetJobTraceOptions; + /// @endcond + + /// + /// @brief Id of the job. + FLUENT_FIELD_OPTION(TJobId, JobId); + + /// + /// @brief Id of the trace. + FLUENT_FIELD_OPTION(TJobTraceId, TraceId); + + /// + /// @brief Search for traces with time >= `FromTime`. + FLUENT_FIELD_OPTION(i64, FromTime); + + /// + /// @brief Search for traces with time <= `ToTime`. + FLUENT_FIELD_OPTION(i64, ToTime); + + /// + /// @brief Search for traces with event index >= `FromEventIndex`. + FLUENT_FIELD_OPTION(i64, FromEventIndex); + + /// + /// @brief Search for traces with event index >= `ToEventIndex`. + FLUENT_FIELD_OPTION(i64, ToEventIndex); +}; + +/// +/// @brief Response for @ref NYT::IOperation::GetJobTrace. +struct TJobTraceEvent +{ + /// + /// @brief Id of the operation. + TOperationId OperationId; + + /// + /// @brief Id of the job. + TJobId JobId; + + /// + /// @brief Id of the trace. + TJobTraceId TraceId; + + /// + /// @brief Index of the trace event. + i64 EventIndex; + + /// + /// @brief Raw evenr in json format. + TString Event; + + /// + /// @brief Time of the event. + TInstant EventTime; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// /// @brief Interface representing an operation. struct IOperation : public TThrRefBase diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 2f9610a1ca..59868c599e 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -783,6 +783,55 @@ IFileReaderPtr GetJobStderr( return new TResponseReader(context, std::move(header)); } +TJobTraceEvent ParseJobTraceEvent(const TNode& node) +{ + const auto& mapNode = node.AsMap(); + TJobTraceEvent result; + + if (auto idNode = mapNode.FindPtr("operation_id")) { + result.OperationId = GetGuid(idNode->AsString()); + } + if (auto idNode = mapNode.FindPtr("job_id")) { + result.JobId = GetGuid(idNode->AsString()); + } + if (auto idNode = mapNode.FindPtr("trace_id")) { + result.TraceId = GetGuid(idNode->AsString()); + } + if (auto eventIndexNode = mapNode.FindPtr("event_index")) { + result.EventIndex = eventIndexNode->AsInt64(); + } + if (auto eventNode = mapNode.FindPtr("event")) { + result.Event = eventNode->AsString(); + } + if (auto eventTimeNode = mapNode.FindPtr("event_time")) { + result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());; + } + + return result; +} + +std::vector<TJobTraceEvent> GetJobTrace( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TGetJobTraceOptions& options) +{ + THttpHeader header("GET", "get_job_trace"); + header.MergeParameters(SerializeParamsForGetJobTrace(operationId, options)); + auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); + auto resultNode = NodeFromYsonString(responseInfo.Response); + + std::vector<TJobTraceEvent> result; + + const auto& traceEventNodesList = resultNode.AsList(); + result.reserve(traceEventNodesList.size()); + for (const auto& traceEventNode : traceEventNodesList) { + result.push_back(ParseJobTraceEvent(traceEventNode)); + } + + return result; +} + TMaybe<TYPath> GetFileFromCache( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index c2d1a53b51..0a183d617c 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -257,6 +257,12 @@ TString GetJobStderrWithRetries( const TJobId& jobId, const TGetJobStderrOptions& options = TGetJobStderrOptions()); +std::vector<TJobTraceEvent> GetJobTrace( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TGetJobTraceOptions& options = TGetJobTraceOptions()); + // // File cache // diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index a638ac581d..bebc59ec91 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -525,6 +525,15 @@ TNode SerializeParamsForGetJob( return result; } +TNode SerializeParamsForGetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& /* options */) +{ + TNode result; + SetOperationIdParam(&result, operationId); + return result; +} + TNode SerializeParamsForListJobs( const TOperationId& operationId, const TListJobsOptions& options) diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h index a7ab35d91d..69a4888267 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -126,6 +126,10 @@ TNode SerializeParamsForListJobs( const TOperationId& operationId, const TListJobsOptions& options); +TNode SerializeParamsForGetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options); + TNode SerializeParametersForInsertRows( const TString& pathPrefix, const TYPath& path, diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index c14f2d49d6..6f85657139 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -516,6 +516,11 @@ public: const TGetJobStderrOptions& options), (operationIdOrAlias, jobId, options)) + DELEGATE_METHOD(TFuture<std::vector<TJobTraceEvent>>, GetJobTrace, ( + const NScheduler::TOperationIdOrAlias& operationIdOrAlias, + const TGetJobTraceOptions& options), + (operationIdOrAlias, options)) + DELEGATE_METHOD(TFuture<TSharedRef>, GetJobFailContext, ( const NScheduler::TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, @@ -851,9 +856,9 @@ public: const TDistributedWriteSessionFinishOptions& options), (std::move(session), options)) - DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, ( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options), + DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, ( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options), (cookie, options)) // Shuffle Service diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h index 9ac983371c..d1bca5bc3a 100644 --- a/yt/yt/client/api/distributed_table_client.h +++ b/yt/yt/client/api/distributed_table_client.h @@ -4,6 +4,8 @@ #include <yt/yt/client/table_client/config.h> +#include <library/cpp/yt/misc/non_null_ptr.h> + namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// @@ -13,10 +15,11 @@ struct TDistributedWriteSessionStartOptions { }; struct TDistributedWriteSessionFinishOptions - : public TTransactionalOptions -{ }; +{ + int MaxChildrenPerAttachRequest = 10'000; +}; -struct TParticipantTableWriterOptions +struct TFragmentTableWriterOptions : public TTableWriterOptions { }; @@ -24,6 +27,7 @@ struct TParticipantTableWriterOptions struct IDistributedTableClientBase { +public: virtual ~IDistributedTableClientBase() = default; virtual TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession( @@ -33,6 +37,13 @@ struct IDistributedTableClientBase virtual TFuture<void> FinishDistributedWriteSession( TDistributedWriteSessionPtr session, const TDistributedWriteSessionFinishOptions& options = {}) = 0; + + // Helper used to implement FinishDistributedWriteSession efficiently + // without compromising privacy of session fields. + // defined in yt/yt/client/api/distributed_table_session.cpp. + void* GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session); + // Used in chunk writer for results recording + void RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult); }; //////////////////////////////////////////////////////////////////////////////// @@ -41,9 +52,9 @@ struct IDistributedTableClient { virtual ~IDistributedTableClient() = default; - virtual TFuture<ITableWriterPtr> CreateParticipantTableWriter( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options = {}) = 0; + virtual TFuture<ITableWriterPtr> CreateFragmentTableWriter( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/distributed_table_session.cpp b/yt/yt/client/api/distributed_table_session.cpp new file mode 100644 index 0000000000..d67998b9d2 --- /dev/null +++ b/yt/yt/client/api/distributed_table_session.cpp @@ -0,0 +1,197 @@ +#include "distributed_table_session.h" + +#include "client.h" +#include "transaction.h" + +namespace NYT::NApi { + +using namespace NYPath; +using namespace NObjectClient; +using namespace NTableClient; +using namespace NTransactionClient; +using namespace NYTree; +using namespace NCypressClient; +using namespace NChunkClient; + +//////////////////////////////////////////////////////////////////////////////// + +TTableWriterPatchInfo::TTableWriterPatchInfo( + TRichYPath richPath, + TObjectId objectId, + TCellTag externalCellTag, + TMasterTableSchemaId chunkSchemaId, + TTableSchemaPtr chunkSchema, + std::optional<TLegacyOwningKey> writerLastKey, + int maxHeavyColumns, + TTimestamp timestamp, + const IAttributeDictionary& tableAttributes) + : TTableWriterPatchInfo() +{ + ObjectId = objectId; + RichPath = std::move(richPath); + + ChunkSchemaId = chunkSchemaId; + ChunkSchema = std::move(chunkSchema); + + WriterLastKey = writerLastKey; + MaxHeavyColumns = maxHeavyColumns; + + TableAttributes = tableAttributes.ToMap(); + + ExternalCellTag = externalCellTag; + + Timestamp = timestamp; +} + +void TTableWriterPatchInfo::Register(TRegistrar registrar) +{ + registrar.Parameter("table_id", &TThis::ObjectId); + registrar.Parameter("table_path", &TThis::RichPath); + + registrar.Parameter("chunk_schema_id", &TThis::ChunkSchemaId); + registrar.Parameter("chunk_schema", &TThis::ChunkSchema); + + registrar.Parameter("writer_last_key", &TThis::WriterLastKey); + registrar.Parameter("max_heavy_columns", &TThis::MaxHeavyColumns); + + registrar.Parameter("table_attributes", &TThis::TableAttributes); + + registrar.Parameter("external_cell_tag", &TThis::ExternalCellTag); + + registrar.Parameter("timestamp", &TThis::Timestamp); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TFragmentWriteResult::Register(TRegistrar registrar) +{ + registrar.Parameter("min_boundary_key", &TThis::MinBoundaryKey); + registrar.Parameter("max_boundary_key", &TThis::MaxBoundaryKey); + registrar.Parameter("chunk_list_id", &TThis::ChunkListId); +} + +//////////////////////////////////////////////////////////////////////////////// + +const TTableWriterPatchInfo& TFragmentWriteCookie::GetPatchInfo() const +{ + return PatchInfo_; +} + +TTransactionId TFragmentWriteCookie::GetMainTransactionId() const +{ + return MainTxId_; +} + +TTransactionId TFragmentWriteCookie::GetUploadTransactionId() const +{ + return UploadTxId_; +} + +void TFragmentWriteCookie::Register(TRegistrar registrar) +{ + registrar.Parameter("session_id", &TThis::Id_); + + registrar.Parameter("tx_id", &TThis::MainTxId_); + registrar.Parameter("upload_tx_id", &TThis::UploadTxId_); + + registrar.Parameter("patch_info", &TThis::PatchInfo_); + + registrar.Parameter("writer_results", &TThis::WriteResults_); +} + +//////////////////////////////////////////////////////////////////////////////// + +TDistributedWriteSession::TDistributedWriteSession( + TTransactionId mainTxId, + TTransactionId uploadTxId, + TChunkListId rootChunkListId, + TTableWriterPatchInfo patchInfo) + : TDistributedWriteSession() +{ + Id_ = TDistributedWriteSessionId(TGuid::Create()); + MainTxId_ = mainTxId; + UploadTxId_ = uploadTxId; + + RootChunkListId_ = rootChunkListId; + + PatchInfo_ = std::move(patchInfo); + + // Signatures? +} + +TTransactionId TDistributedWriteSession::GetMainTransactionId() const +{ + return MainTxId_; +} + +TTransactionId TDistributedWriteSession::GetUploadTransactionId() const +{ + return UploadTxId_; +} + +const TTableWriterPatchInfo& TDistributedWriteSession::GetPatchInfo() const Y_LIFETIME_BOUND +{ + return PatchInfo_; +} + +TChunkListId TDistributedWriteSession::GetRootChunkListId() const +{ + return RootChunkListId_; +} + +TFragmentWriteCookiePtr TDistributedWriteSession::GiveCookie() +{ + auto cookie = New<TFragmentWriteCookie>(); + cookie->Id_ = Id_; + cookie->MainTxId_ = MainTxId_; + cookie->UploadTxId_ = UploadTxId_; + cookie->PatchInfo_ = PatchInfo_; + + return cookie; +} + +void TDistributedWriteSession::TakeCookie(TFragmentWriteCookiePtr cookie) +{ + // Verify cookie signature? + WriteResults_.reserve(std::ssize(WriteResults_) + std::ssize(cookie->WriteResults_)); + for (const auto& writeResult : cookie->WriteResults_) { + WriteResults_.push_back(writeResult); + } +} + +TFuture<void> TDistributedWriteSession::Ping(IClientPtr client) +{ + // NB(arkady-e1ppa): AutoAbort = false by default. + auto mainTx = client->AttachTransaction(MainTxId_); + + return mainTx->Ping(); +} + +void TDistributedWriteSession::Register(TRegistrar registrar) +{ + registrar.Parameter("session_id", &TThis::Id_); + registrar.Parameter("tx_id", &TThis::MainTxId_); + registrar.Parameter("upload_tx_id", &TThis::UploadTxId_); + + registrar.Parameter("root_chunk_list_id", &TThis::RootChunkListId_); + + registrar.Parameter("patch_info", &TThis::PatchInfo_); + + registrar.Parameter("write_results", &TThis::WriteResults_); +} + +//////////////////////////////////////////////////////////////////////////////// + +void* IDistributedTableClientBase::GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session) +{ + return static_cast<void*>(&session->WriteResults_); +} + +void IDistributedTableClientBase::RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult) +{ + auto* concrete = static_cast<TFragmentWriteResult*>(opaqueWriteResult); + YT_ASSERT(concrete); + cookie->WriteResults_.push_back(*concrete); +} + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_session.h b/yt/yt/client/api/distributed_table_session.h new file mode 100644 index 0000000000..c8c074c84c --- /dev/null +++ b/yt/yt/client/api/distributed_table_session.h @@ -0,0 +1,158 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/client/table_client/table_upload_options.h> + +#include <yt/yt/client/chunk_client/public.h> + +#include <yt/yt/client/table_client/key.h> + +#include <yt/yt/client/ypath/rich.h> + +#include <yt/yt/core/ytree/yson_struct.h> + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid); + +//////////////////////////////////////////////////////////////////////////////// + +// Used by distributed table writer to patch +// its config based on table data. +// NB(arkady-e1ppa): TableUploadOptions are +// encoded in RichPath + TableAttributes and thus +// are not required to be stored directly. +struct TTableWriterPatchInfo + : public NYTree::TYsonStructLite +{ +public: + NObjectClient::TObjectId ObjectId; + + NYPath::TRichYPath RichPath; + + // NB(arkady-e1ppa): Empty writer last key Serialization into Deserialization + // somehow results in a []; row which is considered non-empty for some reason. + // This matters too little for me to bother saving up the std::optional wrapper. + std::optional<NTableClient::TLegacyOwningKey> WriterLastKey; + int MaxHeavyColumns; + + NTableClient::TMasterTableSchemaId ChunkSchemaId; + NTableClient::TTableSchemaPtr ChunkSchema; + + NYTree::INodePtr TableAttributes; + + NObjectClient::TCellTag ExternalCellTag; + + NTransactionClient::TTimestamp Timestamp; + + TTableWriterPatchInfo( + NYPath::TRichYPath richPath, + NObjectClient::TObjectId objectId, + NObjectClient::TCellTag externalCellTag, + NTableClient::TMasterTableSchemaId chunkSchemaId, + NTableClient::TTableSchemaPtr chunkSchema, + std::optional<NTableClient::TLegacyOwningKey> writerLastKey, + int maxHeavyColumns, + NTransactionClient::TTimestamp timestamp, + const NYTree::IAttributeDictionary& tableAttributes); + + REGISTER_YSON_STRUCT_LITE(TTableWriterPatchInfo) + + static void Register(TRegistrar registrar); +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TFragmentWriteResult + : public NYTree::TYsonStructLite +{ + NTableClient::TLegacyOwningKey MinBoundaryKey; + NTableClient::TLegacyOwningKey MaxBoundaryKey; + NChunkClient::TChunkListId ChunkListId; + + REGISTER_YSON_STRUCT_LITE(TFragmentWriteResult) + + static void Register(TRegistrar registrar); +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TFragmentWriteCookie + : public NYTree::TYsonStruct +{ +public: + const TTableWriterPatchInfo& GetPatchInfo() const; + + NCypressClient::TTransactionId GetMainTransactionId() const; + NCypressClient::TTransactionId GetUploadTransactionId() const; + +private: + TDistributedWriteSessionId Id_; + + NCypressClient::TTransactionId MainTxId_; + NCypressClient::TTransactionId UploadTxId_; + + TTableWriterPatchInfo PatchInfo_; + + std::vector<TFragmentWriteResult> WriteResults_; + + REGISTER_YSON_STRUCT(TFragmentWriteCookie); + + static void Register(TRegistrar registrar); + + friend class TDistributedWriteSession; + friend struct IDistributedTableClientBase; +}; + +DEFINE_REFCOUNTED_TYPE(TFragmentWriteCookie); + +//////////////////////////////////////////////////////////////////////////////// + +class TDistributedWriteSession + : public NYTree::TYsonStruct +{ +public: + TDistributedWriteSession( + NCypressClient::TTransactionId mainTxId, + NCypressClient::TTransactionId uploadTxId, + NChunkClient::TChunkListId rootChunkListId, + TTableWriterPatchInfo patchInfo); + + NCypressClient::TTransactionId GetMainTransactionId() const; + NCypressClient::TTransactionId GetUploadTransactionId() const; + const TTableWriterPatchInfo& GetPatchInfo() const Y_LIFETIME_BOUND; + NChunkClient::TChunkListId GetRootChunkListId() const; + + TFragmentWriteCookiePtr GiveCookie(); + void TakeCookie(TFragmentWriteCookiePtr cookie); + + TFuture<void> Ping(IClientPtr client); + + REGISTER_YSON_STRUCT(TDistributedWriteSession); + + static void Register(TRegistrar registrar); + +private: + TDistributedWriteSessionId Id_; + + NCypressClient::TTransactionId MainTxId_; + NCypressClient::TTransactionId UploadTxId_; + + NChunkClient::TChunkListId RootChunkListId_; + + TTableWriterPatchInfo PatchInfo_; + + // This is used to commit changes when session is over. + std::vector<TFragmentWriteResult> WriteResults_; + + friend struct IDistributedTableClientBase; +}; + +DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_sessions.cpp b/yt/yt/client/api/distributed_table_sessions.cpp deleted file mode 100644 index b454ad476b..0000000000 --- a/yt/yt/client/api/distributed_table_sessions.cpp +++ /dev/null @@ -1,17 +0,0 @@ -#include "distributed_table_sessions.h" - -namespace NYT::NApi { - -//////////////////////////////////////////////////////////////////////////////// - -void TDistributedWriteCookie::Register(TRegistrar /*registrar*/) -{ } - -//////////////////////////////////////////////////////////////////////////////// - -void TDistributedWriteSession::Register(TRegistrar /*registrar*/) -{ } - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_sessions.h b/yt/yt/client/api/distributed_table_sessions.h deleted file mode 100644 index 19b3c0f5c0..0000000000 --- a/yt/yt/client/api/distributed_table_sessions.h +++ /dev/null @@ -1,41 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/ytree/yson_struct.h> - -namespace NYT::NApi { - -//////////////////////////////////////////////////////////////////////////////// - -YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid); - -//////////////////////////////////////////////////////////////////////////////// - -class TDistributedWriteCookie - : public NYTree::TYsonStruct -{ -public: - REGISTER_YSON_STRUCT(TDistributedWriteCookie); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TDistributedWriteCookie); - -//////////////////////////////////////////////////////////////////////////////// - -class TDistributedWriteSession - : public NYTree::TYsonStruct -{ -public: - REGISTER_YSON_STRUCT(TDistributedWriteSession); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NApi diff --git a/yt/yt/client/api/operation_client.cpp b/yt/yt/client/api/operation_client.cpp index 4e1816a6e7..8f8dbb355b 100644 --- a/yt/yt/client/api/operation_client.cpp +++ b/yt/yt/client/api/operation_client.cpp @@ -217,6 +217,19 @@ void Serialize(const TJob& job, NYson::IYsonConsumer* consumer, TStringBuf idKey .EndMap(); } +void Serialize(const TJobTraceEvent& traceEvent, NYson::IYsonConsumer* consumer) +{ + NYTree::BuildYsonFluently(consumer) + .BeginMap() + .Item("operation_id").Value(traceEvent.OperationId) + .Item("job_id").Value(traceEvent.JobId) + .Item("trace_id").Value(traceEvent.TraceId) + .Item("event_index").Value(traceEvent.EventIndex) + .Item("event").Value(traceEvent.Event) + .Item("event_time").Value(traceEvent.EventTime.MicroSeconds()) + .EndMap(); +} + //////////////////////////////////////////////////////////////////////////////// void TListOperationsAccessFilter::Register(TRegistrar registrar) diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h index 48687497da..d5072db1fc 100644 --- a/yt/yt/client/api/operation_client.h +++ b/yt/yt/client/api/operation_client.h @@ -89,6 +89,18 @@ struct TGetJobStderrOptions std::optional<i64> Offset; }; +struct TGetJobTraceOptions + : public TTimeoutOptions + , public TMasterReadOptions +{ + std::optional<NJobTrackerClient::TJobId> JobId; + std::optional<NScheduler::TJobTraceId> TraceId; + std::optional<i64> FromTime; + std::optional<i64> ToTime; + std::optional<i64> FromEventIndex; + std::optional<i64> ToEventIndex; +}; + struct TGetJobFailContextOptions : public TTimeoutOptions , public TMasterReadOptions @@ -346,6 +358,18 @@ struct TJob void Serialize(const TJob& job, NYson::IYsonConsumer* consumer, TStringBuf idKey); +struct TJobTraceEvent +{ + NJobTrackerClient::TOperationId OperationId; + NJobTrackerClient::TJobId JobId; + NScheduler::TJobTraceId TraceId; + i64 EventIndex; + TString Event; + TInstant EventTime; +}; + +void Serialize(const TJobTraceEvent& traceEvent, NYson::IYsonConsumer* consumer); + struct TListJobsStatistics { TEnumIndexedArray<NJobTrackerClient::EJobState, i64> StateCounts; @@ -443,6 +467,10 @@ struct IOperationClient NJobTrackerClient::TJobId jobId, const TGetJobStderrOptions& options = {}) = 0; + virtual TFuture<std::vector<TJobTraceEvent>> GetJobTrace( + const NScheduler::TOperationIdOrAlias& operationIdOrAlias, + const TGetJobTraceOptions& options = {}) = 0; + virtual TFuture<TSharedRef> GetJobFailContext( const NScheduler::TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h index be710a04ce..47eea56fb8 100644 --- a/yt/yt/client/api/public.h +++ b/yt/yt/client/api/public.h @@ -188,7 +188,8 @@ DECLARE_REFCOUNTED_STRUCT(TBackupManifest) DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter) DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession) -DECLARE_REFCOUNTED_CLASS(TDistributedWriteCookie) +DECLARE_REFCOUNTED_CLASS(TFragmentWriteCookie) +struct IDistributedTableClientBase; DECLARE_REFCOUNTED_STRUCT(TShuffleHandle) diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index 6b00c7470a..5f1c945e3f 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -121,6 +121,7 @@ public: DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobInputPaths); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobSpec); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobStderr); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobTrace); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobFailContext); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AbandonJob); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PollJobShell); @@ -207,7 +208,7 @@ public: // Distributed table client DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, StartDistributedWriteSession); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, FinishDistributedWriteSession); - DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ParticipantWriteTable, + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, WriteTableFragment, .SetStreamingEnabled(true)); // Shuffle service diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index c86a1305af..6ad3c152b4 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -12,7 +12,7 @@ #include "table_writer.h" #include "transaction.h" -#include <yt/yt/client/api/distributed_table_sessions.h> +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/file_reader.h> #include <yt/yt/client/api/file_writer.h> #include <yt/yt/client/api/journal_reader.h> diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index b9e4944381..430b366714 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -8,6 +8,7 @@ #include "timestamp_provider.h" #include "transaction.h" +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/helpers.h> #include <yt/yt/client/api/rowset.h> #include <yt/yt/client/api/transaction.h> @@ -752,12 +753,14 @@ TFuture<void> TClient::AlterReplicationCard( return req->Invoke().As<void>(); } -TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options) +TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options) { + using TRspPtr = TIntrusivePtr<NRpc::TTypedClientResponse<NProto::TRspWriteTableFragment>>; + auto proxy = CreateApiServiceProxy(); - auto req = proxy.ParticipantWriteTable(); + auto req = proxy.WriteTableFragment(); InitStreamingRequest(*req); FillRequest(req.Get(), cookie, options); @@ -768,12 +771,15 @@ TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter( BIND ([=] (const TSharedRef& metaRef) { NApi::NRpcProxy::NProto::TWriteTableMeta meta; if (!TryDeserializeProto(&meta, metaRef)) { - THROW_ERROR_EXCEPTION("Failed to deserialize schema for participant table writer"); + THROW_ERROR_EXCEPTION("Failed to deserialize schema for fragment table writer"); } FromProto(schema.Get(), meta.schema()); + }), + BIND([=] (TRspPtr&& rsp) mutable { + Deserialize(*cookie, ConvertToNode(TYsonString(rsp->cookie()))); })) - .Apply(BIND([=] (IAsyncZeroCopyOutputStreamPtr outputStream) { + .ApplyUnique(BIND([=] (IAsyncZeroCopyOutputStreamPtr&& outputStream) { return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema)); })).As<ITableWriterPtr>(); } @@ -1328,6 +1334,41 @@ TFuture<TGetJobStderrResponse> TClient::GetJobStderr( })); } +TFuture<std::vector<TJobTraceEvent>> TClient::GetJobTrace( + const TOperationIdOrAlias& operationIdOrAlias, + const TGetJobTraceOptions& options) +{ + auto proxy = CreateApiServiceProxy(); + + auto req = proxy.GetJobTrace(); + SetTimeoutOptions(*req, options); + + NScheduler::ToProto(req, operationIdOrAlias); + if (options.JobId) { + ToProto(req->mutable_job_id(), *options.JobId); + } + if (options.TraceId) { + ToProto(req->mutable_trace_id(), *options.TraceId); + } + if (options.FromTime) { + req->set_from_time(*options.FromTime); + } + if (options.ToTime) { + req->set_to_time(*options.ToTime); + } + if (options.FromEventIndex) { + req->set_from_event_index(*options.FromEventIndex); + } + if (options.ToEventIndex) { + req->set_to_event_index(*options.ToEventIndex); + } + + return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspGetJobTracePtr& rsp) { + return FromProto<std::vector<TJobTraceEvent>>(rsp->events()); + })); +} + + TFuture<TSharedRef> TClient::GetJobFailContext( const TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index d3905300e3..16e9400391 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -130,9 +130,9 @@ public: const TAlterReplicationCardOptions& options = {}) override; // Distributed table client - TFuture<ITableWriterPtr> CreateParticipantTableWriter( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options) override; + TFuture<ITableWriterPtr> CreateFragmentTableWriter( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options) override; // Queues. TFuture<NQueueClient::IQueueRowsetPtr> PullQueue( @@ -278,6 +278,10 @@ public: NJobTrackerClient::TJobId jobId, const NApi::TGetJobStderrOptions& options) override; + TFuture<std::vector<TJobTraceEvent>> GetJobTrace( + const NScheduler::TOperationIdOrAlias& operationIdOrAlias, + const NApi::TGetJobTraceOptions& options) override; + TFuture<TSharedRef> GetJobFailContext( const NScheduler::TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.h b/yt/yt/client/api/rpc_proxy/connection_impl.h index 576b655870..92d1f5b4ec 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.h +++ b/yt/yt/client/api/rpc_proxy/connection_impl.h @@ -9,7 +9,7 @@ #include <yt/yt/core/rpc/public.h> // TODO(prime@): Create HTTP endpoint for discovery that works without authentication. -#include <yt/yt/core/misc/atomic_object.h> +#include <library/cpp/yt/threading/atomic_object.h> #include <yt/yt/core/service_discovery/public.h> diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index a89368573c..3a6a3650be 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -1,5 +1,6 @@ #include "helpers.h" +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/rowset.h> #include <yt/yt/client/api/table_client.h> @@ -58,6 +59,17 @@ void ToProto( proto->set_suppress_upstream_sync(options.SuppressUpstreamSync); } +void FromProto( + NApi::TTransactionalOptions* options, + const NProto::TTransactionalOptions& proto) +{ + FromProto(&options->TransactionId, proto.transaction_id()); + options->Ping = proto.ping(); + options->PingAncestors = proto.ping_ancestors(); + options->SuppressTransactionCoordinatorSync = proto.suppress_transaction_coordinator_sync(); + options->SuppressUpstreamSync = proto.suppress_upstream_sync(); +} + void ToProto( NProto::TPrerequisiteOptions* proto, const NApi::TPrerequisiteOptions& options) @@ -426,6 +438,30 @@ void FromProto( FromProto(&result->Errors, proto.errors()); } +void ToProto( + NProto::TJobTraceEvent* proto, + const NApi::TJobTraceEvent& result) +{ + ToProto(proto->mutable_operation_id(), result.OperationId); + ToProto(proto->mutable_job_id(), result.JobId); + ToProto(proto->mutable_trace_id(), result.TraceId); + proto->set_event_index(result.EventIndex); + proto->set_event(result.Event); + proto->set_event_time(ToProto<i64>(result.EventTime)); +} + +void FromProto( + NApi::TJobTraceEvent* result, + const NProto::TJobTraceEvent& proto) +{ + FromProto(&result->OperationId, proto.operation_id()); + FromProto(&result->JobId, proto.job_id()); + FromProto(&result->TraceId, proto.trace_id()); + result->EventIndex = proto.event_index(); + result->Event = proto.event(); + result->EventTime = TInstant::FromValue(proto.event_time()); +} + //////////////////////////////////////////////////////////////////////////////// // MISC //////////////////////////////////////////////////////////////////////////////// @@ -1922,28 +1958,22 @@ void FillRequest( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options) { - Y_UNUSED(req, path, options); + ToProto(req->mutable_path(), path); + + if (options.TransactionId) { + ToProto(req->mutable_transactional_options(), options); + } } -void FromProto( +void ParseRequest( + NYPath::TRichYPath* mutablePath, TDistributedWriteSessionStartOptions* mutableOptions, const TReqStartDistributedWriteSession& req) { - Y_UNUSED(req, mutableOptions); -} - -void FromProto( - TDistributedWriteSession* mutableSession, - TRspStartDistributedWriteSession&& rsp) -{ - Y_UNUSED(mutableSession, rsp); -} - -void ToProto( - TRspStartDistributedWriteSession* rsp, - const TDistributedWriteSessionPtr& session) -{ - Y_UNUSED(rsp, session); + *mutablePath = FromProto<NYPath::TRichYPath>(req.path()); + if (req.has_transactional_options()) { + FromProto(mutableOptions, req.transactional_options()); + } } //////////////////////////////////////////////////////////////////////////////// @@ -1953,52 +1983,44 @@ void FillRequest( TDistributedWriteSessionPtr session, const TDistributedWriteSessionFinishOptions& options) { - Y_UNUSED(req, session, options); + req->set_session(ConvertToYsonString(session).ToString()); + req->set_max_children_per_attach_request(options.MaxChildrenPerAttachRequest); } -void FromProto( +void ParseRequest( + TDistributedWriteSessionPtr* mutableSession, TDistributedWriteSessionFinishOptions* mutableOptions, const TReqFinishDistributedWriteSession& req) { - Y_UNUSED(req, mutableOptions); -} - -void FromProto( - TDistributedWriteSession* mutableSession, - const TReqFinishDistributedWriteSession& req) -{ - Y_UNUSED(mutableSession, req); + *mutableSession = ConvertTo<TDistributedWriteSessionPtr>(TYsonString(req.session())); + mutableOptions->MaxChildrenPerAttachRequest = req.max_children_per_attach_request(); } //////////////////////////////////////////////////////////////////////////////// void FillRequest( - TReqParticipantWriteTable* req, - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options) + TReqWriteTableFragment* req, + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options) { - Y_UNUSED(req, cookie, options); -} + req->set_cookie(ConvertToYsonString(cookie).ToString()); -void FromProto( - TParticipantTableWriterOptions* mutableOptions, - const TReqParticipantWriteTable& req) -{ - Y_UNUSED(req, mutableOptions); -} - -void FromProto( - TDistributedWriteCookie* cookie, - const TReqParticipantWriteTable& req) -{ - Y_UNUSED(cookie, req); + if (options.Config) { + req->set_config(ConvertToYsonString(*options.Config).ToString()); + } } -void ToProto( - TRspParticipantWriteTable* rsp, - const TDistributedWriteCookiePtr& cookie) +void ParseRequest( + TFragmentWriteCookiePtr* mutableCookie, + TFragmentTableWriterOptions* mutableOptions, + const TReqWriteTableFragment& req) { - Y_UNUSED(rsp, cookie); + *mutableCookie = ConvertTo<TFragmentWriteCookiePtr>(TYsonString(req.cookie())); + if (req.has_config()) { + mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(req.config())); + } else { + mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(TStringBuf("{}"))); + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h index 9027337359..8849b7be44 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.h +++ b/yt/yt/client/api/rpc_proxy/helpers.h @@ -30,6 +30,10 @@ void ToProto( NProto::TTransactionalOptions* proto, const NApi::TTransactionalOptions& options); +void FromProto( + NApi::TTransactionalOptions* options, + const NProto::TTransactionalOptions& proto); + void ToProto( NProto::TPrerequisiteOptions* proto, const NApi::TPrerequisiteOptions& options); @@ -106,6 +110,14 @@ void FromProto( NApi::TListJobsResult* result, const NProto::TListJobsResult& proto); +void ToProto( + NProto::TJobTraceEvent* proto, + const NApi::TJobTraceEvent& result); + +void FromProto( + NApi::TJobTraceEvent* result, + const NProto::TJobTraceEvent& proto); + void ToProto(NProto::TColumnSchema* protoSchema, const NTableClient::TColumnSchema& schema); void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema& protoSchema); @@ -279,18 +291,11 @@ void FillRequest( const NYPath::TRichYPath& path, const TDistributedWriteSessionStartOptions& options); -void FromProto( +void ParseRequest( + NYPath::TRichYPath* mutablePath, TDistributedWriteSessionStartOptions* mutableOptions, const TReqStartDistributedWriteSession& req); -void FromProto( - TDistributedWriteSession* mutableSession, - TRspStartDistributedWriteSession&& rsp); - -void ToProto( - TRspStartDistributedWriteSession* rsp, - const TDistributedWriteSessionPtr& session); - //////////////////////////////////////////////////////////////////////////////// void FillRequest( @@ -298,32 +303,22 @@ void FillRequest( TDistributedWriteSessionPtr session, const TDistributedWriteSessionFinishOptions& options); -void FromProto( +void ParseRequest( + TDistributedWriteSessionPtr* mutableSession, TDistributedWriteSessionFinishOptions* mutableOptions, const TReqFinishDistributedWriteSession& req); -void FromProto( - TDistributedWriteSession* mutableSession, - const TReqFinishDistributedWriteSession& req); - //////////////////////////////////////////////////////////////////////////////// void FillRequest( - TReqParticipantWriteTable* req, - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options); - -void FromProto( - TParticipantTableWriterOptions* mutableOptions, - const TReqParticipantWriteTable& req); - -void FromProto( - TDistributedWriteCookie* cookie, - const TReqParticipantWriteTable& req); - -void ToProto( - TRspParticipantWriteTable* rsp, - const TDistributedWriteCookiePtr& cookie); + TReqWriteTableFragment* req, + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options); + +void ParseRequest( + TFragmentWriteCookiePtr* mutableCookie, + TFragmentTableWriterOptions* mutableOptions, + const TReqWriteTableFragment& req); } // namespace NProto diff --git a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp index 392312d511..571d0901b6 100644 --- a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp +++ b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp @@ -103,9 +103,8 @@ private: TIndexInfo indexInfo{ .TableId = FromProto<NObjectClient::TObjectId>(protoIndexInfo.index_table_id()), .Kind = FromProto<ESecondaryIndexKind>(protoIndexInfo.index_kind()), - .Predicate = protoIndexInfo.has_predicate() - ? std::make_optional(FromProto<TString>(protoIndexInfo.predicate())) - : std::nullopt, + .Predicate = YT_PROTO_OPTIONAL(protoIndexInfo, predicate), + .UnfoldedColumn = YT_PROTO_OPTIONAL(protoIndexInfo, unfolded_column), }; THROW_ERROR_EXCEPTION_UNLESS(TEnumTraits<ESecondaryIndexKind>::FindLiteralByValue(indexInfo.Kind).has_value(), "Unsupported secondary index kind %Qlv (client not up-to-date)", diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index e719d41f8c..e52fdf7acb 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -938,7 +938,7 @@ TFuture<void> TTransaction::FinishDistributedWriteSession( ValidateActive(); return Client_->FinishDistributedWriteSession( std::move(session), - PatchTransactionId(options)); + options); } TFuture<void> TTransaction::DoAbort( diff --git a/yt/yt/client/chunk_client/public.h b/yt/yt/client/chunk_client/public.h index e38cbb567f..5c93ee55fd 100644 --- a/yt/yt/client/chunk_client/public.h +++ b/yt/yt/client/chunk_client/public.h @@ -88,6 +88,13 @@ YT_DEFINE_ERROR_ENUM( ((DiskHealthCheckFailed) (759)) ((TooManyChunksToFetch) (760)) ((TotalMemoryLimitExceeded) (761)) + ((ForbiddenErasureCodec) (762)) +); + +DEFINE_ENUM(EUpdateMode, + ((None) (0)) + ((Append) (1)) + ((Overwrite) (2)) ); using TChunkId = NObjectClient::TObjectId; diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp index 5d41f1abc8..7bd88c5279 100644 --- a/yt/yt/client/driver/distributed_table_commands.cpp +++ b/yt/yt/client/driver/distributed_table_commands.cpp @@ -2,34 +2,16 @@ #include "config.h" #include "helpers.h" -#include <yt/yt/client/api/distributed_table_sessions.h> - -// #include <yt/yt/client/api/rowset.h> -// #include <yt/yt/client/api/skynet.h> - -// #include <yt/yt/client/chaos_client/replication_card_serialization.h> +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/formats/config.h> #include <yt/yt/client/formats/parser.h> -// #include <yt/yt/client/table_client/adapters.h> -// #include <yt/yt/client/table_client/blob_reader.h> -// #include <yt/yt/client/table_client/columnar_statistics.h> -// #include <yt/yt/client/table_client/row_buffer.h> -// #include <yt/yt/client/table_client/table_consumer.h> -// #include <yt/yt/client/table_client/table_output.h> -// #include <yt/yt/client/table_client/unversioned_writer.h> -// #include <yt/yt/client/table_client/versioned_writer.h> -// #include <yt/yt/client/table_client/wire_protocol.h> - -// #include <yt/yt/client/tablet_client/table_mount_cache.h> - #include <yt/yt/client/ypath/public.h> #include <yt/yt/library/formats/format.h> #include <yt/yt/core/concurrency/scheduler_api.h> -// #include <yt/yt/core/misc/finally.h> #include <yt/yt/core/ytree/convert.h> @@ -45,20 +27,6 @@ using namespace NYPath; //////////////////////////////////////////////////////////////////////////////// -// namespace { - -// NLogging::TLogger WithCommandTag( -// const NLogging::TLogger& logger, -// const ICommandContextPtr& context) -// { -// return logger.WithTag("Command: %v", -// context->Request().CommandName); -// } - -// } // namespace - -//////////////////////////////////////////////////////////////////////////////// - void TStartDistributedWriteSessionCommand::Register(TRegistrar registrar) { registrar.Parameter("path", &TThis::Path); @@ -88,8 +56,6 @@ void TFinishDistributedWriteSessionCommand::Register(TRegistrar registrar) // -> Nothing void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context) { - auto transaction = AttachTransaction(context, /*required*/ false); - auto session = ConvertTo<TDistributedWriteSessionPtr>(Session); WaitFor(context->GetClient()->FinishDistributedWriteSession( @@ -102,35 +68,35 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context //////////////////////////////////////////////////////////////////////////////// -void TParticipantWriteTableCommand::Execute(ICommandContextPtr context) +void TWriteTableFragmentCommand::Execute(ICommandContextPtr context) { - TTypedCommand<NApi::TParticipantTableWriterOptions>::Execute(std::move(context)); + TTypedCommand<NApi::TFragmentTableWriterOptions>::Execute(std::move(context)); } -void TParticipantWriteTableCommand::Register(TRegistrar /*registrar*/) +void TWriteTableFragmentCommand::Register(TRegistrar /*registrar*/) { } -TFuture<NApi::ITableWriterPtr> TParticipantWriteTableCommand::CreateTableWriter( +TFuture<NApi::ITableWriterPtr> TWriteTableFragmentCommand::CreateTableWriter( const ICommandContextPtr& context) const { PutMethodInfoInTraceContext("participant_write_table"); return context ->GetClient() - ->CreateParticipantTableWriter( - StaticPointerCast<TDistributedWriteCookie>(ResultingCookie), - TTypedCommand<TParticipantTableWriterOptions>::Options); + ->CreateFragmentTableWriter( + StaticPointerCast<TFragmentWriteCookie>(ResultingCookie), + TTypedCommand<TFragmentTableWriterOptions>::Options); } // -> Cookie -void TParticipantWriteTableCommand::DoExecute(ICommandContextPtr context) +void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context) { - auto cookie = ConvertTo<TDistributedWriteCookiePtr>(Cookie); + auto cookie = ConvertTo<TFragmentWriteCookiePtr>(Cookie); ResultingCookie = StaticPointerCast<TRefCounted>(std::move(cookie)); DoExecuteImpl(context); ProduceOutput(context, [cookie = std::move(ResultingCookie)] (IYsonConsumer* consumer) { - Serialize(StaticPointerCast<TDistributedWriteCookie>(cookie), consumer); + Serialize(StaticPointerCast<TFragmentWriteCookie>(cookie), consumer); }); } diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h index ea3f7a9b7b..fa3d6bb4e5 100644 --- a/yt/yt/client/driver/distributed_table_commands.h +++ b/yt/yt/client/driver/distributed_table_commands.h @@ -45,8 +45,8 @@ private: //////////////////////////////////////////////////////////////////////////////// // -> Cookie -class TParticipantWriteTableCommand - : public TTypedCommand<NApi::TParticipantTableWriterOptions> +class TWriteTableFragmentCommand + : public TTypedCommand<NApi::TFragmentTableWriterOptions> , private TWriteTableCommand { public: @@ -54,7 +54,7 @@ public: // ambiguity in dispatch. void Execute(ICommandContextPtr context) override; - REGISTER_YSON_STRUCT_LITE(TParticipantWriteTableCommand); + REGISTER_YSON_STRUCT_LITE(TWriteTableFragmentCommand); static void Register(TRegistrar registrar); diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index 19a0f3bdc5..e13634f539 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -292,6 +292,7 @@ public: REGISTER_ALL(TGetJobInputCommand, "get_job_input", Null, Binary, false, true ); REGISTER_ALL(TGetJobInputPathsCommand, "get_job_input_paths", Null, Structured, false, true ); REGISTER_ALL(TGetJobStderrCommand, "get_job_stderr", Null, Binary, false, true ); + REGISTER_ALL(TGetJobTraceCommand, "get_job_trace", Null, Structured, false, true ); REGISTER_ALL(TGetJobFailContextCommand, "get_job_fail_context", Null, Binary, false, true ); REGISTER_ALL(TGetJobSpecCommand, "get_job_spec", Null, Structured, false, true ); REGISTER_ALL(TListOperationsCommand, "list_operations", Null, Structured, false, false); @@ -395,9 +396,9 @@ public: REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false); REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false); REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false); - REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false); - REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false); - REGISTER_ALL(TParticipantWriteTableCommand, "participant_write_table", Tabular, Structured, true, true ); + REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false); + REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false); + REGISTER_ALL(TWriteTableFragmentCommand, "distributed_write_table_partition", Tabular, Structured, true, true ); } #undef REGISTER diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp index 9cb7396725..53f82c7bf2 100644 --- a/yt/yt/client/driver/scheduler_commands.cpp +++ b/yt/yt/client/driver/scheduler_commands.cpp @@ -174,6 +174,52 @@ void TGetJobStderrCommand::DoExecute(ICommandContextPtr context) //////////////////////////////////////////////////////////////////////////////// +void TGetJobTraceCommand::Register(TRegistrar registrar) +{ + registrar.ParameterWithUniversalAccessor<std::optional<TJobId>>( + "job_id", + [] (TThis* command) -> auto& {return command->Options.JobId; }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<std::optional<TJobTraceId>>( + "trace_id", + [] (TThis* command) -> auto& {return command->Options.TraceId; }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<std::optional<i64>>( + "from_event_index", + [] (TThis* command) -> auto& {return command->Options.FromEventIndex; }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<std::optional<i64>>( + "to_event_index", + [] (TThis* command) -> auto& {return command->Options.ToEventIndex; }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<std::optional<i64>>( + "from_time", + [] (TThis* command) -> auto& {return command->Options.FromTime; }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor<std::optional<i64>>( + "to_time", + [] (TThis* command) -> auto& {return command->Options.ToTime; }) + .Optional(/*init*/ false); +} + +void TGetJobTraceCommand::DoExecute(ICommandContextPtr context) +{ + auto result = WaitFor(context->GetClient()->GetJobTrace(OperationIdOrAlias, Options)) + .ValueOrThrow(); + + context->ProduceOutputValue(BuildYsonStringFluently() + .DoListFor(result, [&] (TFluentList fluent, const TJobTraceEvent& traceEvent) { + Serialize(traceEvent, fluent.GetConsumer()); + })); +} + +//////////////////////////////////////////////////////////////////////////////// + void TGetJobFailContextCommand::Register(TRegistrar registrar) { registrar.Parameter("job_id", &TThis::JobId); diff --git a/yt/yt/client/driver/scheduler_commands.h b/yt/yt/client/driver/scheduler_commands.h index 280d55e331..59d1e9d21e 100644 --- a/yt/yt/client/driver/scheduler_commands.h +++ b/yt/yt/client/driver/scheduler_commands.h @@ -137,6 +137,20 @@ private: //////////////////////////////////////////////////////////////////////////////// +class TGetJobTraceCommand + : public TSimpleOperationCommandBase<NApi::TGetJobTraceOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TGetJobTraceCommand); + + static void Register(TRegistrar registrar); + +private: + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + class TGetJobFailContextCommand : public TSimpleOperationCommandBase<NApi::TGetJobFailContextOptions> { diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index a3410ff50a..f43f76958d 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -15,6 +15,8 @@ #include <yt/yt/client/object_client/helpers.h> +#include <yt/yt/client/security_client/public.h> + #include <yt/yt/core/concurrency/periodic_executor.h> #include <yt/yt/core/net/address.h> @@ -40,19 +42,20 @@ DECLARE_REFCOUNTED_CLASS(TClient) //////////////////////////////////////////////////////////////////////////////// -std::optional<TString> GetDataCenterByClient(const IClientPtr& client) +TFuture<std::optional<TString>> GetDataCenterByClient(const IClientPtr& client) { TListNodeOptions options; options.MaxSize = 1; - auto items = NConcurrency::WaitFor(client->ListNode(RpcProxiesPath, options)) - .ValueOrThrow(); - auto itemsList = NYTree::ConvertTo<NYTree::IListNodePtr>(items); - if (!itemsList->GetChildCount()) { - return std::nullopt; - } - auto host = itemsList->GetChildren()[0]; - return NNet::InferYPClusterFromHostName(host->GetValue<TString>()); + return client->ListNode(RpcProxiesPath, options) + .Apply(BIND([] (const NYson::TYsonString& items) { + auto itemsList = NYTree::ConvertTo<NYTree::IListNodePtr>(items); + if (!itemsList->GetChildCount()) { + return std::optional<TString>(); + } + auto host = itemsList->GetChildren()[0]; + return NNet::InferYPClusterFromHostName(host->GetValue<TString>()); + })); } class TTransaction @@ -248,17 +251,24 @@ DECLARE_REFCOUNTED_TYPE(TTransaction) //////////////////////////////////////////////////////////////////////////////// +enum class EClientPriority : ui8 +{ + Local, + Remote, + Undefined, +}; + DECLARE_REFCOUNTED_STRUCT(TClientDescription) struct TClientDescription final { - TClientDescription(IClientPtr client, int priority) + TClientDescription(IClientPtr client, EClientPriority priority) : Client(std::move(client)) , Priority(priority) { } IClientPtr Client; - int Priority; + EClientPriority Priority; std::atomic<bool> HasErrors{false}; }; @@ -407,6 +417,7 @@ public: UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&)); UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&)); UNIMPLEMENTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&)); + UNIMPLEMENTED_METHOD(TFuture<std::vector<TJobTraceEvent>>, GetJobTrace, (const NScheduler::TOperationIdOrAlias&, const TGetJobTraceOptions&)); UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&)); UNIMPLEMENTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&)); UNIMPLEMENTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&)); @@ -475,7 +486,7 @@ public: UNIMPLEMENTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&)); UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); - UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&)); + UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&)); UNIMPLEMENTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const TString& , int, const TStartShuffleOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, FinishShuffle, (const TShuffleHandlePtr&, const TFinishShuffleOptions&)); UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&)); @@ -502,6 +513,7 @@ private: private: const TFederationConfigPtr Config_; const NConcurrency::TPeriodicExecutorPtr Executor_; + const TString LocalDatacenter_; std::vector<TClientDescriptionPtr> UnderlyingClients_; IClientPtr ActiveClient_; @@ -592,19 +604,14 @@ TClient::TClient(const std::vector<IClientPtr>& underlyingClients, TFederationCo NRpc::TDispatcher::Get()->GetLightInvoker(), BIND(&TClient::CheckClustersHealth, MakeWeak(this)), Config_->ClusterHealthCheckPeriod)) + , LocalDatacenter_(NNet::GetLocalYPCluster()) { YT_VERIFY(!underlyingClients.empty()); UnderlyingClients_.reserve(underlyingClients.size()); - const auto& localDatacenter = NNet::GetLocalYPCluster(); for (const auto& client : underlyingClients) { - int priority = GetDataCenterByClient(client) == localDatacenter ? 1 : 0; - UnderlyingClients_.push_back(New<TClientDescription>(client, priority)); + UnderlyingClients_.push_back(New<TClientDescription>(client, EClientPriority::Undefined)); } - std::stable_sort(UnderlyingClients_.begin(), UnderlyingClients_.end(), [] (const auto& lhs, const auto& rhs) { - return lhs->Priority > rhs->Priority; - }); - ActiveClient_ = UnderlyingClients_[0]->Client; ActiveClientIndex_ = 0; @@ -614,43 +621,54 @@ TClient::TClient(const std::vector<IClientPtr>& underlyingClients, TFederationCo void TClient::CheckClustersHealth() { TCheckClusterLivenessOptions options; - options.CheckCypressRoot = true; + options.CheckCypressRoot = Config_->CheckCypressRoot; options.CheckTabletCellBundle = Config_->BundleName; - int activeClientIndex = ActiveClientIndex_.load(); - std::optional<int> betterClientIndex; - std::vector<TFuture<void>> checks; checks.reserve(UnderlyingClients_.size()); - for (const auto& clientDescription : UnderlyingClients_) { checks.emplace_back(clientDescription->Client->CheckClusterLiveness(options)); } - for (int index = 0; index < std::ssize(checks); ++index) { + for (int index = 0; index != std::ssize(checks); ++index) { const auto& check = checks[index]; - bool hasErrors = !NConcurrency::WaitFor(check).IsOK(); - UnderlyingClients_[index]->HasErrors = hasErrors; - if (!betterClientIndex && !hasErrors && index < activeClientIndex) { - betterClientIndex = index; - } + auto error = NConcurrency::WaitFor(check); + YT_LOG_DEBUG_UNLESS(error.IsOK(), error, "Cluster %Qv is marked as unhealthy", + UnderlyingClients_[index]->Client->GetClusterName(/*fetchIfNull*/ false)); + UnderlyingClients_[index]->HasErrors = !error.IsOK() + && !error.FindMatching(NSecurityClient::EErrorCode::AuthorizationError); // Ignore authorization errors. } - if (betterClientIndex && ActiveClientIndex_ == activeClientIndex) { - int newClientIndex = *betterClientIndex; - auto guard = NThreading::WriterGuard(Lock_); - ActiveClient_ = UnderlyingClients_[newClientIndex]->Client; - ActiveClientIndex_ = newClientIndex; - return; + for (int index = 0; index != std::ssize(UnderlyingClients_); ++index) { + auto& client = UnderlyingClients_[index]; + // `Priority` accessed only from this thread so it is not require synchronization. + if (client->Priority == EClientPriority::Undefined) { + auto clientDatacenter = NConcurrency::WaitFor(GetDataCenterByClient(client->Client)); + if (clientDatacenter.IsOK()) { + client->Priority = clientDatacenter.Value() == LocalDatacenter_ + ? EClientPriority::Local + : EClientPriority::Remote; + } + } + } + // Compute better activeClientIndex. + int betterClientIndex = ActiveClientIndex_.load(); + auto betterPriority = UnderlyingClients_[betterClientIndex]->HasErrors + ? EClientPriority::Undefined + : UnderlyingClients_[betterClientIndex]->Priority; + + for (int index = 0; index != std::ssize(UnderlyingClients_); ++index) { + const auto& client = UnderlyingClients_[index]; + if (!client->HasErrors && client->Priority < betterPriority) { + betterClientIndex = index; + betterPriority = client->Priority; + } } - // If active cluster is not healthy, try changing it. - if (UnderlyingClients_[activeClientIndex]->HasErrors) { + if (ActiveClientIndex_ != betterClientIndex) { auto guard = NThreading::WriterGuard(Lock_); - // Check that active client wasn't changed. - if (ActiveClientIndex_ == activeClientIndex && UnderlyingClients_[activeClientIndex]->HasErrors) { - UpdateActiveClient(); - } + ActiveClient_ = UnderlyingClients_[betterClientIndex]->Client; + ActiveClientIndex_ = betterClientIndex; } } @@ -751,42 +769,34 @@ void TClient::HandleError(const TErrorOr<void>& error, int clientIndex) if (ActiveClientIndex_ != clientIndex) { return; } - - auto guard = WriterGuard(Lock_); - if (ActiveClientIndex_ != clientIndex) { - return; - } - UpdateActiveClient(); } void TClient::UpdateActiveClient() { - VERIFY_WRITER_SPINLOCK_AFFINITY(Lock_); - - int activeClientIndex = ActiveClientIndex_.load(); + VERIFY_THREAD_AFFINITY_ANY(); for (int index = 0; index < std::ssize(UnderlyingClients_); ++index) { const auto& clientDescription = UnderlyingClients_[index]; if (!clientDescription->HasErrors) { - if (activeClientIndex != index) { + if (ActiveClientIndex_ != index) { YT_LOG_DEBUG("Active client was changed (PreviousClientIndex: %v, NewClientIndex: %v)", - activeClientIndex, + ActiveClientIndex_.load(), index); + auto guard = NThreading::WriterGuard(Lock_); + ActiveClientIndex_ = index; + ActiveClient_ = clientDescription->Client; } - - ActiveClient_ = clientDescription->Client; - ActiveClientIndex_ = index; - break; + return; } } } TClient::TActiveClientInfo TClient::GetActiveClient() { - auto guard = ReaderGuard(Lock_); YT_LOG_TRACE("Request will be send to the active client (ClientIndex: %v)", ActiveClientIndex_.load()); + auto guard = ReaderGuard(Lock_); return {ActiveClient_, ActiveClientIndex_.load()}; } diff --git a/yt/yt/client/federated/config.cpp b/yt/yt/client/federated/config.cpp index edba2d190e..f5381e890a 100644 --- a/yt/yt/client/federated/config.cpp +++ b/yt/yt/client/federated/config.cpp @@ -15,6 +15,9 @@ void TFederationConfig::Register(TRegistrar registrar) .GreaterThan(TDuration::Zero()) .Default(TDuration::Seconds(60)); + registrar.Parameter("check_cypress_root", &TThis::CheckCypressRoot) + .Default(true); + registrar.Parameter("cluster_retry_attempts", &TThis::ClusterRetryAttempts) .GreaterThanOrEqual(0) .Default(3); diff --git a/yt/yt/client/federated/config.h b/yt/yt/client/federated/config.h index 115d0450dc..d092c3f019 100644 --- a/yt/yt/client/federated/config.h +++ b/yt/yt/client/federated/config.h @@ -22,6 +22,9 @@ public: //! How often cluster liveness should be checked on the background. TDuration ClusterHealthCheckPeriod; + //! Checks Cypress root availability in liveness check. + bool CheckCypressRoot; + //! Maximum number of retry attempts to make. int ClusterRetryAttempts; diff --git a/yt/yt/client/federated/unittests/client_ut.cpp b/yt/yt/client/federated/unittests/client_ut.cpp index 0bc009fb6f..169c1c5039 100644 --- a/yt/yt/client/federated/unittests/client_ut.cpp +++ b/yt/yt/client/federated/unittests/client_ut.cpp @@ -119,13 +119,13 @@ TEST(TFederatedClientTest, Basic) .WillRepeatedly(Return(VoidFuture)); // Creation of federated client. - std::vector<IClientPtr> clients{mockClientSas, mockClientVla}; + std::vector<IClientPtr> clients{mockClientVla, mockClientSas}; auto config = New<TFederationConfig>(); - config->ClusterHealthCheckPeriod = TDuration::Seconds(5); + config->ClusterHealthCheckPeriod = TDuration::Seconds(3); config->ClusterRetryAttempts = 1; auto federatedClient = CreateClient(clients, config); - // 1. `vla` client should be used as closest cluster. + // 1. `vla` client should be used as first cluster. // 2. error from `vla` cluster should be received. // 3. `sas` client should be used as other cluster. @@ -213,6 +213,9 @@ TEST(TFederatedClientTest, CheckHealth) EXPECT_CALL(*mockClientSas, LookupRows(data.Path, _, _, _)) .WillOnce(Return(MakeFuture(data.LookupResult2))); + // Wait initialization and choose `local` cluster. + Sleep(TDuration::Seconds(2)); + // From `vla`. { auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys); @@ -462,8 +465,6 @@ TEST(TFederatedClientTest, AttachTransaction) auto mockConnectionVla = New<TStrictMockConnection>(); EXPECT_CALL(*mockConnectionVla, GetClusterTag()) .WillRepeatedly(Return(NObjectClient::TCellTag(456))); - EXPECT_CALL(*mockClientVla, GetConnection()) - .WillOnce(Return(mockConnectionVla)); // Creation of federated client. std::vector<IClientPtr> clients{mockClientSas, mockClientVla}; @@ -471,6 +472,9 @@ TEST(TFederatedClientTest, AttachTransaction) config->ClusterHealthCheckPeriod = TDuration::Seconds(5); auto federatedClient = CreateClient(clients, config); + // Wait initialization. + Sleep(TDuration::Seconds(2)); + auto mockTransactionSas = New<TStrictMockTransaction>(); auto transactionId = TGuid(0, 123 << 16, 0, 0); EXPECT_CALL(*mockTransactionSas, GetId()) diff --git a/yt/yt/client/federated/unittests/connection_ut.cpp b/yt/yt/client/federated/unittests/connection_ut.cpp index 7f7473d019..e512087a14 100644 --- a/yt/yt/client/federated/unittests/connection_ut.cpp +++ b/yt/yt/client/federated/unittests/connection_ut.cpp @@ -35,11 +35,19 @@ TEST(TFederatedConnectionTest, CreateClient) // To identify best (closest) cluster. NYson::TYsonString nodesYsonSas(TStringBuf(R"(["a-rpc-proxy-a.sas.yp-c.yandex.net:9013"])")); EXPECT_CALL(*mockClientSas, ListNode("//sys/rpc_proxies", _)) - .WillRepeatedly(Return(MakeFuture(nodesYsonSas))); + .WillOnce(Return(MakeFuture(nodesYsonSas))); + EXPECT_CALL(*mockClientSas, GetNode("//test/node", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) { + return MakeFuture(nodesYsonSas); + }); NYson::TYsonString nodesYsonVla(TStringBuf(R"(["a-rpc-proxy-a.vla.yp-c.yandex.net:9013"])")); EXPECT_CALL(*mockClientVla, ListNode("//sys/rpc_proxies", _)) - .WillRepeatedly(Return(MakeFuture(nodesYsonVla))); + .WillOnce(Return(MakeFuture(nodesYsonVla))); + EXPECT_CALL(*mockClientVla, GetNode("//test/node", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) { + return MakeFuture(nodesYsonVla); + }); EXPECT_CALL(*mockClientSas, CheckClusterLiveness(_)) .WillRepeatedly(Return(VoidFuture)); @@ -65,8 +73,81 @@ TEST(TFederatedConnectionTest, CreateClient) auto connection = CreateConnection({mockConnectionSas, mockConnectionVla}, config); EXPECT_THAT(connection->GetLoggingTag(), testing::HasSubstr("Clusters: (sas; vla)")); auto client = connection->CreateClient(clientOptions); - auto nodes = client->ListNode("//sys/rpc_proxies").Get().ValueOrThrow(); + auto nodes = client->GetNode("//test/node").Get().ValueOrThrow(); EXPECT_EQ(nodesYsonSas, nodes); + + Sleep(TDuration::Seconds(2)); + auto nodes2 = client->GetNode("//test/node").Get().ValueOrThrow(); + EXPECT_EQ(nodesYsonSas, nodes2); +} + +TEST(TFederatedConnectionTest, CreateClientWhenOneClusterUnavailable) +{ + auto config = New<TFederationConfig>(); + config->BundleName = "my_bundle"; + config->ClusterHealthCheckPeriod = TDuration::Seconds(5); + + auto mockConnectionSas = New<TStrictMockConnection>(); + auto mockConnectionVla = New<TStrictMockConnection>(); + auto mockClientSas = New<TStrictMockClient>(); + auto mockClientVla = New<TStrictMockClient>(); + + // To identify best (closest) cluster. + NYson::TYsonString nodesYsonSas(TStringBuf(R"(["a-rpc-proxy-a.sas.yp-c.yandex.net:9013"])")); + EXPECT_CALL(*mockClientSas, ListNode("//sys/rpc_proxies", _)) + .WillOnce(Return(MakeFuture<NYson::TYsonString>(TError("Failure")))) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TListNodeOptions&) { + return MakeFuture(nodesYsonSas); + }); + EXPECT_CALL(*mockClientSas, GetNode("//test/node", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) { + return MakeFuture(nodesYsonSas); + }); + + NYson::TYsonString nodesYsonVla(TStringBuf(R"(["a-rpc-proxy-a.vla.yp-c.yandex.net:9013"])")); + EXPECT_CALL(*mockClientVla, ListNode("//sys/rpc_proxies", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TListNodeOptions&) { + return MakeFuture(nodesYsonVla); + }); + EXPECT_CALL(*mockClientVla, GetNode("//test/node", _)) + .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) { + return MakeFuture(nodesYsonVla); + }); + + EXPECT_CALL(*mockClientSas, CheckClusterLiveness(_)) + .WillRepeatedly(Return(VoidFuture)); + EXPECT_CALL(*mockClientVla, CheckClusterLiveness(_)) + .WillRepeatedly(Return(VoidFuture)); + + NApi::TClientOptions clientOptions; + EXPECT_CALL(*mockConnectionSas, CreateClient(::testing::Ref(clientOptions))) + .WillOnce(Return(mockClientSas)); + EXPECT_CALL(*mockConnectionVla, CreateClient(::testing::Ref(clientOptions))) + .WillOnce(Return(mockClientVla)); + + EXPECT_CALL(*mockConnectionSas, GetLoggingTag()) + .WillOnce(ReturnRefOfCopy(TString("sas"))); + EXPECT_CALL(*mockConnectionVla, GetLoggingTag()) + .WillOnce(ReturnRefOfCopy(TString("vla"))); + + auto finally = Finally([oldLocalHostName = NNet::GetLocalHostName()] { + NNet::WriteLocalHostName(oldLocalHostName); + }); + NNet::WriteLocalHostName("a-rpc-proxy.sas.yp-c.yandex.net"); + + auto connection = CreateConnection({mockConnectionSas, mockConnectionVla}, config); + EXPECT_THAT(connection->GetLoggingTag(), testing::HasSubstr("Clusters: (sas; vla)")); + auto client = connection->CreateClient(clientOptions); + + Sleep(TDuration::Seconds(2)); + + auto nodes1 = client->GetNode("//test/node").Get().ValueOrThrow(); + EXPECT_EQ(nodesYsonVla, nodes1); + + Sleep(TDuration::Seconds(6)); + + auto nodes2 = client->GetNode("//test/node").Get().ValueOrThrow(); + EXPECT_EQ(nodesYsonSas, nodes2); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index ae0376bcda..3fb36f8560 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -111,7 +111,7 @@ public: UNSUPPORTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const TYPath&, const TJournalWriterOptions&)); UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&)); UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&)); - UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&)); + UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&)); // IClient methods. // Unsupported methods. @@ -164,6 +164,7 @@ public: UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&)); UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&)); UNSUPPORTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&)); + UNSUPPORTED_METHOD(TFuture<std::vector<TJobTraceEvent>>, GetJobTrace, (const NScheduler::TOperationIdOrAlias&, const TGetJobTraceOptions&)); UNSUPPORTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&)); UNSUPPORTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&)); UNSUPPORTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&)); diff --git a/yt/yt/client/logging/dynamic_table_log_writer.cpp b/yt/yt/client/logging/dynamic_table_log_writer.cpp index 1ff8ff680b..c6c731f2dd 100644 --- a/yt/yt/client/logging/dynamic_table_log_writer.cpp +++ b/yt/yt/client/logging/dynamic_table_log_writer.cpp @@ -24,7 +24,7 @@ #include <yt/yt/core/logging/formatter.h> #include <yt/yt/core/logging/system_log_event_provider.h> -#include <yt/yt/core/misc/atomic_object.h> +#include <library/cpp/yt/threading/atomic_object.h> #include <yt/yt/core/misc/proc.h> #include <library/cpp/yt/memory/atomic_intrusive_ptr.h> diff --git a/yt/yt/client/object_client/helpers.cpp b/yt/yt/client/object_client/helpers.cpp index c4d02fa6d0..795c0fb34c 100644 --- a/yt/yt/client/object_client/helpers.cpp +++ b/yt/yt/client/object_client/helpers.cpp @@ -330,6 +330,11 @@ bool IsSchemaType(EObjectType type) return (static_cast<ui32>(type) & SchemaObjectTypeMask) != 0; } +TString FormatObjectType(EObjectType type) +{ + return IsSchemaType(type) ? Format("schema:%v", TypeFromSchemaType(type)) : FormatEnum(type); +} + bool IsGlobalCellId(TCellId cellId) { auto type = TypeFromId(cellId); diff --git a/yt/yt/client/object_client/helpers.h b/yt/yt/client/object_client/helpers.h index 3139e5481b..1c668033c9 100644 --- a/yt/yt/client/object_client/helpers.h +++ b/yt/yt/client/object_client/helpers.h @@ -119,6 +119,9 @@ EObjectType SchemaTypeFromType(EObjectType type); //! Returns the regular type for a given schema #type. EObjectType TypeFromSchemaType(EObjectType type); +//! Formats object type into string (taking schemas into account). +TString FormatObjectType(EObjectType type); + //! Constructs the id from its parts. TObjectId MakeId( EObjectType type, diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp index 2115805de0..4040bd337c 100644 --- a/yt/yt/client/queue_client/producer_client.cpp +++ b/yt/yt/client/queue_client/producer_client.cpp @@ -1,5 +1,7 @@ #include "producer_client.h" +#include "private.h" + #include <yt/yt/client/api/client.h> #include <yt/yt/client/api/transaction.h> @@ -10,6 +12,9 @@ #include <yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h> +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/periodic_executor.h> + namespace NYT::NQueueClient { using namespace NApi; @@ -23,6 +28,10 @@ using namespace NYTree; //////////////////////////////////////////////////////////////////////////////// +static constexpr auto& Logger = QueueClientLogger; + +//////////////////////////////////////////////////////////////////////////////// + class TProducerSession : public IProducerSession { @@ -34,24 +43,42 @@ public: TNameTablePtr nameTable, TQueueProducerSessionId sessionId, TCreateQueueProducerSessionResult createSessionResult, - TProducerSessionOptions options) + TProducerSessionOptions options, + IInvokerPtr invoker) : Client_(std::move(client)) , ProducerPath_(std::move(producerPath)) , QueuePath_(std::move(queuePath)) , NameTable_(std::move(nameTable)) , SessionId_(std::move(sessionId)) + , Invoker_(std::move(invoker)) , Options_(std::move(options)) , Epoch_(createSessionResult.Epoch) , LastSequenceNumber_(createSessionResult.SequenceNumber) , UserMeta_(std::move(createSessionResult.UserMeta)) , BufferedRowWriter_(CreateWireProtocolWriter()) - { } + { + if (Options_.BackgroundFlushPeriod) { + if (!Invoker_) { + THROW_ERROR_EXCEPTION("Cannot create producer session with background flush without invoker"); + } + + FlushExecutor_ = New<TPeriodicExecutor>( + Invoker_, + BIND(&TProducerSession::OnFlush, NYT::MakeWeak(this)), + *Options_.BackgroundFlushPeriod); + } else { + if (!Options_.BatchOptions.RowCount && !Options_.BatchOptions.ByteSize) { + YT_LOG_DEBUG("None of batch row count or batch byte size are specified, batch byte size will be equal to 16 MB"); + Options_.BatchOptions.ByteSize = 16_MB; + } + } + } // TODO(nadya73): add possibility to pass user meta. TQueueProducerSequenceNumber GetLastSequenceNumber() const override { - return LastSequenceNumber_; + return LastSequenceNumber_.load(); } const INodePtr& GetUserMeta() const override @@ -61,23 +88,87 @@ public: bool Write(TRange<TUnversionedRow> rows) override { + auto guard = Guard(SpinLock_); + + if (!Started_) { + if (FlushExecutor_) { + FlushExecutor_->Start(); + } + Started_ = true; + } + + if (Closed_) { + return false; + } + for (const auto& row : rows) { BufferedRowWriter_->WriteUnversionedRow(row); ++BufferedRowCount_; } - return BufferedRowWriter_->GetByteSize() < Options_.MaxBufferSize; + if (IsFlushNeeded()) { + if (!FlushExecutor_) { + return false; + } + FlushExecutor_->ScheduleOutOfBand(); + } + return true; } TFuture<void> GetReadyEvent() override { + if (FlushExecutor_) { + return VoidFuture; + } + + { + auto guard = Guard(SpinLock_); + if (Closed_) { + return MakeFuture<void>(TError("Producer session was closed")); + } + } return TryToFlush(); } - TFuture<void> Close() override + TFuture<void> Flush() override { - return Flush(); + if (!FlushExecutor_) { + std::vector<TSharedRef> serializedRows; + { + auto guard = Guard(SpinLock_); + YT_LOG_DEBUG("Flushing rows (RowCount: %v)", BufferedRowCount_); + serializedRows = GetRowsToFlushAndResetBuffer(); + } + + return FlushImpl(serializedRows); + } + + FlushExecutor_->ScheduleOutOfBand(); + return FlushExecutor_->GetExecutedEvent(); } + TFuture<void> Close() override + { + { + auto guard = Guard(SpinLock_); + Closed_ = true; + } + + // Run one last flush will finish writing the remaining items and + // eventually lead to the stop promise being set. + // A single flush is enough since it is guaranteed that no new messages are added to the queue after the + // critical section above. + + if (!FlushExecutor_) { + return TryToFlush(); + } + + FlushExecutor_->ScheduleOutOfBand(); + + return StoppedPromise_.ToFuture() + .Apply(BIND([this, this_ = MakeStrong(this)] { + return FlushExecutor_->Stop(); + })); + } std::optional<TMD5Hash> GetDigest() const override { @@ -90,44 +181,94 @@ private: const TRichYPath QueuePath_; const TNameTablePtr NameTable_; const TQueueProducerSessionId SessionId_; - const TProducerSessionOptions Options_; + const IInvokerPtr Invoker_; + + TProducerSessionOptions Options_; TQueueProducerEpoch Epoch_ = TQueueProducerEpoch{0}; - TQueueProducerSequenceNumber LastSequenceNumber_ = TQueueProducerSequenceNumber{0}; + std::atomic<TQueueProducerSequenceNumber> LastSequenceNumber_ = TQueueProducerSequenceNumber{0}; INodePtr UserMeta_; std::unique_ptr<IWireProtocolWriter> BufferedRowWriter_; i64 BufferedRowCount_ = 0; + bool Started_ = false; + TPeriodicExecutorPtr FlushExecutor_; + + bool Closed_ = false; + TPromise<void> StoppedPromise_ = NewPromise<void>(); + YT_DECLARE_SPIN_LOCK(TSpinLock, SpinLock_); + bool IsFlushNeeded() const + { + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + + return (Options_.BatchOptions.ByteSize && static_cast<i64>(BufferedRowWriter_->GetByteSize()) >= *Options_.BatchOptions.ByteSize) + || (Options_.BatchOptions.RowCount && BufferedRowCount_ >= *Options_.BatchOptions.RowCount); + } + TFuture<void> TryToFlush() { - if (BufferedRowWriter_->GetByteSize() >= Options_.MaxBufferSize) { - return Flush(); + std::vector<TSharedRef> serializedRows; + { + auto guard = Guard(SpinLock_); + if (!IsFlushNeeded() && !Closed_) { + return VoidFuture; + } + serializedRows = GetRowsToFlushAndResetBuffer(); } - return VoidFuture; + return FlushImpl(std::move(serializedRows)); } - TFuture<void> Flush() + std::vector<TSharedRef> GetRowsToFlushAndResetBuffer() { - auto guard = Guard(SpinLock_); + VERIFY_SPINLOCK_AFFINITY(SpinLock_); auto writer = CreateWireProtocolWriter(); writer->WriteSerializedRowset(BufferedRowCount_, BufferedRowWriter_->Finish()); BufferedRowWriter_ = CreateWireProtocolWriter(); BufferedRowCount_ = 0; + return writer->Finish(); + } + + void OnFlush() + { + std::vector<TSharedRef> serializedRows; + { + auto guard = Guard(SpinLock_); + YT_LOG_DEBUG("Flushing rows (RowCount: %v)", BufferedRowCount_); + serializedRows = GetRowsToFlushAndResetBuffer(); + } + + WaitFor(FlushImpl(std::move(serializedRows))) + .ThrowOnError(); + bool isStopped = false; + { + auto guard = Guard(SpinLock_); + if (Closed_ && BufferedRowCount_ == 0) { + isStopped = true; + } + } + + if (isStopped) { + StoppedPromise_.TrySet(); + } + } + + TFuture<void> FlushImpl(std::vector<TSharedRef> serializedRows) + { return Client_->StartTransaction(ETransactionType::Tablet) - .Apply(BIND([writer = std::move(writer), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) { + .Apply(BIND([serializedRows = std::move(serializedRows), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) { TPushQueueProducerOptions pushQueueProducerOptions; if (Options_.AutoSequenceNumber) { - pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.Underlying() + 1}; + pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.load().Underlying() + 1}; } - return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, writer->Finish(), pushQueueProducerOptions) + return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, serializedRows, pushQueueProducerOptions) .Apply(BIND([=, this, this_ = MakeStrong(this)] (const TPushQueueProducerResult& pushQueueProducerResult) { - LastSequenceNumber_ = pushQueueProducerResult.LastSequenceNumber; + LastSequenceNumber_.store(pushQueueProducerResult.LastSequenceNumber); return transaction->Commit(); })); })).AsVoid(); @@ -149,12 +290,13 @@ public: const TRichYPath& queuePath, const TNameTablePtr& nameTable, const TQueueProducerSessionId& sessionId, - const TProducerSessionOptions& options) override + const TProducerSessionOptions& options, + const IInvokerPtr& invoker) override { return Client_->CreateQueueProducerSession(ProducerPath_, queuePath, sessionId) .Apply(BIND([=, this, this_ = MakeStrong(this)] (const TCreateQueueProducerSessionResult& createSessionResult) -> IProducerSessionPtr { - return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options); + return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options, invoker); })); } diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h index 4e1b46756a..3fc73ed3a8 100644 --- a/yt/yt/client/queue_client/producer_client.h +++ b/yt/yt/client/queue_client/producer_client.h @@ -17,6 +17,14 @@ namespace NYT::NQueueClient { //////////////////////////////////////////////////////////////////////////////// +struct TProducerSessionBatchOptions +{ + //! Weight of serialized buffered rows when flush will be called regardless of the background flush period. + std::optional<i64> ByteSize; + //! Buffered rows count when flush will be called regardless of the background flush period. + std::optional<i64> RowCount; +}; + struct TProducerSessionOptions { //! If true, sequence numbers will be incremented automatically, @@ -24,8 +32,12 @@ struct TProducerSessionOptions //! If false, each row should contain value of $sequnce_number column. bool AutoSequenceNumber = false; - //! Size of buffer when rows will be flushed to server. - size_t MaxBufferSize = 1_MB; + //! Batch sizes when rows will be flushed to server regardless of the background flush period (if it is specified). + //! If there is no background flush, rows will be flush when `BatchOptions::ByteSize` or `BatchOptions::RowCount` is reached. If none of them are specified, `BatchOptions::ByteSize` will be equal to 16 MB. + TProducerSessionBatchOptions BatchOptions; + + //! If set, rows will be flushed in background with this period. + std::optional<TDuration> BackgroundFlushPeriod; }; struct IProducerSession @@ -38,6 +50,9 @@ struct IProducerSession //! Get user meta saved in the producer session. virtual const NYTree::INodePtr& GetUserMeta() const = 0; + + //! Flush all written rows. + virtual TFuture<void> Flush() = 0; }; DEFINE_REFCOUNTED_TYPE(IProducerSession) @@ -47,12 +62,12 @@ struct IProducerClient : public virtual TRefCounted { //! Create a session (or increase its epoch) and return session writer. - //! NB: Session writer return by this method is NOT thread-safe. virtual TFuture<IProducerSessionPtr> CreateSession( const NYPath::TRichYPath& queuePath, const NTableClient::TNameTablePtr& nameTable, const NQueueClient::TQueueProducerSessionId& sessionId, - const TProducerSessionOptions& options = {}) = 0; + const TProducerSessionOptions& options = {}, + const IInvokerPtr& invoker = nullptr) = 0; }; DEFINE_REFCOUNTED_TYPE(IProducerClient) diff --git a/yt/yt/client/scheduler/public.h b/yt/yt/client/scheduler/public.h index d92ae73f78..ac7afba09b 100644 --- a/yt/yt/client/scheduler/public.h +++ b/yt/yt/client/scheduler/public.h @@ -8,6 +8,12 @@ namespace NYT::NScheduler { //////////////////////////////////////////////////////////////////////////////// +YT_DEFINE_STRONG_TYPEDEF(TJobTraceId, TGuid); + +extern const TJobTraceId NullJobTraceId; + +//////////////////////////////////////////////////////////////////////////////// + using NJobTrackerClient::TJobId; using NJobTrackerClient::TOperationId; @@ -61,6 +67,7 @@ YT_DEFINE_ERROR_ENUM( ((MasterDisconnected) (218)) ((NoSuchJobShell) (219)) ((JobResourceLimitsRestrictionsViolated) (220)) + ((CannotUseBothAclAndAco) (221)) ); DEFINE_ENUM(EUnavailableChunkAction, diff --git a/yt/yt/client/table_client/helpers-inl.h b/yt/yt/client/table_client/helpers-inl.h index a4e39d7a6c..86cc5a793c 100644 --- a/yt/yt/client/table_client/helpers-inl.h +++ b/yt/yt/client/table_client/helpers-inl.h @@ -14,6 +14,7 @@ #include <yt/yt/core/concurrency/scheduler.h> #include <library/cpp/yt/misc/strong_typedef.h> +#include <library/cpp/yt/misc/cast.h> #include <array> @@ -142,8 +143,10 @@ void ToUnversionedValue( if constexpr (TEnumTraits<T>::IsStringSerializableEnum) { ToUnversionedValue(unversionedValue, NYT::FormatEnum(value), rowBuffer, id, flags); } else if constexpr (TEnumTraits<T>::IsBitEnum) { + static_assert(CanFitSubtype<ui64, std::underlying_type_t<T>>()); ToUnversionedValue(unversionedValue, static_cast<ui64>(value), rowBuffer, id, flags); } else { + static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>()); ToUnversionedValue(unversionedValue, static_cast<i64>(value), rowBuffer, id, flags); } } @@ -154,12 +157,16 @@ void FromUnversionedValue( T* value, TUnversionedValue unversionedValue) { + static_assert(TEnumTraits<T>::IsStringSerializableEnum || + TEnumTraits<T>::IsBitEnum && CanFitSubtype<ui64, std::underlying_type_t<T>>() || + !TEnumTraits<T>::IsBitEnum && CanFitSubtype<i64, std::underlying_type_t<T>>()); + switch (unversionedValue.Type) { case EValueType::Int64: - *value = static_cast<T>(unversionedValue.Data.Int64); + *value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>(unversionedValue.Data.Int64)); break; case EValueType::Uint64: - *value = static_cast<T>(unversionedValue.Data.Uint64); + *value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>(unversionedValue.Data.Uint64)); break; case EValueType::String: *value = NYT::ParseEnum<T>(unversionedValue.AsStringBuf()); @@ -170,6 +177,37 @@ void FromUnversionedValue( } } +template <class T> + requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void ToUnversionedValue( + TUnversionedValue* unversionedValue, + T value, + const TRowBufferPtr& rowBuffer, + int id, + EValueFlags flags) +{ + static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>()); + ToUnversionedValue(unversionedValue, static_cast<i64>(value), rowBuffer, id, flags); +} + +template <class T> + requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void FromUnversionedValue( + T* value, + TUnversionedValue unversionedValue) +{ + static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>()); + + switch (unversionedValue.Type) { + case EValueType::Int64: + *value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>(unversionedValue.Data.Int64)); + break; + default: + THROW_ERROR_EXCEPTION("Cannot parse enum value from %Qlv", + unversionedValue.Type); + } +} + //////////////////////////////////////////////////////////////////////////////// void ProtobufToUnversionedValueImpl( diff --git a/yt/yt/client/table_client/helpers.h b/yt/yt/client/table_client/helpers.h index b86124557e..d1c5d17325 100644 --- a/yt/yt/client/table_client/helpers.h +++ b/yt/yt/client/table_client/helpers.h @@ -148,6 +148,20 @@ void FromUnversionedValue( TUnversionedValue unversionedValue); template <class T> + requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void ToUnversionedValue( + TUnversionedValue* unversionedValue, + T value, + const TRowBufferPtr& rowBuffer, + int id = 0, + EValueFlags flags = EValueFlags::None); +template <class T> + requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void FromUnversionedValue( + T* value, + TUnversionedValue unversionedValue); + +template <class T> TUnversionedValue ToUnversionedValue( T&& value, const TRowBufferPtr& rowBuffer, diff --git a/yt/yt/client/table_client/logical_type.h b/yt/yt/client/table_client/logical_type.h index 904ff9708b..bd482df984 100644 --- a/yt/yt/client/table_client/logical_type.h +++ b/yt/yt/client/table_client/logical_type.h @@ -170,7 +170,7 @@ class TDecimalLogicalType { public: static constexpr int MinPrecision = 1; - static constexpr int MaxPrecision = 35; + static constexpr int MaxPrecision = 76; public: TDecimalLogicalType(int precision, int scale); diff --git a/yt/yt/client/table_client/table_upload_options.cpp b/yt/yt/client/table_client/table_upload_options.cpp new file mode 100644 index 0000000000..06c4d9c064 --- /dev/null +++ b/yt/yt/client/table_client/table_upload_options.cpp @@ -0,0 +1,385 @@ +#include "table_upload_options.h" +#include "helpers.h" + +#include <yt/yt/client/table_client/helpers.h> + +#include <yt/yt/client/ypath/rich.h> + +#include <yt/yt/core/ytree/helpers.h> + +namespace NYT::NTableClient { + +using namespace NChunkClient; +using namespace NCompression; +using namespace NCypressClient; +using namespace NYPath; +using namespace NYTree; + +//////////////////////////////////////////////////////////////////////////////// + +TEpochSchema::TEpochSchema(const TEpochSchema& other) +{ + *this = other; +} + +TEpochSchema& TEpochSchema::operator=(const TEpochSchema& other) +{ + TableSchema_ = other.TableSchema_; + Revision_ += other.Revision_ + 1; + return *this; +} + +TEpochSchema& TEpochSchema::operator=(TTableSchemaPtr schema) +{ + Set(schema); + return *this; +} + +const TTableSchema* TEpochSchema::operator->() const +{ + return TableSchema_.Get(); +} + +const TTableSchemaPtr& TEpochSchema::operator*() const +{ + return TableSchema_; +} + +const TTableSchemaPtr& TEpochSchema::Get() const +{ + return TableSchema_; +} + +ui64 TEpochSchema::GetRevision() const +{ + return Revision_; +} + +ui64 TEpochSchema::Set(const TTableSchemaPtr& schema) +{ + TableSchema_ = schema; + return ++Revision_; +} + +void TEpochSchema::Persist(const NPhoenix::TPersistenceContext& context) +{ + using NYT::Persist; + + Persist(context, Revision_); + Persist<TNonNullableIntrusivePtrSerializer<>>(context, TableSchema_); +} + +ui64 TEpochSchema::Reset() +{ + TableSchema_ = New<TTableSchema>(); + return ++Revision_; +} + +//////////////////////////////////////////////////////////////////////////////// + +TTableSchemaPtr TTableUploadOptions::GetUploadSchema() const +{ + switch (SchemaModification) { + case ETableSchemaModification::None: + return TableSchema.Get(); + + case ETableSchemaModification::UnversionedUpdate: + return TableSchema->ToUnversionedUpdate(/*sorted*/ true); + + default: + YT_ABORT(); + } +} + +void TTableUploadOptions::Persist(const NPhoenix::TPersistenceContext& context) +{ + using NYT::Persist; + + Persist(context, UpdateMode); + Persist(context, LockMode); + // COMPAT(h0pless): NControllerAgent::ESnapshotVersion::AddChunkSchemas + if (context.GetVersion() >= 301300) { + Persist(context, TableSchema); + } else { + TTableSchemaPtr schema; + Persist<TNonNullableIntrusivePtrSerializer<>>(context, schema); + TableSchema.Set(schema); + } + Persist(context, SchemaId); + Persist(context, SchemaModification); + // COMPAT(dave11ar): NControllerAgent::ESnapshotVersion::VersionedMapReduceWrite + if (context.GetVersion() >= 301602) { + Persist(context, VersionedWriteOptions); + } + Persist(context, SchemaMode); + Persist(context, OptimizeFor); + // COMPAT(babenko): NControllerAgent::ESnapshotVersion::ChunkFormat + if (context.GetVersion() >= 301103) { + Persist(context, ChunkFormat); + } + Persist(context, CompressionCodec); + Persist(context, ErasureCodec); + Persist(context, EnableStripedErasure); + Persist(context, SecurityTags); + Persist(context, PartiallySorted); +} + +//////////////////////////////////////////////////////////////////////////////// + +static void ValidateSortColumnsEqual(const TSortColumns& sortColumns, const TTableSchema& schema) +{ + if (sortColumns != schema.GetSortColumns()) { + THROW_ERROR_EXCEPTION("YPath attribute \"sorted_by\" must be compatible with table schema for a \"strong\" schema mode") + << TErrorAttribute("sort_columns", sortColumns) + << TErrorAttribute("table_schema", schema); + } +} + +static void ValidateAppendKeyColumns(const TSortColumns& sortColumns, const TTableSchema& schema, i64 rowCount) +{ + ValidateSortColumns(sortColumns); + + if (rowCount == 0) { + return; + } + + auto tableSortColumns = schema.GetSortColumns(); + bool areKeyColumnsCompatible = true; + if (tableSortColumns.size() < sortColumns.size()) { + areKeyColumnsCompatible = false; + } else { + for (int i = 0; i < std::ssize(sortColumns); ++i) { + if (tableSortColumns[i] != sortColumns[i]) { + areKeyColumnsCompatible = false; + break; + } + } + } + + if (!areKeyColumnsCompatible) { + THROW_ERROR_EXCEPTION("Sort columns mismatch while trying to append sorted data into a non-empty table") + << TErrorAttribute("append_sort_columns", sortColumns) + << TErrorAttribute("table_sort_columns", tableSortColumns); + } +} + +const std::vector<TString>& GetTableUploadOptionsAttributeKeys() +{ + static const std::vector<TString> Result{ + "schema_mode", + "optimize_for", + "chunk_format", + "compression_codec", + "erasure_codec", + "enable_striped_erasure", + "dynamic" + }; + return Result; +} + +TTableUploadOptions GetTableUploadOptions( + const TRichYPath& path, + const IAttributeDictionary& cypressTableAttributes, + const TTableSchemaPtr& schema, + i64 rowCount) +{ + auto schemaMode = cypressTableAttributes.Get<ETableSchemaMode>("schema_mode"); + auto optimizeFor = cypressTableAttributes.Get<EOptimizeFor>("optimize_for"); + auto chunkFormat = cypressTableAttributes.Find<EChunkFormat>("chunk_format"); + auto compressionCodec = cypressTableAttributes.Get<NCompression::ECodec>("compression_codec"); + auto erasureCodec = cypressTableAttributes.Get<NErasure::ECodec>("erasure_codec", NErasure::ECodec::None); + auto enableStripedErasure = cypressTableAttributes.Get<bool>("enable_striped_erasure", false); + auto dynamic = cypressTableAttributes.Get<bool>("dynamic"); + + // Validate "optimize_for" and "chunk_format" compatibility. + if (chunkFormat) { + ValidateTableChunkFormatAndOptimizeFor(*chunkFormat, optimizeFor); + } + + // Some ypath attributes are not compatible with attribute "schema". + if (path.GetAppend() && path.GetSchema()) { + THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"schema\" are not compatible") + << TErrorAttribute("path", path); + } + + if (!path.GetSortedBy().empty() && path.GetSchema()) { + THROW_ERROR_EXCEPTION("YPath attributes \"sorted_by\" and \"schema\" are not compatible") + << TErrorAttribute("path", path); + } + + // Dynamic tables have their own requirements as well. + if (dynamic) { + if (path.GetSchema()) { + THROW_ERROR_EXCEPTION("YPath attribute \"schema\" cannot be set on a dynamic table") + << TErrorAttribute("path", path); + } + + if (!path.GetSortedBy().empty()) { + THROW_ERROR_EXCEPTION("YPath attribute \"sorted_by\" cannot be set on a dynamic table") + << TErrorAttribute("path", path); + } + } + + TTableUploadOptions result; + // NB: Saving schema to make sure that if changes are applied to it the schema revision also changes. + result.TableSchema = schema; + auto pathSchema = path.GetSchema(); + if (path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) { + ValidateSortColumnsEqual(path.GetSortedBy(), *schema); + + result.LockMode = ELockMode::Exclusive; + result.UpdateMode = EUpdateMode::Append; + result.SchemaMode = ETableSchemaMode::Strong; + } else if (path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) { + // Old behaviour. + ValidateAppendKeyColumns(path.GetSortedBy(), *schema, rowCount); + + result.LockMode = ELockMode::Exclusive; + result.UpdateMode = EUpdateMode::Append; + result.SchemaMode = ETableSchemaMode::Weak; + result.TableSchema = TTableSchema::FromSortColumns(path.GetSortedBy()); + } else if (path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) { + result.LockMode = (schema->IsSorted() && !dynamic) ? ELockMode::Exclusive : ELockMode::Shared; + result.UpdateMode = EUpdateMode::Append; + result.SchemaMode = ETableSchemaMode::Strong; + } else if (path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) { + // Old behaviour - reset key columns if there were any. + result.LockMode = ELockMode::Shared; + result.UpdateMode = EUpdateMode::Append; + result.SchemaMode = ETableSchemaMode::Weak; + result.TableSchema.Reset(); + } else if (!path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) { + ValidateSortColumnsEqual(path.GetSortedBy(), *schema); + + result.LockMode = ELockMode::Exclusive; + result.UpdateMode = EUpdateMode::Overwrite; + result.SchemaMode = ETableSchemaMode::Strong; + } else if (!path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) { + result.LockMode = ELockMode::Exclusive; + result.UpdateMode = EUpdateMode::Overwrite; + result.SchemaMode = ETableSchemaMode::Weak; + result.TableSchema = TTableSchema::FromSortColumns(path.GetSortedBy()); + } else if (!path.GetAppend() && pathSchema && (schemaMode == ETableSchemaMode::Strong)) { + result.LockMode = ELockMode::Exclusive; + result.UpdateMode = EUpdateMode::Overwrite; + result.SchemaMode = ETableSchemaMode::Strong; + result.TableSchema = pathSchema; + } else if (!path.GetAppend() && pathSchema && (schemaMode == ETableSchemaMode::Weak)) { + // Change from Weak to Strong schema mode. + result.LockMode = ELockMode::Exclusive; + result.UpdateMode = EUpdateMode::Overwrite; + result.SchemaMode = ETableSchemaMode::Strong; + result.TableSchema = pathSchema; + } else if (!path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) { + result.LockMode = ELockMode::Exclusive; + result.UpdateMode = EUpdateMode::Overwrite; + result.SchemaMode = ETableSchemaMode::Strong; + } else if (!path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) { + result.LockMode = ELockMode::Exclusive; + result.UpdateMode = EUpdateMode::Overwrite; + result.SchemaMode = ETableSchemaMode::Weak; + result.TableSchema.Reset(); + } else { + // Do not use YT_ABORT here, since this code is executed inside scheduler. + THROW_ERROR_EXCEPTION("Failed to define upload parameters") + << TErrorAttribute("path", path) + << TErrorAttribute("schema_mode", schemaMode) + << TErrorAttribute("schema", *schema); + } + + if (path.GetAppend() && path.GetOptimizeFor()) { + THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"optimize_for\" are not compatible") + << TErrorAttribute("path", path); + } + + result.OptimizeFor = path.GetOptimizeFor() ? *path.GetOptimizeFor() : optimizeFor; + result.ChunkFormat = path.GetChunkFormat() ? *path.GetChunkFormat() : chunkFormat; + + if (path.GetAppend() && path.GetCompressionCodec()) { + THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"compression_codec\" are not compatible") + << TErrorAttribute("path", path); + } + + if (path.GetCompressionCodec()) { + result.CompressionCodec = *path.GetCompressionCodec(); + } else { + result.CompressionCodec = compressionCodec; + } + + if (path.GetAppend() && path.GetErasureCodec()) { + THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"erasure_codec\" are not compatible") + << TErrorAttribute("path", path); + } + + result.ErasureCodec = path.GetErasureCodec().value_or(erasureCodec); + result.EnableStripedErasure = enableStripedErasure; + + if (path.GetSchemaModification() == ETableSchemaModification::UnversionedUpdateUnsorted) { + THROW_ERROR_EXCEPTION("YPath attribute \"schema_modification\" cannot have value %Qlv for output tables", + path.GetSchemaModification()) + << TErrorAttribute("path", path); + } else if (!dynamic && path.GetSchemaModification() != ETableSchemaModification::None) { + THROW_ERROR_EXCEPTION("YPath attribute \"schema_modification\" can have value %Qlv only for dynamic tables", + path.GetSchemaModification()) + << TErrorAttribute("path", path); + } + result.SchemaModification = path.GetSchemaModification(); + + auto versionedWriteOptions = path.GetVersionedWriteOptions(); + if (!dynamic && versionedWriteOptions.WriteMode != EVersionedIOMode::Default) { + THROW_ERROR_EXCEPTION("YPath attribute \"versioned_write_options/write_mode\" can have value %Qlv only for dynamic tables", + versionedWriteOptions.WriteMode) + << TErrorAttribute("path", path); + } + if (versionedWriteOptions.WriteMode != EVersionedIOMode::Default && path.GetSchemaModification() != ETableSchemaModification::None) { + THROW_ERROR_EXCEPTION("YPath attributes \"versioned_write_options/write_mode\" and \"schema_modification\"" + "can not be set in non-trivial state together: \"versioned_write_options/write_mode\" is %Qlv, \"schema_modification\" is %Qlv", + versionedWriteOptions.WriteMode, + path.GetSchemaModification()) + << TErrorAttribute("path", path); + } + result.VersionedWriteOptions = versionedWriteOptions; + + if (!dynamic && path.GetPartiallySorted()) { + THROW_ERROR_EXCEPTION("YPath attribute \"partially_sorted\" can be set only for dynamic tables") + << TErrorAttribute("path", path); + } + result.PartiallySorted = path.GetPartiallySorted(); + + result.SecurityTags = path.GetSecurityTags(); + + return result; +} + +TTableUploadOptions GetFileUploadOptions( + const TRichYPath& path, + const IAttributeDictionary& cypressTableAttributes) +{ + auto compressionCodec = cypressTableAttributes.Get<NCompression::ECodec>("compression_codec"); + auto enableStripedErasure = cypressTableAttributes.Get<bool>("enable_striped_erasure", false); + auto erasureCodec = cypressTableAttributes.Get<NErasure::ECodec>("erasure_codec", NErasure::ECodec::None); + + if (path.GetAppend()) { + THROW_ERROR_EXCEPTION("Attribute \"append\" is not supported for files") + << TErrorAttribute("path", path); + } + + // NB(coteeq): Fill for sanity. They should not have impact on behaviour, because + // RichYPath's compression_codec & erasure_codec are disallowed in remote copy. + // TODO(coteeq): Make it YT_VERIFY + if (path.GetCompressionCodec() || path.GetErasureCodec()) { + THROW_ERROR_EXCEPTION("\"compression_codec\" and \"erasure_codec\" are disallowed for files") + << TErrorAttribute("path", path); + } + + TTableUploadOptions result; + result.CompressionCodec = compressionCodec; + result.ErasureCodec = erasureCodec; + result.EnableStripedErasure = enableStripedErasure; + result.SecurityTags = path.GetSecurityTags(); + return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/table_upload_options.h b/yt/yt/client/table_client/table_upload_options.h new file mode 100644 index 0000000000..24e19766fd --- /dev/null +++ b/yt/yt/client/table_client/table_upload_options.h @@ -0,0 +1,87 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/client/chunk_client/public.h> + +#include <yt/yt/client/table_client/schema.h> +#include <yt/yt/client/table_client/versioned_io_options.h> + +#include <yt/yt/client/security_client/public.h> + +#include <yt/yt/library/erasure/public.h> + +#include <yt/yt/core/compression/public.h> + +#include <yt/yt/core/misc/phoenix.h> + +namespace NYT::NTableClient { + +//////////////////////////////////////////////////////////////////////////////// + +class TEpochSchema +{ +public: + TEpochSchema() = default; + TEpochSchema(const TEpochSchema& other); + TEpochSchema& operator=(const TEpochSchema& other); + + TEpochSchema(const TEpochSchema&& other) = delete; + TEpochSchema& operator=(TEpochSchema&& other) = delete; + + TEpochSchema& operator=(TTableSchemaPtr schema); + + const TTableSchema* operator->() const; + const TTableSchemaPtr& operator*() const; + + const TTableSchemaPtr& Get() const; + ui64 GetRevision() const; + + ui64 Set(const TTableSchemaPtr& schema); + + void Persist(const NPhoenix::TPersistenceContext& context); + + ui64 Reset(); + +private: + TTableSchemaPtr TableSchema_ = New<TTableSchema>(); + ui64 Revision_ = 0; +}; + +struct TTableUploadOptions +{ + NChunkClient::EUpdateMode UpdateMode = NChunkClient::EUpdateMode::Overwrite; + NCypressClient::ELockMode LockMode = NCypressClient::ELockMode::Exclusive; + TEpochSchema TableSchema; + TMasterTableSchemaId SchemaId; + ETableSchemaModification SchemaModification = ETableSchemaModification::None; + TVersionedWriteOptions VersionedWriteOptions; + ETableSchemaMode SchemaMode = ETableSchemaMode::Strong; + EOptimizeFor OptimizeFor = EOptimizeFor::Lookup; + std::optional<NChunkClient::EChunkFormat> ChunkFormat; + NCompression::ECodec CompressionCodec = NCompression::ECodec::None; + NErasure::ECodec ErasureCodec = NErasure::ECodec::None; + bool EnableStripedErasure = false; + std::optional<std::vector<NSecurityClient::TSecurityTag>> SecurityTags; + bool PartiallySorted = false; + + TTableSchemaPtr GetUploadSchema() const; + + void Persist(const NPhoenix::TPersistenceContext& context); +}; + +const std::vector<TString>& GetTableUploadOptionsAttributeKeys(); + +TTableUploadOptions GetTableUploadOptions( + const NYPath::TRichYPath& path, + const NYTree::IAttributeDictionary& cypressTableAttributes, + const TTableSchemaPtr& schema, + i64 rowCount); + +TTableUploadOptions GetFileUploadOptions( + const NYPath::TRichYPath& path, + const NYTree::IAttributeDictionary& cypressTableAttributes); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/tablet_client/table_mount_cache.h b/yt/yt/client/tablet_client/table_mount_cache.h index a450fdf833..086b466487 100644 --- a/yt/yt/client/tablet_client/table_mount_cache.h +++ b/yt/yt/client/tablet_client/table_mount_cache.h @@ -65,6 +65,7 @@ struct TIndexInfo NObjectClient::TObjectId TableId; ESecondaryIndexKind Kind; std::optional<TString> Predicate; + std::optional<TString> UnfoldedColumn; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp index f856c462d3..e2584e3106 100644 --- a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp +++ b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp @@ -135,7 +135,7 @@ ITimestampProviderPtr CreateRemoteTimestampProvider( return CreateRemoteTimestampProvider( std::move(config), std::move(channel), - false); + /*allowOldClocks*/ false); } ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( @@ -159,7 +159,7 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( return CreateBatchingRemoteTimestampProvider( std::move(config), std::move(channel), - false); + /*allowOldClocks*/ false); } ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( @@ -180,7 +180,7 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider( return CreateBatchingRemoteTimestampProvider( config, channelFactory, - false); + /*allowOldClocks*/ false); } //////////////////////////////////////////////////////////////////////////////// @@ -215,7 +215,7 @@ TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap( CreateBatchingRemoteTimestampProvider( foreignProviderConfig->TimestampProvider, channelFactory, - true)); + /*allowOldClocks*/ true)); } return alienProvidersMap; diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index b8217c8657..43e8056b1e 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -2,7 +2,7 @@ #include <yt/yt/client/api/connection.h> #include <yt/yt/client/api/client.h> -#include <yt/yt/client/api/distributed_table_sessions.h> +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/file_writer.h> #include <yt/yt/client/api/journal_reader.h> #include <yt/yt/client/api/journal_writer.h> @@ -626,6 +626,11 @@ public: const TGetJobStderrOptions& options), (override)); + MOCK_METHOD(TFuture<std::vector<TJobTraceEvent>>, GetJobTrace, ( + const NScheduler::TOperationIdOrAlias& operationIdOrAlias, + const TGetJobTraceOptions& options), + (override)); + MOCK_METHOD(TFuture<TSharedRef>, GetJobFailContext, ( const NScheduler::TOperationIdOrAlias& operationIdOrAlias, NJobTrackerClient::TJobId jobId, @@ -836,9 +841,9 @@ public: const TDistributedWriteSessionFinishOptions& options), (override)); - MOCK_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, ( - const TDistributedWriteCookiePtr& cookie, - const TParticipantTableWriterOptions& options), + MOCK_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, ( + const TFragmentWriteCookiePtr& cookie, + const TFragmentTableWriterOptions& options), (override)); MOCK_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, ( diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h index 9bb296dbe8..943e4fa201 100644 --- a/yt/yt/client/unittests/mock/transaction.h +++ b/yt/yt/client/unittests/mock/transaction.h @@ -1,6 +1,6 @@ #pragma once -#include <yt/yt/client/api/distributed_table_sessions.h> +#include <yt/yt/client/api/distributed_table_session.h> #include <yt/yt/client/api/file_writer.h> #include <yt/yt/client/api/journal_reader.h> #include <yt/yt/client/api/journal_writer.h> diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index a0cc4caf88..a09ba33382 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -13,7 +13,7 @@ SRCS( api/client_cache.cpp api/delegating_client.cpp api/delegating_transaction.cpp - api/distributed_table_sessions.cpp + api/distributed_table_session.cpp api/etc_client.cpp api/journal_client.cpp api/operation_client.cpp @@ -127,6 +127,7 @@ SRCS( table_client/schemaless_buffered_dynamic_table_writer.cpp table_client/schemaless_dynamic_table_writer.cpp table_client/serialize.cpp + table_client/table_upload_options.cpp table_client/logical_type.cpp table_client/merge_table_schemas.cpp table_client/name_table.cpp diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h index 2948d20b67..421216ed81 100644 --- a/yt/yt/core/bus/tcp/connection.h +++ b/yt/yt/core/bus/tcp/connection.h @@ -15,7 +15,7 @@ #include <yt/yt/core/net/address.h> -#include <yt/yt/core/misc/atomic_object.h> +#include <library/cpp/yt/threading/atomic_object.h> #include <yt/yt/core/misc/blob.h> #include <yt/yt/core/misc/mpsc_stack.h> #include <yt/yt/core/misc/ring_queue.h> diff --git a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp index 7db0cf83c7..4ddc546cdb 100644 --- a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp +++ b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp @@ -152,15 +152,18 @@ TEST_F(TQuantizedExecutorTest, Simple) TEST_F(TQuantizedExecutorTest, Timeout) { - InitSimple(/*workerCount*/ 4, /*iterationCount*/ std::numeric_limits<i64>::max()); + InitSimple(/*workerCount*/ 2, /*iterationCount*/ std::numeric_limits<i64>::max()); - for (int index = 1; index <= 10; ++index) { - WaitFor(Executor_->Run(TDuration::MilliSeconds(100))) - .ThrowOnError(); + WaitFor(Executor_->Run(TDuration::MilliSeconds(100))) + .ThrowOnError(); - auto counter = SimpleCallbackProvider_->GetCounter(); - EXPECT_LE(counter, /*workerCount*/ 4 * /*milliseconds*/ 100.0 / /*period*/ 5 * index * 1.25); - } + auto oldCounter = SimpleCallbackProvider_->GetCounter(); + + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100)); + + auto newCounter = SimpleCallbackProvider_->GetCounter(); + + EXPECT_EQ(oldCounter, newCounter); } TEST_F(TQuantizedExecutorTest, LongCallback1) diff --git a/yt/yt/core/crypto/tls.cpp b/yt/yt/core/crypto/tls.cpp index 9acfb26180..0eb910fbab 100644 --- a/yt/yt/core/crypto/tls.cpp +++ b/yt/yt/core/crypto/tls.cpp @@ -340,10 +340,12 @@ public: bool IsIdle() const override { - return - Underlying_->IsIdle() && - ActiveIOCount_ == 0 && - !Failed_; + return ActiveIOCount_ == 0 && !Failed_; + } + + bool IsReusable() const override + { + return IsIdle() && Underlying_->IsReusable(); } TFuture<void> Abort() override diff --git a/yt/yt/core/http/config.cpp b/yt/yt/core/http/config.cpp index 2fb5b909aa..7be4dca2d4 100644 --- a/yt/yt/core/http/config.cpp +++ b/yt/yt/core/http/config.cpp @@ -38,10 +38,10 @@ void TServerConfig::Register(TRegistrar registrar) .Default(80); registrar.Parameter("max_simultaneous_connections", &TThis::MaxSimultaneousConnections) - .Default(50000); + .Default(50'000); registrar.Parameter("max_backlog_size", &TThis::MaxBacklogSize) - .Default(8192); + .Default(8'192); registrar.Parameter("bind_retry_count", &TThis::BindRetryCount) .Default(5); diff --git a/yt/yt/core/http/connection_reuse_helpers.cpp b/yt/yt/core/http/connection_reuse_helpers.cpp index 55ba238a5e..a0e1de77be 100644 --- a/yt/yt/core/http/connection_reuse_helpers.cpp +++ b/yt/yt/core/http/connection_reuse_helpers.cpp @@ -17,7 +17,7 @@ TReusableConnectionState::TReusableConnectionState( TReusableConnectionState::~TReusableConnectionState() { - if (Reusable && OwningPool && Connection->IsIdle()) { + if (Reusable && OwningPool && Connection->IsReusable()) { OwningPool->Release(std::move(Connection)); } } diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp index ae7df4b620..d2fda153b5 100644 --- a/yt/yt/core/http/server.cpp +++ b/yt/yt/core/http/server.cpp @@ -12,7 +12,6 @@ #include <yt/yt/core/concurrency/thread_pool_poller.h> #include <yt/yt/core/misc/finally.h> -#include <yt/yt/core/misc/memory_usage_tracker.h> #include <yt/yt/core/misc/public.h> #include <yt/yt/core/ytree/convert.h> @@ -63,7 +62,6 @@ public: IPollerPtr poller, IPollerPtr acceptor, IInvokerPtr invoker, - IMemoryUsageTrackerPtr memoryUsageTracker, IRequestPathMatcherPtr requestPathMatcher, bool ownPoller = false) : Config_(std::move(config)) @@ -71,7 +69,6 @@ public: , Poller_(std::move(poller)) , Acceptor_(std::move(acceptor)) , Invoker_(std::move(invoker)) - , MemoryUsageTracker_(std::move(memoryUsageTracker)) , OwnPoller_(ownPoller) , RequestPathMatcher_(std::move(requestPathMatcher)) { } @@ -126,7 +123,6 @@ private: const IPollerPtr Poller_; const IPollerPtr Acceptor_; const IInvokerPtr Invoker_; - const IMemoryUsageTrackerPtr MemoryUsageTracker_; const bool OwnPoller_ = false; IRequestPathMatcherPtr RequestPathMatcher_; @@ -224,14 +220,6 @@ private: SetRequestId(response, request->GetRequestId()); - if (MemoryUsageTracker_ && MemoryUsageTracker_->IsExceeded()) { - THROW_ERROR_EXCEPTION( - EStatusCode::TooManyRequests, - "Request is dropped due to high memory pressure") - << TErrorAttribute("total_memory_limit", MemoryUsageTracker_->GetLimit()) - << TErrorAttribute("memory_usage", MemoryUsageTracker_->GetUsed()); - } - handler->HandleRequest(request, response); NTracing::FlushCurrentTraceContextElapsedTime(); @@ -393,7 +381,6 @@ IServerPtr CreateServer( IPollerPtr poller, IPollerPtr acceptor, IInvokerPtr invoker, - IMemoryUsageTrackerPtr memoryUsageTracker, bool ownPoller) { auto handlers = New<TRequestPathMatcher>(); @@ -403,7 +390,6 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), - std::move(memoryUsageTracker), std::move(handlers), ownPoller); } @@ -413,7 +399,6 @@ IServerPtr CreateServer( IPollerPtr poller, IPollerPtr acceptor, IInvokerPtr invoker, - IMemoryUsageTrackerPtr memoryUsageTracker, bool ownPoller) { auto address = TNetworkAddress::CreateIPv6Any(config->Port); @@ -426,7 +411,6 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), - std::move(memoryUsageTracker), ownPoller); } catch (const std::exception& ex) { if (i + 1 == config->BindRetryCount) { @@ -456,7 +440,6 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), - /*memoryUsageTracker*/ GetNullMemoryUsageTracker(), /*ownPoller*/ false); } @@ -464,8 +447,7 @@ IServerPtr CreateServer( TServerConfigPtr config, IListenerPtr listener, IPollerPtr poller, - IPollerPtr acceptor, - IMemoryUsageTrackerPtr memoryUsageTracker) + IPollerPtr acceptor) { auto invoker = poller->GetInvoker(); return CreateServer( @@ -474,15 +456,13 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), - std::move(memoryUsageTracker), /*ownPoller*/ false); } IServerPtr CreateServer( TServerConfigPtr config, IPollerPtr poller, - IPollerPtr acceptor, - IMemoryUsageTrackerPtr memoryUsageTracker) + IPollerPtr acceptor) { auto invoker = poller->GetInvoker(); return CreateServer( @@ -490,7 +470,6 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), - std::move(memoryUsageTracker), /*ownPoller*/ false); } @@ -520,7 +499,6 @@ IServerPtr CreateServer(TServerConfigPtr config, int pollerThreadCount) std::move(poller), std::move(acceptor), std::move(invoker), - /*memoryUsageTracker*/ GetNullMemoryUsageTracker(), /*ownPoller*/ true); } @@ -535,7 +513,6 @@ IServerPtr CreateServer( std::move(poller), std::move(acceptor), std::move(invoker), - /*memoryUsageTracker*/ GetNullMemoryUsageTracker(), /*ownPoller*/ false); } diff --git a/yt/yt/core/http/server.h b/yt/yt/core/http/server.h index 171fb70399..e96720981b 100644 --- a/yt/yt/core/http/server.h +++ b/yt/yt/core/http/server.h @@ -89,16 +89,14 @@ IServerPtr CreateServer( TServerConfigPtr config, NNet::IListenerPtr listener, NConcurrency::IPollerPtr poller, - NConcurrency::IPollerPtr acceptor, - IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker()); + NConcurrency::IPollerPtr acceptor); IServerPtr CreateServer( TServerConfigPtr config, NConcurrency::IPollerPtr poller); IServerPtr CreateServer( TServerConfigPtr config, NConcurrency::IPollerPtr poller, - NConcurrency::IPollerPtr acceptor, - IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker()); + NConcurrency::IPollerPtr acceptor); IServerPtr CreateServer( int port, NConcurrency::IPollerPtr poller); diff --git a/yt/yt/core/http/unittests/http_ut.cpp b/yt/yt/core/http/unittests/http_ut.cpp index cda7dea65f..7030a2350f 100644 --- a/yt/yt/core/http/unittests/http_ut.cpp +++ b/yt/yt/core/http/unittests/http_ut.cpp @@ -206,6 +206,11 @@ struct TFakeConnection return true; } + bool IsReusable() const override + { + return true; + } + TFuture<void> Abort() override { THROW_ERROR_EXCEPTION("Not implemented"); diff --git a/yt/yt/core/https/server.cpp b/yt/yt/core/https/server.cpp index b97cb801e8..92bdee379f 100644 --- a/yt/yt/core/https/server.cpp +++ b/yt/yt/core/https/server.cpp @@ -112,8 +112,7 @@ IServerPtr CreateServer( const TServerConfigPtr& config, const IPollerPtr& poller, const IPollerPtr& acceptor, - const IInvokerPtr& controlInvoker, - const IMemoryUsageTrackerPtr& memoryTracker) + const IInvokerPtr& controlInvoker) { auto sslContext = New<TSslContext>(); ApplySslConfig(sslContext, config->Credentials); @@ -165,8 +164,7 @@ IServerPtr CreateServer( configCopy, tlsListener, poller, - acceptor, - memoryTracker); + acceptor); return New<TServer>(std::move(httpServer), std::move(certificateUpdater)); } diff --git a/yt/yt/core/https/server.h b/yt/yt/core/https/server.h index 46994e8ea4..c6c40eeec3 100644 --- a/yt/yt/core/https/server.h +++ b/yt/yt/core/https/server.h @@ -8,8 +8,6 @@ #include <yt/yt/core/http/public.h> -#include <yt/yt/core/misc/memory_usage_tracker.h> - namespace NYT::NHttps { //////////////////////////////////////////////////////////////////////////////// @@ -28,8 +26,7 @@ NHttp::IServerPtr CreateServer( const TServerConfigPtr& config, const NConcurrency::IPollerPtr& poller, const NConcurrency::IPollerPtr& acceptor, - const IInvokerPtr& controlInvoker, - const IMemoryUsageTrackerPtr& memoryTracker = GetNullMemoryUsageTracker()); + const IInvokerPtr& controlInvoker); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp index 5209a25373..fa96cbc28a 100644 --- a/yt/yt/core/logging/log_manager.cpp +++ b/yt/yt/core/logging/log_manager.cpp @@ -348,26 +348,52 @@ public: RegisterWriterFactory(TString(TStderrLogWriterConfig::WriterType), GetStderrLogWriterFactory()); } + bool IsInitialized() const + { + return InitializationFinished_.Test(); + } + void Initialize() { - std::call_once(Initialized_, [&] { - // NB: Cannot place this logic inside ctor since it may boot up Compression threads unexpected - // and these will try to access TLogManager instance causing a deadlock. - try { - if (auto config = TLogManagerConfig::TryCreateFromEnv()) { - DoUpdateConfig(config, /*fromEnv*/ true); - } - } catch (const std::exception& ex) { - fprintf(stderr, "Error configuring logging from environment variables\n%s\n", - ex.what()); + [[likely]] if (InitializationFinished_.Test()) { + // Don't bother doing syscalls on a hot path. + return; + } + + // Sync is done via event so there is no need for stronger memory orders. + // Case of recursive call is alright, because there sync is done via sequenced-before ordering. + [[likely]] if (InitializationStarted_.exchange(true, std::memory_order::relaxed)) { + NThreading::TThreadId initializerThreadId = NThreading::InvalidThreadId; + while (initializerThreadId == NThreading::InvalidThreadId) { + initializerThreadId = InitializerThreadId_.load(std::memory_order::relaxed); + } + if (GetCurrentThreadId() == initializerThreadId) { + // Recursive call -- bail out. + return; } + // Another thread -- now wait for real. + InitializationFinished_.Wait(); + return; + } + InitializerThreadId_.store(GetCurrentThreadId(), std::memory_order::relaxed); - if (!IsConfiguredFromEnv()) { - DoUpdateConfig(TLogManagerConfig::CreateDefault(), /*fromEnv*/ false); + // NB: Cannot place this logic inside ctor since it may boot up Compression threads unexpected + // and these will try to access TLogManager instance causing a deadlock. + try { + if (auto config = TLogManagerConfig::TryCreateFromEnv()) { + DoUpdateConfig(config, /*fromEnv*/ true); } + } catch (const std::exception& ex) { + fprintf(stderr, "Error configuring logging from environment variables\n%s\n", + ex.what()); + } - SystemCategory_ = GetCategory(SystemLoggingCategoryName); - }); + if (!IsConfiguredFromEnv()) { + DoUpdateConfig(TLogManagerConfig::CreateDefault(), /*fromEnv*/ false); + } + + SystemCategory_ = GetCategory(SystemLoggingCategoryName); + InitializationFinished_.NotifyAll(); } void Configure(INodePtr node) @@ -422,18 +448,20 @@ public: { ShutdownRequested_.store(true); + auto config = Config_.Acquire(); + if (LoggingThread_->GetThreadId() == GetCurrentThreadId()) { FlushWriters(); } else { // Wait for all previously enqueued messages to be flushed // but no more than ShutdownGraceTimeout to prevent hanging. - Synchronize(TInstant::Now() + Config_->ShutdownGraceTimeout); + Synchronize(TInstant::Now() + config->ShutdownGraceTimeout); } // For now this is the only way to wait for log writers that perform asynchronous flushes. // TODO(achulkov2): Refactor log manager to support asynchronous operations. - if (Config_->ShutdownBusyTimeout) { - Sleep(Config_->ShutdownBusyTimeout); + if (config->ShutdownBusyTimeout != TDuration::Zero()) { + Sleep(config->ShutdownBusyTimeout); } EventQueue_->Shutdown(); @@ -460,13 +488,14 @@ public: } auto guard = Guard(SpinLock_); + auto config = Config_.Acquire(); auto it = NameToCategory_.find(categoryName); if (it == NameToCategory_.end()) { auto category = std::make_unique<TLoggingCategory>(); category->Name = categoryName; category->ActualVersion = &Version_; it = NameToCategory_.emplace(categoryName, std::move(category)).first; - DoUpdateCategory(it->second.get()); + DoUpdateCategory(config, it->second.get()); } return it->second.get(); } @@ -474,14 +503,17 @@ public: void UpdateCategory(TLoggingCategory* category) { auto guard = Guard(SpinLock_); - DoUpdateCategory(category); + auto config = Config_.Acquire(); + DoUpdateCategory(config, category); } void UpdateAnchor(TLoggingAnchor* anchor) { auto guard = Guard(SpinLock_); + auto config = Config_.Acquire(); + bool enabled = true; - for (const auto& prefix : Config_->SuppressedMessages) { + for (const auto& prefix : config->SuppressedMessages) { if (anchor->AnchorMessage.StartsWith(prefix)) { enabled = false; break; @@ -619,11 +651,9 @@ public: void SuppressRequest(TRequestId requestId) { - if (!RequestSuppressionEnabled_) { - return; + if (RequestSuppressionEnabled_.load(std::memory_order_relaxed)) { + SuppressedRequestIdQueue_.Enqueue(requestId); } - - SuppressedRequestIdQueue_.Enqueue(requestId); } void Synchronize(TInstant deadline = TInstant::Max()) @@ -678,6 +708,8 @@ private: void EnsureStarted() { + VERIFY_THREAD_AFFINITY_ANY(); + std::call_once(Started_, [&] { if (LoggingThread_->IsStopping()) { return; @@ -695,7 +727,9 @@ private: }); } - const std::vector<ILogWriterPtr>& GetWriters(const TLogEvent& event) + const std::vector<ILogWriterPtr>& GetWriters( + const TLogManagerConfigPtr& config, + const TLogEvent& event) { VERIFY_THREAD_AFFINITY(LoggingThread); @@ -710,7 +744,7 @@ private: } THashSet<TString> writerNames; - for (const auto& rule : Config_->Rules) { + for (const auto& rule : config->Rules) { if (rule->IsApplicable(event.Category->Name, event.Level, event.Family)) { writerNames.insert(rule->Writers.begin(), rule->Writers.end()); } @@ -757,10 +791,7 @@ private: return; } - AbortOnAlert_.store(event.Config->AbortOnAlert); - EnsureStarted(); - FlushWriters(); try { @@ -771,8 +802,10 @@ private: } } - std::unique_ptr<ILogFormatter> CreateFormatter(const TLogWriterConfigPtr& writerConfig) + static std::unique_ptr<ILogFormatter> CreateFormatter(const TLogWriterConfigPtr& writerConfig) { + VERIFY_THREAD_AFFINITY_ANY(); + switch (writerConfig->Format) { case ELogFormat::PlainText: return std::make_unique<TPlainTextLogFormatter>( @@ -795,7 +828,10 @@ private: void DoUpdateConfig(const TLogManagerConfigPtr& config, bool fromEnv) { - if (AreNodesEqual(ConvertToNode(Config_), ConvertToNode(config))) { + // This could be called both from ctor and from LoggingThread. + + auto oldConfig = Config_.Acquire(); + if (AreNodesEqual(ConvertToNode(oldConfig), ConvertToNode(config))) { return; } @@ -850,31 +886,36 @@ private: category->StructuredValidationSamplingRate.store(config->StructuredValidationSamplingRate, std::memory_order::relaxed); } - Config_ = config; ConfiguredFromEnv_.store(fromEnv); - HighBacklogWatermark_.store(Config_->HighBacklogWatermark); - LowBacklogWatermark_.store(Config_->LowBacklogWatermark); - RequestSuppressionEnabled_.store(Config_->RequestSuppressionTimeout != TDuration::Zero()); + HighBacklogWatermark_.store(config->HighBacklogWatermark); + LowBacklogWatermark_.store(config->LowBacklogWatermark); + RequestSuppressionEnabled_.store(config->RequestSuppressionTimeout != TDuration::Zero()); + AbortOnAlert_.store(config->AbortOnAlert); - CompressionThreadPool_->Configure(Config_->CompressionThreadCount); + CompressionThreadPool_->Configure(config->CompressionThreadCount); if (RequestSuppressionEnabled_) { - SuppressedRequestIdSet_.SetTtl((Config_->RequestSuppressionTimeout + DequeuePeriod) * 2); + SuppressedRequestIdSet_.SetTtl((config->RequestSuppressionTimeout + DequeuePeriod) * 2); } else { SuppressedRequestIdSet_.Clear(); SuppressedRequestIdQueue_.DequeueAll(); } - FlushExecutor_->SetPeriod(Config_->FlushPeriod); - WatchExecutor_->SetPeriod(Config_->WatchPeriod); - CheckSpaceExecutor_->SetPeriod(Config_->CheckSpacePeriod); - FileRotationExecutor_->SetPeriod(Config_->RotationCheckPeriod); + FlushExecutor_->SetPeriod(config->FlushPeriod); + WatchExecutor_->SetPeriod(config->WatchPeriod); + CheckSpaceExecutor_->SetPeriod(config->CheckSpacePeriod); + FileRotationExecutor_->SetPeriod(config->RotationCheckPeriod); + Config_.Store(std::move(config)); Version_++; } - void WriteEvent(const TLogEvent& event) + void WriteEvent( + const TLogManagerConfigPtr& config, + const TLogEvent& event) { + VERIFY_THREAD_AFFINITY(LoggingThread); + if (ReopenRequested_.exchange(false)) { ReloadWriters(); } @@ -886,21 +927,26 @@ private: event.Anchor->ByteCounter.Current += std::ssize(event.MessageRef); } - for (const auto& writer : GetWriters(event)) { + for (const auto& writer : GetWriters(config, event)) { writer->Write(event); } } void FlushWriters() { + VERIFY_THREAD_AFFINITY(LoggingThread); + for (const auto& [name, writer] : NameToWriter_) { writer->Flush(); } + FlushedEvents_ = WrittenEvents_.load(); } void RotateFiles() { + VERIFY_THREAD_AFFINITY(LoggingThread); + for (const auto& [name, writer] : NameToWriter_) { if (auto fileWriter = DynamicPointerCast<IFileLogWriter>(writer)) { fileWriter->MaybeRotate(); @@ -910,6 +956,8 @@ private: void ReloadWriters() { + VERIFY_THREAD_AFFINITY(LoggingThread); + Version_++; for (const auto& [name, writer] : NameToWriter_) { writer->Reload(); @@ -918,15 +966,20 @@ private: void CheckSpace() { + VERIFY_THREAD_AFFINITY(LoggingThread); + + auto config = Config_.Acquire(); for (const auto& [name, writer] : NameToWriter_) { if (auto fileWriter = DynamicPointerCast<IFileLogWriter>(writer)) { - fileWriter->CheckSpace(Config_->MinDiskSpace); + fileWriter->CheckSpace(config->MinDiskSpace); } } } void RegisterNotificatonWatch(TNotificationWatch* watch) { + VERIFY_THREAD_AFFINITY(LoggingThread); + if (watch->IsValid()) { // Watch can fail to initialize if the writer is disabled // e.g. due to the lack of space. @@ -978,6 +1031,8 @@ private: void PushEvent(TLoggerQueueItem&& event) { + VERIFY_THREAD_AFFINITY_ANY(); + auto& perThreadQueue = PerThreadQueue(); if (!perThreadQueue) { perThreadQueue = new TThreadLocalQueue(); @@ -994,9 +1049,10 @@ private: const TCounter& GetWrittenEventsCounter(const TLogEvent& event) { + VERIFY_THREAD_AFFINITY_ANY(); + auto key = std::pair(event.Category->Name, event.Level); auto it = WrittenEventsCounters_.find(key); - if (it == WrittenEventsCounters_.end()) { // TODO(prime@): optimize sensor count auto counter = Profiler @@ -1012,6 +1068,8 @@ private: void CollectSensors(ISensorWriter* writer) override { + VERIFY_THREAD_AFFINITY_ANY(); + auto writtenEvents = WrittenEvents_.load(); auto enqueuedEvents = EnqueuedEvents_.load(); auto suppressedEvents = SuppressedEvents_.load(); @@ -1027,6 +1085,8 @@ private: void OnDiskProfiling() { + VERIFY_THREAD_AFFINITY(LoggingThread); + try { auto minLogStorageAvailableSpace = std::numeric_limits<i64>::max(); auto minLogStorageFreeSpace = std::numeric_limits<i64>::max(); @@ -1059,6 +1119,8 @@ private: std::vector<TLoggingAnchorStat> CaptureAnchorStats() { + VERIFY_THREAD_AFFINITY(LoggingThread); + auto now = TInstant::Now(); auto deltaSeconds = (now - LastAnchorStatsCaptureTime_).SecondsFloat(); LastAnchorStatsCaptureTime_ = now; @@ -1086,14 +1148,17 @@ private: void OnAnchorProfiling() { - if (Config_->EnableAnchorProfiling && !AnchorBufferedProducer_) { + VERIFY_THREAD_AFFINITY(LoggingThread); + + auto config = Config_.Acquire(); + if (config->EnableAnchorProfiling && !AnchorBufferedProducer_) { AnchorBufferedProducer_ = New<TBufferedProducer>(); Profiler .WithSparse() .WithDefaultDisabled() .WithProducerRemoveSupport() .AddProducer("/anchors", AnchorBufferedProducer_); - } else if (!Config_->EnableAnchorProfiling && AnchorBufferedProducer_) { + } else if (!config->EnableAnchorProfiling && AnchorBufferedProducer_) { AnchorBufferedProducer_.Reset(); } @@ -1105,7 +1170,7 @@ private: TSensorBuffer sensorBuffer; for (const auto& stat : stats) { - if (stat.MessageRate < Config_->MinLoggedMessageRateToProfile) { + if (stat.MessageRate < config->MinLoggedMessageRateToProfile) { continue; } TWithTagGuard tagGuard(&sensorBuffer, "message", stat.Anchor->AnchorMessage); @@ -1242,20 +1307,24 @@ private: WrittenEvents_ += eventsWritten; - if (!Config_->FlushPeriod || ShutdownRequested_) { + auto config = Config_.Acquire(); + if (!config->FlushPeriod || ShutdownRequested_) { FlushWriters(); } } int ProcessTimeOrderedBuffer() { + VERIFY_THREAD_AFFINITY(LoggingThread); + int eventsWritten = 0; int eventsSuppressed = 0; SuppressedRequestIdSet_.InsertMany(Now(), SuppressedRequestIdQueue_.DequeueAll()); auto requestSuppressionEnabled = RequestSuppressionEnabled_.load(std::memory_order::relaxed); - auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(Config_->RequestSuppressionTimeout); + auto config = Config_.Acquire(); + auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(config->RequestSuppressionTimeout); while (!TimeOrderedBuffer_.empty()) { const auto& event = TimeOrderedBuffer_.front(); @@ -1274,7 +1343,7 @@ private: if (requestSuppressionEnabled && event.RequestId && SuppressedRequestIdSet_.Contains(event.RequestId)) { ++eventsSuppressed; } else { - WriteEvent(event); + WriteEvent(config, event); } }); @@ -1286,10 +1355,14 @@ private: return eventsWritten; } - void DoUpdateCategory(TLoggingCategory* category) + void DoUpdateCategory( + const TLogManagerConfigPtr& config, + TLoggingCategory* category) { + VERIFY_THREAD_AFFINITY_ANY(); + auto minPlainTextLevel = ELogLevel::Maximum; - for (const auto& rule : Config_->Rules) { + for (const auto& rule : config->Rules) { if (rule->IsApplicable(category->Name, ELogFamily::PlainText)) { minPlainTextLevel = std::min(minPlainTextLevel, rule->MinLevel); } @@ -1297,11 +1370,13 @@ private: category->MinPlainTextLevel.store(minPlainTextLevel, std::memory_order::relaxed); category->CurrentVersion.store(GetVersion(), std::memory_order::relaxed); - category->StructuredValidationSamplingRate.store(Config_->StructuredValidationSamplingRate, std::memory_order::relaxed); + category->StructuredValidationSamplingRate.store(config->StructuredValidationSamplingRate, std::memory_order::relaxed); } void DoRegisterAnchor(TLoggingAnchor* anchor) { + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + // NB: Duplicates are not desirable but possible. AnchorMap_.emplace(anchor->AnchorMessage, anchor); anchor->NextAnchor = FirstAnchor_; @@ -1331,23 +1406,28 @@ private: DECLARE_THREAD_AFFINITY_SLOT(LoggingThread); - // Configuration. + TAtomicIntrusivePtr<TLogManagerConfig> Config_; + + // Protects the section of members below. NThreading::TForkAwareSpinLock SpinLock_; - // Version forces this very module's Logger object to update to our own - // default configuration (default level etc.). - std::atomic<int> Version_ = 0; - std::atomic<bool> AbortOnAlert_ = false; - TLogManagerConfigPtr Config_; - std::atomic<bool> ConfiguredFromEnv_ = false; THashMap<TString, std::unique_ptr<TLoggingCategory>> NameToCategory_; THashMap<TString, ILogWriterFactoryPtr> TypeNameToWriterFactory_; - const TLoggingCategory* SystemCategory_; - // These are just copies from Config_. + + // Incrementing version forces loggers to update their own default configuration (default level etc.). + std::atomic<int> Version_ = 0; + + std::atomic<bool> ConfiguredFromEnv_ = false; + + // These are just cached (for performance reason) copies from Config_. // The values are being read from arbitrary threads but stale values are fine. std::atomic<ui64> HighBacklogWatermark_ = Max<ui64>(); std::atomic<ui64> LowBacklogWatermark_ = Max<ui64>(); + std::atomic<bool> AbortOnAlert_ = false; + + std::atomic<bool> InitializationStarted_ = false; + std::atomic<NThreading::TThreadId> InitializerThreadId_ = NThreading::InvalidThreadId; + NThreading::TEvent InitializationFinished_; - std::once_flag Initialized_; std::once_flag Started_; std::atomic<bool> Suspended_ = false; std::atomic<bool> ScheduledOutOfBand_ = false; @@ -1380,7 +1460,9 @@ private: THashMap<TString, ILogWriterPtr> NameToWriter_; THashMap<TLogWriterCacheKey, std::vector<ILogWriterPtr>> KeyToCachedWriter_; + const std::vector<ILogWriterPtr> SystemWriters_; + const TLoggingCategory* SystemCategory_; std::atomic<bool> ReopenRequested_ = false; std::atomic<bool> ShutdownRequested_ = false; @@ -1431,105 +1513,155 @@ TLogManager::~TLogManager() = default; TLogManager* TLogManager::Get() { auto* logManager = LeakySingleton<TLogManager>(); - logManager->Initialize(); + logManager->Impl_->Initialize(); return logManager; } void TLogManager::Configure(TLogManagerConfigPtr config, bool sync) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->Configure(std::move(config), /*fromEnv*/ false, sync); } void TLogManager::ConfigureFromEnv() { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->ConfigureFromEnv(); } bool TLogManager::IsConfiguredFromEnv() { + [[unlikely]] if (!Impl_->IsInitialized()) { + return false; + } return Impl_->IsConfiguredFromEnv(); } void TLogManager::Shutdown() { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->Shutdown(); } int TLogManager::GetVersion() const { + [[unlikely]] if (!Impl_->IsInitialized()) { + return 0; + } return Impl_->GetVersion(); } bool TLogManager::GetAbortOnAlert() const { + [[unlikely]] if (!Impl_->IsInitialized()) { + return false; + } return Impl_->GetAbortOnAlert(); } const TLoggingCategory* TLogManager::GetCategory(TStringBuf categoryName) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return nullptr; + } return Impl_->GetCategory(categoryName); } void TLogManager::UpdateCategory(TLoggingCategory* category) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->UpdateCategory(category); } void TLogManager::UpdateAnchor(TLoggingAnchor* anchor) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->UpdateAnchor(anchor); } void TLogManager::RegisterStaticAnchor(TLoggingAnchor* anchor, ::TSourceLocation sourceLocation, TStringBuf anchorMessage) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->RegisterStaticAnchor(anchor, sourceLocation, anchorMessage); } TLoggingAnchor* TLogManager::RegisterDynamicAnchor(TString anchorMessage) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return nullptr; + } return Impl_->RegisterDynamicAnchor(std::move(anchorMessage)); } void TLogManager::RegisterWriterFactory(const TString& typeName, const ILogWriterFactoryPtr& factory) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->RegisterWriterFactory(typeName, factory); } void TLogManager::UnregisterWriterFactory(const TString& typeName) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->UnregisterWriterFactory(typeName); } void TLogManager::Enqueue(TLogEvent&& event) { + [[unlikely]] if (!Impl_->IsInitialized()) { + Cerr << NYT::Format("Trying to log event during logger initialization -- skipping") << Endl; + return; + } Impl_->Enqueue(std::move(event)); } void TLogManager::Reopen() { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->Reopen(); } void TLogManager::EnableReopenOnSighup() { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->EnableReopenOnSighup(); } void TLogManager::SuppressRequest(TRequestId requestId) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->SuppressRequest(requestId); } void TLogManager::Synchronize(TInstant deadline) { + [[unlikely]] if (!Impl_->IsInitialized()) { + return; + } Impl_->Synchronize(deadline); } -void TLogManager::Initialize() -{ - Impl_->Initialize(); -} - //////////////////////////////////////////////////////////////////////////////// TFiberMinLogLevelGuard::TFiberMinLogLevelGuard(ELogLevel minLogLevel) diff --git a/yt/yt/core/logging/log_manager.h b/yt/yt/core/logging/log_manager.h index 08598d3080..e4309e3264 100644 --- a/yt/yt/core/logging/log_manager.h +++ b/yt/yt/core/logging/log_manager.h @@ -70,8 +70,6 @@ private: DECLARE_LEAKY_SINGLETON_FRIEND() - void Initialize(); - class TImpl; const TIntrusivePtr<TImpl> Impl_; }; diff --git a/yt/yt/core/misc/async_expiring_cache-inl.h b/yt/yt/core/misc/async_expiring_cache-inl.h index bad6d0a399..eb520f4dff 100644 --- a/yt/yt/core/misc/async_expiring_cache-inl.h +++ b/yt/yt/core/misc/async_expiring_cache-inl.h @@ -343,6 +343,27 @@ void TAsyncExpiringCache<TKey, TValue>::ForceRefresh(const TKey& key, const T& v } template <class TKey, class TValue> +void TAsyncExpiringCache<TKey, TValue>::PingEntry(const TKey& key) +{ + auto now = NProfiling::GetCpuInstant(); + auto guard = ReaderGuard(SpinLock_); + + if (auto it = Map_.find(key); it != Map_.end() && it->second->Promise.IsSet()) { + const auto& entry = it->second; + if (!entry->Promise.Get().IsOK()) { + return; + } + + entry->AccessDeadline = now + NProfiling::DurationToCpuDuration(Config()->ExpireAfterAccessTime); + entry->UpdateDeadline = now + NProfiling::DurationToCpuDuration(Config()->ExpireAfterSuccessfulUpdateTime); + if (!Config()->BatchUpdate) { + ScheduleEntryRefresh(entry, key, Config_->RefreshTime); + } + + } +} + +template <class TKey, class TValue> void TAsyncExpiringCache<TKey, TValue>::Set(const TKey& key, TErrorOr<TValue> valueOrError) { auto isValueOK = valueOrError.IsOK(); diff --git a/yt/yt/core/misc/async_expiring_cache.h b/yt/yt/core/misc/async_expiring_cache.h index 137df01f32..913849ecd1 100644 --- a/yt/yt/core/misc/async_expiring_cache.h +++ b/yt/yt/core/misc/async_expiring_cache.h @@ -92,6 +92,9 @@ protected: virtual bool CanCacheError(const TError& error) noexcept; + //! PingEntry resets refresh timer period and behaves like successful entry update. + void PingEntry(const TKey& key); + private: const NLogging::TLogger Logger_; diff --git a/yt/yt/core/misc/atomic_object-inl.h b/yt/yt/core/misc/atomic_object-inl.h deleted file mode 100644 index 1e76cbf5ab..0000000000 --- a/yt/yt/core/misc/atomic_object-inl.h +++ /dev/null @@ -1,97 +0,0 @@ -#ifndef ATOMIC_OBJECT_INL_H_ -#error "Direct inclusion of this file is not allowed, include atomic_object.h" -// For the sake of sane code completion. -#include "atomic_object.h" -#endif - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -template <class T> -template <class U> -TAtomicObject<T>::TAtomicObject(U&& u) - : Object_(std::forward<U>(u)) -{ } - -template <class T> -template <class U> -void TAtomicObject<T>::Store(U&& u) -{ - // NB: Using exchange to avoid destructing the old object while holding the lock. - std::ignore = Exchange(std::forward<U>(u)); -} - -template <class T> -template <class U> -T TAtomicObject<T>::Exchange(U&& u) -{ - T tmpObject = std::forward<U>(u); - { - auto guard = WriterGuard(Spinlock_); - std::swap(Object_, tmpObject); - } - return tmpObject; -} - -template <class T> -bool TAtomicObject<T>::CompareExchange(T& expected, const T& desired) -{ - auto guard = WriterGuard(Spinlock_); - if (Object_ == expected) { - auto oldObject = std::move(Object_); - Y_UNUSED(oldObject); - Object_ = desired; - guard.Release(); - return true; - } else { - auto oldExpected = std::move(expected); - Y_UNUSED(oldExpected); - expected = Object_; - guard.Release(); - return false; - } -} - -template <class T> -template <CInvocable<void(T&)> F> -void TAtomicObject<T>::Transform(const F& func) -{ - auto guard = WriterGuard(Spinlock_); - func(Object_); -} - -template <class T> -template <class R, CInvocable<R(const T&)> F> -R TAtomicObject<T>::Read(const F& func) const -{ - auto guard = ReaderGuard(Spinlock_); - return func(Object_); -} - -template <class T> -T TAtomicObject<T>::Load() const -{ - auto guard = ReaderGuard(Spinlock_); - return Object_; -} - -//////////////////////////////////////////////////////////////////////////////// - -template <class TOriginal, class TSerialized> -void ToProto(TSerialized* serialized, const TAtomicObject<TOriginal>& original) -{ - ToProto(serialized, original.Load()); -} - -template <class TOriginal, class TSerialized> -void FromProto(TAtomicObject<TOriginal>* original, const TSerialized& serialized) -{ - TOriginal data; - FromProto(&data, serialized); - original->Store(std::move(data)); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT diff --git a/yt/yt/core/misc/atomic_object.h b/yt/yt/core/misc/atomic_object.h deleted file mode 100644 index 91ef9278b0..0000000000 --- a/yt/yt/core/misc/atomic_object.h +++ /dev/null @@ -1,63 +0,0 @@ -#pragma once - -#include <library/cpp/yt/threading/rw_spin_lock.h> - -#include <library/cpp/yt/misc/concepts.h> - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -//! A synchronization object to load and store nontrivial object. -//! It looks like atomics but for objects. -template <class T> -class TAtomicObject -{ -public: - TAtomicObject() = default; - - template <class U> - TAtomicObject(U&& u); - - template <class U> - void Store(U&& u); - - //! Atomically replaces the old value with the new one and returns the old value. - template <class U> - T Exchange(U&& u); - - //! Atomically checks if then current value equals #expected. - //! If so, replaces it with #desired and returns |true|. - //! Otherwise, copies it into #expected and returns |false|. - bool CompareExchange(T& expected, const T& desired); - - //! Atomically transforms the value with function #func. - template <CInvocable<void(T&)> F> - void Transform(const F& func); - - //! Atomicaly reads the value with function #func. - template <class R = void, CInvocable<R(const T&)> F> - R Read(const F& func) const; - - T Load() const; - -private: - T Object_; - YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, Spinlock_); -}; - -//////////////////////////////////////////////////////////////////////////////// - -template <class TOriginal, class TSerialized> -void ToProto(TSerialized* serialized, const TAtomicObject<TOriginal>& original); - -template <class TOriginal, class TSerialized> -void FromProto(TAtomicObject<TOriginal>* original, const TSerialized& serialized); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT - -#define ATOMIC_OBJECT_INL_H_ -#include "atomic_object-inl.h" -#undef ATOMIC_OBJECT_INL_H_ diff --git a/yt/yt/core/misc/hazard_ptr.cpp b/yt/yt/core/misc/hazard_ptr.cpp index ce78cbccf2..ec288eddd9 100644 --- a/yt/yt/core/misc/hazard_ptr.cpp +++ b/yt/yt/core/misc/hazard_ptr.cpp @@ -1,7 +1,5 @@ #include "hazard_ptr.h" -#include "private.h" - #include <yt/yt/core/misc/singleton.h> #include <yt/yt/core/misc/proc.h> #include <yt/yt/core/misc/ring_queue.h> @@ -25,10 +23,6 @@ using namespace NConcurrency; //////////////////////////////////////////////////////////////////////////////// -static constexpr auto& Logger = LockFreeLogger; - -//////////////////////////////////////////////////////////////////////////////// - namespace NDetail { //////////////////////////////////////////////////////////////////////////////// @@ -325,16 +319,6 @@ bool THazardPointerManager::DoReclaimHazardPointers(THazardThreadState* threadSt retireList.push(item); }); - YT_LOG_TRACE_IF( - !protectedPointers.empty(), - "Scanning hazard pointers (Candidates: %v, Protected: %v)", - MakeFormattableView(TRingQueueIterableWrapper(retireList), [&] (auto* builder, auto item) { - builder->AppendFormat("%v", TTaggedPtr<void>::Unpack(item.PackedPtr).Ptr); - }), - MakeFormattableView(protectedPointers, [&] (auto* builder, auto ptr) { - builder->AppendFormat("%v", ptr); - })); - size_t pushedCount = 0; auto popCount = retireList.size(); while (popCount-- > 0) { diff --git a/yt/yt/core/misc/hazard_ptr.h b/yt/yt/core/misc/hazard_ptr.h index f6a3bf58ca..40202bbf44 100644 --- a/yt/yt/core/misc/hazard_ptr.h +++ b/yt/yt/core/misc/hazard_ptr.h @@ -4,8 +4,6 @@ #include <yt/yt/core/concurrency/scheduler_api.h> -#include <library/cpp/yt/logging/logger.h> - #include <atomic> namespace NYT { diff --git a/yt/yt/core/misc/ring_queue.h b/yt/yt/core/misc/ring_queue.h index 471871603e..5cabcc6802 100644 --- a/yt/yt/core/misc/ring_queue.h +++ b/yt/yt/core/misc/ring_queue.h @@ -3,6 +3,7 @@ #include "common.h" #include <library/cpp/yt/assert/assert.h> +#include <library/cpp/yt/string/format.h> namespace NYT { @@ -365,10 +366,19 @@ public: : Container_(container) { } + using value_type = TContainer::value_type; + private: TContainer& Container_; }; +namespace NDetail { + +template <class T, class Allocator> +constexpr bool CKnownRange<TRingQueueIterableWrapper<T, Allocator>> = true; + +} // namespace NDetail + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/yt/core/misc/stripped_error.cpp b/yt/yt/core/misc/stripped_error.cpp index 3406f61dac..fbcfb90ff0 100644 --- a/yt/yt/core/misc/stripped_error.cpp +++ b/yt/yt/core/misc/stripped_error.cpp @@ -801,7 +801,7 @@ void AppendError(TStringBuilderBase* builder, const TError& error, int indent) [[unlikely]] if (!tokenizer.ParseNext()) { Cerr << NYT::Format( - "%v *** Empty toke encountered while formatting TError attribute (Key: %v, Value: %v" + "%v *** Empty token encountered while formatting TError attribute (Key: %v, Value: %v)" "(BuilderAccumulatedData: %v)", TInstant::Now(), key, diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index af4e5f0a7e..ea52305bda 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -645,6 +645,11 @@ public: !PeerDisconnectedList_.IsFired(); } + bool IsReusable() + { + return IsIdle(); + } + TFuture<void> Abort(const TError& error) { YT_LOG_DEBUG(error, "Aborting connection"); @@ -1213,6 +1218,11 @@ public: return Impl_->IsIdle(); } + bool IsReusable() const override + { + return Impl_->IsReusable(); + } + TFuture<void> Abort() override { return Impl_->Abort(TError(EErrorCode::Aborted, "Connection aborted")); diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h index 4078e11460..3c9998e1d0 100644 --- a/yt/yt/core/net/connection.h +++ b/yt/yt/core/net/connection.h @@ -73,16 +73,19 @@ struct IConnection virtual const TNetworkAddress& GetLocalAddress() const = 0; virtual const TNetworkAddress& GetRemoteAddress() const = 0; - // Returns true if connection is not is failed state and has no - // active IO operations. + //! Returns true if connection is not is failed state and has no + //! active IO operations. virtual bool IsIdle() const = 0; + //! Returns true if connection can be reused by a pool. + virtual bool IsReusable() const = 0; + virtual bool SetNoDelay() = 0; virtual bool SetKeepAlive() = 0; TFuture<void> Abort() override = 0; - // SubscribePeerDisconnect is best effort and is not guaranteed to fire. + //! This callback is best effort and is not guaranteed to fire. virtual void SubscribePeerDisconnect(TCallback<void()> callback) = 0; }; diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp index f3959aed7b..294d654528 100644 --- a/yt/yt/core/rpc/bus/channel.cpp +++ b/yt/yt/core/rpc/bus/channel.cpp @@ -19,7 +19,7 @@ #include <yt/yt/core/concurrency/thread_affinity.h> #include <yt/yt/core/misc/finally.h> -#include <yt/yt/core/misc/atomic_object.h> +#include <library/cpp/yt/threading/atomic_object.h> #include <yt/yt/core/tracing/public.h> diff --git a/yt/yt/core/rpc/per_user_request_queue_provider.h b/yt/yt/core/rpc/per_user_request_queue_provider.h index 84a3b0389c..7a68315a01 100644 --- a/yt/yt/core/rpc/per_user_request_queue_provider.h +++ b/yt/yt/core/rpc/per_user_request_queue_provider.h @@ -7,7 +7,7 @@ #include <yt/yt/library/profiling/sensor.h> #include <yt/yt/library/syncmap/map.h> -#include <yt/yt/core/misc/atomic_object.h> +#include <library/cpp/yt/threading/atomic_object.h> namespace NYT::NRpc { diff --git a/yt/yt/core/rpc/stream-inl.h b/yt/yt/core/rpc/stream-inl.h index 4474158d4e..8f7ddd274b 100644 --- a/yt/yt/core/rpc/stream-inl.h +++ b/yt/yt/core/rpc/stream-inl.h @@ -43,11 +43,28 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream { auto invokeResult = request->Invoke().template As<void>(); auto metaHandlerResult = request->GetResponseAttachmentsStream()->Read() - .Apply(metaHandler); - return metaHandlerResult.Apply(BIND ([=] () { + .Apply(std::move(metaHandler)); + return metaHandlerResult.Apply(BIND ([req = std::move(request), res = std::move(invokeResult)] () mutable { return NDetail::CreateRpcClientOutputStreamFromInvokedRequest( - std::move(request), - std::move(invokeResult)); + std::move(req), + std::move(res)); + })); +} + +template <class TRequestMessage, class TResponse> +TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream( + TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request, + TCallback<void(TSharedRef)> metaHandler, + TCallback<void(TIntrusivePtr<TResponse>&&)> rspHandler) +{ + auto invokeResult = request->Invoke() + .ApplyUnique(std::move(rspHandler)); + auto metaHandlerResult = request->GetResponseAttachmentsStream()->Read() + .Apply(std::move(metaHandler)); + return metaHandlerResult.Apply(BIND ([req = std::move(request), res = std::move(invokeResult)] () mutable { + return NDetail::CreateRpcClientOutputStreamFromInvokedRequest( + std::move(req), + std::move(res)); })); } diff --git a/yt/yt/core/rpc/stream.h b/yt/yt/core/rpc/stream.h index 93e54155a8..f2220e7425 100644 --- a/yt/yt/core/rpc/stream.h +++ b/yt/yt/core/rpc/stream.h @@ -265,6 +265,13 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request, TCallback<void(TSharedRef)> metaHandler); +//! This variant additionally allows non-trivial response of streaming request to be handled. +template <class TRequestMessage, class TResponse> +TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream( + TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request, + TCallback<void(TSharedRef)> metaHandler, + TCallback<void(TIntrusivePtr<TResponse>&&)> rspHandler); + //////////////////////////////////////////////////////////////////////////////// //! Handles an incoming streaming request that uses the #CreateRpcClientInputStream diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp index afd60131da..8ac8d4720d 100644 --- a/yt/yt/core/rpc/unittests/lib/test_service.cpp +++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp @@ -21,6 +21,7 @@ using namespace NConcurrency; //////////////////////////////////////////////////////////////////////////////// YT_DEFINE_GLOBAL(std::unique_ptr<NThreading::TEvent>, Latch_); +YT_DEFINE_GLOBAL(std::atomic<int>, ConcurrentCalls_, 0); //////////////////////////////////////////////////////////////////////////////// @@ -187,7 +188,7 @@ public: { try { context->SetRequestInfo(); - TDelayedExecutor::WaitForDuration(TDuration::Seconds(2)); + TDelayedExecutor::WaitForDuration(TDuration::Max()); context->Reply(); } catch (const TFiberCanceledException&) { SlowCallCanceled_.Set(); @@ -202,6 +203,10 @@ public: DECLARE_RPC_SERVICE_METHOD(NTestRpc, RequestBytesThrottledCall) { + THROW_ERROR_EXCEPTION_UNLESS(ConcurrentCalls_().fetch_add(1) == 0, "Too many concurrent calls on entry!"); + Sleep(TDuration::MilliSeconds(100)); + THROW_ERROR_EXCEPTION_UNLESS(ConcurrentCalls_().fetch_sub(1) == 1, "Too many concurrent calls on exit!"); + context->Reply(); } diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp index f367c57db4..157ee8faaa 100644 --- a/yt/yt/core/rpc/unittests/rpc_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp @@ -622,6 +622,8 @@ TYPED_TEST(TNotGrpcTest, Compression) TYPED_TEST(TRpcTest, ResponseMemoryTag) { + // FIXME: YT-23048 + return; static TMemoryTag testMemoryTag = 12345; testMemoryTag++; auto initialMemoryUsage = GetMemoryUsageForTag(testMemoryTag); @@ -659,7 +661,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling) methods = { RequestBytesThrottledCall = { request_bytes_throttler = { - limit = 1000000; + limit = 100000; } } } @@ -673,18 +675,16 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling) auto makeCall = [&] { auto req = proxy.RequestBytesThrottledCall(); - req->Attachments().push_back(TSharedMutableRef::Allocate(100'000)); + req->Attachments().push_back(TSharedMutableRef::Allocate(60'000)); return req->Invoke().AsVoid(); }; std::vector<TFuture<void>> futures; - for (int i = 0; i < 30; ++i) { + for (int i = 0; i < 5; ++i) { futures.push_back(makeCall()); } - NProfiling::TWallTimer timer; EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK()); - EXPECT_LE(std::abs(static_cast<i64>(timer.GetElapsedTime().MilliSeconds()) - 3000), 200); } // Now test different types of errors. diff --git a/yt/yt/core/test_framework/test_proxy_service.h b/yt/yt/core/test_framework/test_proxy_service.h index 3f2c99415e..26c0bde173 100644 --- a/yt/yt/core/test_framework/test_proxy_service.h +++ b/yt/yt/core/test_framework/test_proxy_service.h @@ -10,7 +10,7 @@ #include <yt/yt/core/logging/log.h> -#include <yt/yt/core/misc/atomic_object.h> +#include <library/cpp/yt/threading/atomic_object.h> #include <yt/yt/core/ytree/attributes.h> diff --git a/yt/yt/core/yson/pull_parser_deserialize-inl.h b/yt/yt/core/yson/pull_parser_deserialize-inl.h index b0f6b9d8ca..78e343d917 100644 --- a/yt/yt/core/yson/pull_parser_deserialize-inl.h +++ b/yt/yt/core/yson/pull_parser_deserialize-inl.h @@ -8,6 +8,8 @@ #include <yt/yt/core/yson/token_writer.h> +#include <library/cpp/yt/misc/cast.h> + #include <vector> namespace NYT::NYson { @@ -239,6 +241,18 @@ void Deserialize(T& value, TYsonPullParserCursor* cursor) } } +template <class T> +requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void Deserialize(T& value, TYsonPullParserCursor* cursor) +{ + static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>()); + + MaybeSkipAttributes(cursor); + EnsureYsonToken("enum", *cursor, EYsonItemType::Int64Value); + value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>((*cursor)->UncheckedAsInt64())); + cursor->Next(); +} + // TCompactVector template <class T, size_t N> void Deserialize(TCompactVector<T, N>& value, TYsonPullParserCursor* cursor, std::enable_if_t<ArePullParserDeserializable<T>(), void*>) diff --git a/yt/yt/core/yson/pull_parser_deserialize.h b/yt/yt/core/yson/pull_parser_deserialize.h index 5f6215180b..669b976394 100644 --- a/yt/yt/core/yson/pull_parser_deserialize.h +++ b/yt/yt/core/yson/pull_parser_deserialize.h @@ -113,6 +113,9 @@ void Deserialize( template <class T> requires TEnumTraits<T>::IsEnum void Deserialize(T& value, TYsonPullParserCursor* cursor); +template <class T> + requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void Deserialize(T& value, TYsonPullParserCursor* cursor); // TCompactVector. template <class T, size_t N> diff --git a/yt/yt/core/ytree/fluent.h b/yt/yt/core/ytree/fluent.h index ec0c51fd7e..366ca9794d 100644 --- a/yt/yt/core/ytree/fluent.h +++ b/yt/yt/core/ytree/fluent.h @@ -88,7 +88,8 @@ namespace NYT::NYTree { // and close map; // * DoList(TFuncList func) -> TAny, same as DoMap(); // * DoListFor(TCollection collection, TFuncList func) -> TAny; same as DoMapFor(). -// +// * DoAttributes(TFuncAttributes func) -> TAny, open attributes, delegate invocation +// to a separate procedure and close attributes; // // TFluentMap: // * Item(TStringBuf key) -> TAny, open an element keyed with `key`; @@ -407,6 +408,14 @@ public: this->Consumer, TAnyWithoutAttributes<TParent>(this->Consumer, std::move(this->Parent))); } + + TAnyWithoutAttributes<TParent> DoAttributes(auto funcMap) + { + this->Consumer->OnBeginAttributes(); + InvokeFluentFunc<TFluentAttributes<TFluentYsonVoid>>(funcMap, this->Consumer); + this->Consumer->OnEndAttributes(); + return TAnyWithoutAttributes<TParent>(this->Consumer, std::move(this->Parent)); + } }; template <class TParent = TFluentYsonVoid> diff --git a/yt/yt/core/ytree/serialize-inl.h b/yt/yt/core/ytree/serialize-inl.h index fe1ce789e6..01fc90c73a 100644 --- a/yt/yt/core/ytree/serialize-inl.h +++ b/yt/yt/core/ytree/serialize-inl.h @@ -344,6 +344,14 @@ void Serialize(T value, NYson::IYsonConsumer* consumer) } } +template <class T> + requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void Serialize(T value, NYson::IYsonConsumer* consumer) +{ + static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>()); + consumer->OnInt64Scalar(static_cast<i64>(value)); +} + // std::optional template <class T> void Serialize(const std::optional<T>& value, NYson::IYsonConsumer* consumer) @@ -528,6 +536,23 @@ void Deserialize(T& value, INodePtr node) } } +template <class T> + requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void Deserialize(T& value, INodePtr node) +{ + switch (node->GetType()) { + case ENodeType::Int64: { + // TODO: CheckedEnumCast via __PRETTY_FUNCTION__? + i64 serialized = node->AsInt64()->GetValue(); + value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>(serialized)); + break; + } + default: + THROW_ERROR_EXCEPTION("Cannot deserialize enum from %Qlv node", + node->GetType()); + } +} + // std::optional template <class T> void Deserialize(std::optional<T>& value, INodePtr node) diff --git a/yt/yt/core/ytree/serialize.h b/yt/yt/core/ytree/serialize.h index 3aa784ac8d..2d5185f18d 100644 --- a/yt/yt/core/ytree/serialize.h +++ b/yt/yt/core/ytree/serialize.h @@ -109,6 +109,9 @@ void Serialize(IInputStream& input, NYson::IYsonConsumer* consumer); template <class T> requires TEnumTraits<T>::IsEnum void Serialize(T value, NYson::IYsonConsumer* consumer); +template <class T> + requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void Serialize(T value, NYson::IYsonConsumer* consumer); // std::optional template <class T> @@ -218,6 +221,9 @@ void Deserialize(TGuid& value, INodePtr node); template <class T> requires TEnumTraits<T>::IsEnum void Deserialize(T& value, INodePtr node); +template <class T> + requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T> +void Deserialize(T& value, INodePtr node); // std::optional template <class T> diff --git a/yt/yt/core/ytree/tree_builder.h b/yt/yt/core/ytree/tree_builder.h index 2af4d56a8d..fd959280d7 100644 --- a/yt/yt/core/ytree/tree_builder.h +++ b/yt/yt/core/ytree/tree_builder.h @@ -52,4 +52,3 @@ std::unique_ptr<ITreeBuilder> CreateBuilderFromFactory(INodeFactory* factory); //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NYTree - diff --git a/yt/yt/core/ytree/unittests/serialize_ut.cpp b/yt/yt/core/ytree/unittests/serialize_ut.cpp index ee36997f70..63bf71b0d2 100644 --- a/yt/yt/core/ytree/unittests/serialize_ut.cpp +++ b/yt/yt/core/ytree/unittests/serialize_ut.cpp @@ -51,6 +51,12 @@ DEFINE_BIT_ENUM(ETestBitEnum, ((Green) (0x0004)) ); +enum class EPlainTestEnum +{ + First, + Second, +}; + template <typename T> T PullParserConvert(TYsonStringBuf s) { @@ -427,6 +433,13 @@ TEST(TSerializationTest, SerializableArcadiaEnum) } } +TEST(TSerializationTest, PlainEnum) +{ + TestSerializationDeserialization(EPlainTestEnum::First); + + TestSerializationDeserialization(static_cast<EPlainTestEnum>(42)); +} + TEST(TYTreeSerializationTest, Protobuf) { NProto::TTestMessage message; diff --git a/yt/yt/core/ytree/unittests/ytree_fluent_ut.cpp b/yt/yt/core/ytree/unittests/ytree_fluent_ut.cpp index 51f43f0e07..8b2e9b8970 100644 --- a/yt/yt/core/ytree/unittests/ytree_fluent_ut.cpp +++ b/yt/yt/core/ytree/unittests/ytree_fluent_ut.cpp @@ -15,8 +15,6 @@ using ::testing::StrictMock; //////////////////////////////////////////////////////////////////////////////// -// TODO(sandello): Fix this test under clang. -#ifndef __clang__ // String-like Scalars {{{ //////////////////////////////////////////////////////////////////////////////// @@ -187,7 +185,7 @@ TEST(TYTreeFluentMapTest, Items) StrictMock<TMockYsonConsumer> mock; InSequence dummy; - auto node = ConvertToNode(TYsonString("{bar = 10}")); + auto node = ConvertToNode(TYsonString(TString("{bar = 10}"))); EXPECT_CALL(mock, OnBeginMap()); EXPECT_CALL(mock, OnKeyedItem("bar")); @@ -275,7 +273,7 @@ TEST(TYTreeFluentListTest, Items) StrictMock<TMockYsonConsumer> mock; InSequence dummy; - auto node = ConvertToNode(TYsonString("[10; 20; 30]")); + auto node = ConvertToNode(TYsonString(TString("[10; 20; 30]"))); EXPECT_CALL(mock, OnBeginList()); EXPECT_CALL(mock, OnListItem()); @@ -420,8 +418,37 @@ TEST(TYTreeFluentTest, Complex) .EndList(); } -//////////////////////////////////////////////////////////////////////////////// -#endif +TEST(TYTreeFluentTest, DoMap) +{ + StrictMock<TMockYsonConsumer> mock; + + EXPECT_CALL(mock, OnBeginMap()); + EXPECT_CALL(mock, OnKeyedItem("key")); + EXPECT_CALL(mock, OnEntity); + EXPECT_CALL(mock, OnEndMap()); + + BuildYsonFluently(&mock) + .DoMap([] (TFluentMap map) { + map.Item("key").Entity(); + }); +} + +TEST(TYTreeFluentTest, DoAttributes) +{ + StrictMock<TMockYsonConsumer> mock; + + EXPECT_CALL(mock, OnBeginAttributes()); + EXPECT_CALL(mock, OnKeyedItem("key")); + EXPECT_CALL(mock, OnStringScalar("value")); + EXPECT_CALL(mock, OnEndAttributes()); + EXPECT_CALL(mock, OnEntity()); + + BuildYsonFluently(&mock) + .DoAttributes([] (TFluentAttributes attributes) { + attributes.Item("key").Value("value"); + }) + .Entity(); +} //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/ytree/ypath_service.cpp b/yt/yt/core/ytree/ypath_service.cpp index b078379c81..212fb9f76f 100644 --- a/yt/yt/core/ytree/ypath_service.cpp +++ b/yt/yt/core/ytree/ypath_service.cpp @@ -22,7 +22,7 @@ #include <yt/yt/core/concurrency/periodic_executor.h> #include <yt/yt/core/misc/checksum.h> -#include <yt/yt/core/misc/atomic_object.h> +#include <library/cpp/yt/threading/atomic_object.h> #include <library/cpp/yt/memory/atomic_intrusive_ptr.h> diff --git a/yt/yt/library/decimal/decimal.cpp b/yt/yt/library/decimal/decimal.cpp index e1674f84e2..3df4b44028 100644 --- a/yt/yt/library/decimal/decimal.cpp +++ b/yt/yt/library/decimal/decimal.cpp @@ -12,24 +12,263 @@ namespace NYT::NDecimal { //////////////////////////////////////////////////////////////////////////////// +// We use the same type used for binary representation of 256 bit decimals for +// implementing the necessary arithmetic operations. +using i256 = TDecimal::TValue256; + +// We chose to only implement a small subset of operations and functions necessary +// for converting 256 bit decimals to and from text. +// Rationale: there do not seem to be any good implementations of 256-bit arithmetic +// that we are comfortable depending on (for now). There are some big number +// implementations around openssl, but these types are intended for arbitrarily +// large integers and use dynamic allocations. It also does not make sense to commit +// to implementing full-fledged 256-bit arithmetics when we actually use a minority +// of operations. +// When we have a good enough int256 either in library/util or in contrib, we can +// switch to it easily. + +//////////////////////////////////////////////////////////////////////////////// + +constexpr i256 operator-(i256 value) noexcept +{ + // Invert. + for (int partIndex = 0; partIndex < std::ssize(value.Parts); ++partIndex) { + value.Parts[partIndex] = ~value.Parts[partIndex]; + } + + // Add 1. + for (int partIndex = 0; partIndex < std::ssize(value.Parts) && ++value.Parts[partIndex] == 0; ++partIndex) { } + + return value; +} + +constexpr std::strong_ordering operator<=>(const i256& lhs, const i256& rhs) +{ + bool lhsIsNegative = lhs.Parts.back() & (1u << 31); + bool rhsIsNegative = rhs.Parts.back() & (1u << 31); + + if (lhsIsNegative && !rhsIsNegative) { + return std::strong_ordering::less; + } + + if (!lhsIsNegative && rhsIsNegative) { + return std::strong_ordering::greater; + } + + for (int partIndex = std::ssize(lhs.Parts) - 1; partIndex >= 0; --partIndex) { + if (lhs.Parts[partIndex] != rhs.Parts[partIndex]) { + return lhs.Parts[partIndex] <=> rhs.Parts[partIndex]; + } + } + + return std::strong_ordering::equal; +} + +// Not synthesized by default :( +constexpr bool operator==(const i256& lhs, const i256& rhs) +{ + return lhs.Parts == rhs.Parts; +} + +//////////////////////////////////////////////////////////////////////////////// + +// Some operations require working with an explicitly unsigned type, since they +// might cause integer overflow, which is not very acceptable for signed types. +// It is also easier to implement some arithmetic operations (like *= 10) by +// shifting bits. +struct TUnsignedValue256 +{ + std::array<ui32, 8> Parts; +}; + +using ui256 = TUnsignedValue256; + +static_assert(sizeof(ui256) == sizeof(i256)); + +//////////////////////////////////////////////////////////////////////////////// + +constexpr bool operator==(const ui256& lhs, const ui256& rhs) +{ + return lhs.Parts == rhs.Parts; +} + +constexpr ui256 operator+(ui256 lhs, const ui256& rhs) +{ + ui64 carry = 0; + for (int partIndex = 0; partIndex < std::ssize(lhs.Parts); ++partIndex) { + carry += lhs.Parts[partIndex]; + carry += rhs.Parts[partIndex]; + lhs.Parts[partIndex] = carry; + carry >>= 32; + } + + return lhs; +} + +template <int Shift> +Y_FORCE_INLINE constexpr ui256 ShiftUp(ui256 value) +{ + static_assert(Shift >= 0 && Shift <= 32); + + value.Parts.back() <<= Shift; + for (int partIndex = std::ssize(value.Parts) - 2; partIndex >= 0; --partIndex) { + value.Parts[partIndex + 1] |= value.Parts[partIndex] >> (32 - Shift); + value.Parts[partIndex] <<= Shift; + } + + return value; +} + +//////////////////////////////////////////////////////////////////////////////// + template <typename T> constexpr bool ValidDecimalUnderlyingInteger = std::is_same_v<T, i32> || std::is_same_v<T, i64> || - std::is_same_v<T, i128>; + std::is_same_v<T, i128> || + std::is_same_v<T, i256>; + +template <typename T> +constexpr bool ValidDecimalUnderlyingUnsignedInteger = + std::is_same_v<T, ui32> || + std::is_same_v<T, ui64> || + std::is_same_v<T, ui128> || + std::is_same_v<T, ui256>; + +template <typename T> +Y_FORCE_INLINE constexpr T GetNan() +{ + if constexpr (std::is_same_v<T, i256>) { + constexpr i32 i32Max = std::numeric_limits<i32>::max(); + constexpr ui32 ui32Max = std::numeric_limits<ui32>::max(); + + return {ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, i32Max}; + } else { + return std::numeric_limits<T>::max(); + } +} + +template <typename T> +Y_FORCE_INLINE constexpr T GetPlusInf() +{ + if constexpr (std::is_same_v<T, i256>) { + constexpr i32 i32Max = std::numeric_limits<i32>::max(); + constexpr ui32 ui32Max = std::numeric_limits<ui32>::max(); + + return {ui32Max - 1, ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, i32Max}; + } else { + return std::numeric_limits<T>::max() - 1; + } +} template <typename T> struct TDecimalTraits { static_assert(ValidDecimalUnderlyingInteger<T>); - static constexpr T Nan = std::numeric_limits<T>::max(); - static constexpr T PlusInf = std::numeric_limits<T>::max() - 1; + static constexpr T Nan = GetNan<T>(); + static constexpr T PlusInf = GetPlusInf<T>(); static constexpr T MinusInf = -PlusInf; - - static constexpr T MinSpecialValue = PlusInf; }; +template <typename T> +Y_FORCE_INLINE constexpr bool IsNegativeInteger(T value) +{ + static_assert(ValidDecimalUnderlyingInteger<T>); + + if constexpr (std::is_same_v<T, i256>) { + return value.Parts.back() & (1u << 31); + } else { + return value < 0; + } +} + +template <typename T> +Y_FORCE_INLINE constexpr auto DecimalIntegerToUnsigned(T value) +{ + static_assert(ValidDecimalUnderlyingInteger<T>); + + if constexpr (std::is_same_v<T, i256>) { + return ui256{value.Parts}; + } else if constexpr (std::is_same_v<T, i128>) { + return ui128(value); + } else { + using TU = std::make_unsigned_t<T>; + return static_cast<TU>(value); + } +} + +template <typename T> +Y_FORCE_INLINE constexpr auto DecimalIntegerToSigned(T value) +{ + static_assert(ValidDecimalUnderlyingUnsignedInteger<T>); + + if constexpr (std::is_same_v<T, ui256>) { + return i256{value.Parts}; + } else if constexpr (std::is_same_v<T, ui128>) { + return i128(value); + } else { + using TS = std::make_signed_t<T>; + return static_cast<TS>(value); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +//! The functions below are used for implementing conversion to/from text to binary decimal. +//! We actually do not need any arithmetic operations beside v *= 10, v /= 10, addition and negation. +//! so they are the only ones actually implemented for our custom i256/ui256. + +template <typename T> +Y_FORCE_INLINE constexpr auto FlipMSB(T value) +{ + static_assert(ValidDecimalUnderlyingInteger<T>); + + if constexpr (std::is_same_v<T, i256>) { + value.Parts.back() ^= (1u << 31); + return value; + } else { + constexpr auto One = DecimalIntegerToUnsigned(T{1}); + // Bit operations are only valid with unsigned types. + return T(DecimalIntegerToUnsigned(value) ^ (One << (sizeof(T) * 8 - 1))); + } +} + +template <typename T> +Y_FORCE_INLINE constexpr ui32 GetNextDigit(T value, T* nextValue) +{ + static_assert(ValidDecimalUnderlyingUnsignedInteger<T>); + + if constexpr (std::is_same_v<T, ui256>) { + ui64 remainder = 0; + for (int partIndex = std::ssize(value.Parts) - 1; partIndex >= 0; --partIndex) { + // Everything should fit into long long since we are dividing by 10. + auto step = std::lldiv(value.Parts[partIndex] + (remainder << 32), 10); + value.Parts[partIndex] = step.quot; + remainder = step.rem; + } + *nextValue = value; + return remainder; + } else { + constexpr auto Ten = T{10}; + *nextValue = value / Ten; + return static_cast<ui32>(value % Ten); + } +} + +template <typename T> +Y_FORCE_INLINE constexpr T MultiplyByTen(T value) +{ + static_assert(ValidDecimalUnderlyingUnsignedInteger<T>); + + if constexpr (std::is_same_v<T, ui256>) { + // 2 * (4 * v + v) = 10v. + return ShiftUp<1>(ShiftUp<2>(value) + value); + } else { + return value * DecimalIntegerToUnsigned(10); + } +} + //////////////////////////////////////////////////////////////////////////////// constexpr int GetDecimalBinaryValueSize(int precision) @@ -39,34 +278,41 @@ constexpr int GetDecimalBinaryValueSize(int precision) return 4; } else if (precision <= 18) { return 8; - } else if (precision <= 35) { + } else if (precision <= 38) { return 16; + } else if (precision <= 76) { + return 32; } } return 0; } -static constexpr i128 DecimalIntegerMaxValueTable[] = { - i128{0}, // 0 - i128{9}, // 1 - i128{99}, // 2 - i128{999}, // 3 - i128{9999}, // 4 - i128{99999}, // 5 - i128{999999}, // 6 - i128{9999999}, // 7 - i128{99999999}, // 8 - i128{999999999}, // 9 - i128{9999999999ul}, // 10 - i128{99999999999ul}, // 11 - i128{999999999999ul}, // 12 - i128{9999999999999ul}, // 13 - i128{99999999999999ul}, // 14 - i128{999999999999999ul}, // 15 - i128{9999999999999999ul}, // 16 - i128{99999999999999999ul}, // 17 - i128{999999999999999999ul}, // 18 +static constexpr i32 Decimal32IntegerMaxValueTable[] = { + 0, // 0 + 9, // 1 + 99, // 2 + 999, // 3 + 9999, // 4 + 99999, // 5 + 999999, // 6 + 9999999, // 7 + 99999999, // 8 + 999999999, // 9 +}; +static constexpr i64 Decimal64IntegerMaxValueTable[] = { + 9999999999ul, // 10 + 99999999999ul, // 11 + 999999999999ul, // 12 + 9999999999999ul, // 13 + 99999999999999ul, // 14 + 999999999999999ul, // 15 + 9999999999999999ul, // 16 + 99999999999999999ul, // 17 + 999999999999999999ul, // 18 +}; + +static constexpr i128 Decimal128IntegerMaxValueTable[] = { // 128 bits // // Generated by fair Python script: @@ -79,7 +325,7 @@ static constexpr i128 DecimalIntegerMaxValueTable[] = { // hex_value[-16:], // hex_value[:-16] or "0", // precision)) - // for i in range(19, 36): + // for i in range(19, 39): // print_max_decimal(i) // i128{static_cast<ui64>(0x8ac7230489e7fffful)} | (i128{static_cast<ui64>(0x0ul)} << 64), // 19 @@ -99,37 +345,106 @@ static constexpr i128 DecimalIntegerMaxValueTable[] = { i128{static_cast<ui64>(0x38c15b09fffffffful)} | (i128{static_cast<ui64>(0x314dc6448d93ul)} << 64), // 33 i128{static_cast<ui64>(0x378d8e63fffffffful)} | (i128{static_cast<ui64>(0x1ed09bead87c0ul)} << 64), // 34 i128{static_cast<ui64>(0x2b878fe7fffffffful)} | (i128{static_cast<ui64>(0x13426172c74d82ul)} << 64), // 35 + i128{static_cast<ui64>(0xb34b9f0ffffffffful)} | (i128{static_cast<ui64>(0xc097ce7bc90715ul)} << 64), // 36 + i128{static_cast<ui64>(0x00f4369ffffffffful)} | (i128{static_cast<ui64>(0x785ee10d5da46d9ul)} << 64), // 37 + i128{static_cast<ui64>(0x098a223ffffffffful)} | (i128{static_cast<ui64>(0x4b3b4ca85a86c47aul)} << 64), // 38 +}; + +static constexpr i256 Decimal256IntegerMaxValueTable[] = { + // 256 bits + // + // Generated by fair Python script: + // + // def print_max_decimal(precision): + // max_value = int("9" * precision) + // hex_value = hex(max_value)[2:] # strip 0x + // hex_value = hex_value.strip("L") + // parts = [hex_value[-8 * i:-8 * (i - 1) if i > 1 else len(hex_value)] or "0" for i in range(1, 9)] + // assert sum(int(v, 16) * 2 ** (32 * i) for i, v in enumerate(parts)) == max_value + // assert int(parts[-1], 16) < 2 ** 31 - 1 + // joined_parts = ", ".join(f"static_cast<ui32>(0x{part}u)" for part in parts) + // print(f"{{{joined_parts}}}, // {precision}") + // + // for i in range(39, 77): + // print_max_decimal(i) + // + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x5f65567fu), static_cast<ui32>(0x8943acc4u), static_cast<ui32>(0xf050fe93u), static_cast<ui32>(0x2u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 39 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xb9f560ffu), static_cast<ui32>(0x5ca4bfabu), static_cast<ui32>(0x6329f1c3u), static_cast<ui32>(0x1du), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 40 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x4395c9ffu), static_cast<ui32>(0x9e6f7cb5u), static_cast<ui32>(0xdfa371a1u), static_cast<ui32>(0x125u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 41 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xa3d9e3ffu), static_cast<ui32>(0x305adf14u), static_cast<ui32>(0xbc627050u), static_cast<ui32>(0xb7au), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 42 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x6682e7ffu), static_cast<ui32>(0xe38cb6ceu), static_cast<ui32>(0x5bd86321u), static_cast<ui32>(0x72cbu), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 43 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x011d0fffu), static_cast<ui32>(0xe37f2410u), static_cast<ui32>(0x9673df52u), static_cast<ui32>(0x47bf1u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 44 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x0b229fffu), static_cast<ui32>(0xe2f768a0u), static_cast<ui32>(0xe086b93cu), static_cast<ui32>(0x2cd76fu), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 45 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x6f5a3fffu), static_cast<ui32>(0xddaa1640u), static_cast<ui32>(0xc5433c60u), static_cast<ui32>(0x1c06a5eu), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 46 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x59867fffu), static_cast<ui32>(0xa8a4de84u), static_cast<ui32>(0xb4a05bc8u), static_cast<ui32>(0x118427b3u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 47 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x7f40ffffu), static_cast<ui32>(0x9670b12bu), static_cast<ui32>(0x0e4395d6u), static_cast<ui32>(0xaf298d05u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 48 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xf889ffffu), static_cast<ui32>(0xe066ebb2u), static_cast<ui32>(0x8ea3da61u), static_cast<ui32>(0xd79f8232u), static_cast<ui32>(0x6u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 49 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xb563ffffu), static_cast<ui32>(0xc40534fdu), static_cast<ui32>(0x926687d2u), static_cast<ui32>(0x6c3b15f9u), static_cast<ui32>(0x44u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 50 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x15e7ffffu), static_cast<ui32>(0xa83411e9u), static_cast<ui32>(0xb8014e3bu), static_cast<ui32>(0x3a4edbbfu), static_cast<ui32>(0x2acu), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 51 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xdb0fffffu), static_cast<ui32>(0x9208b31au), static_cast<ui32>(0x300d0e54u), static_cast<ui32>(0x4714957du), static_cast<ui32>(0x1abau), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 52 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x8e9fffffu), static_cast<ui32>(0xb456ff0cu), static_cast<ui32>(0xe0828f4du), static_cast<ui32>(0xc6cdd6e3u), static_cast<ui32>(0x10b46u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 53 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x923fffffu), static_cast<ui32>(0x0b65f67du), static_cast<ui32>(0xc5199909u), static_cast<ui32>(0xc40a64e6u), static_cast<ui32>(0xa70c3u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 54 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xb67fffffu), static_cast<ui32>(0x71fba0e7u), static_cast<ui32>(0xb2fffa5au), static_cast<ui32>(0xa867f103u), static_cast<ui32>(0x6867a5u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 55 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x20ffffffu), static_cast<ui32>(0x73d4490du), static_cast<ui32>(0xfdffc788u), static_cast<ui32>(0x940f6a24u), static_cast<ui32>(0x4140c78u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 56 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x49ffffffu), static_cast<ui32>(0x864ada83u), static_cast<ui32>(0xebfdcb54u), static_cast<ui32>(0xc89a2571u), static_cast<ui32>(0x28c87cb5u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 57 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xe3ffffffu), static_cast<ui32>(0x3eec8920u), static_cast<ui32>(0x37e9f14du), static_cast<ui32>(0xd6057673u), static_cast<ui32>(0x97d4df19u), static_cast<ui32>(0x1u), static_cast<ui32>(0x0u)}, // 58 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xe7ffffffu), static_cast<ui32>(0x753d5b48u), static_cast<ui32>(0x2f236d04u), static_cast<ui32>(0x5c36a080u), static_cast<ui32>(0xee50b702u), static_cast<ui32>(0xfu), static_cast<ui32>(0x0u)}, // 59 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x0fffffffu), static_cast<ui32>(0x946590d9u), static_cast<ui32>(0xd762422cu), static_cast<ui32>(0x9a224501u), static_cast<ui32>(0x4f272617u), static_cast<ui32>(0x9fu), static_cast<ui32>(0x0u)}, // 60 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x9fffffffu), static_cast<ui32>(0xcbf7a87au), static_cast<ui32>(0x69d695bdu), static_cast<ui32>(0x0556b212u), static_cast<ui32>(0x17877cecu), static_cast<ui32>(0x639u), static_cast<ui32>(0x0u)}, // 61 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x3fffffffu), static_cast<ui32>(0xf7ac94cau), static_cast<ui32>(0x2261d969u), static_cast<ui32>(0x3562f4b8u), static_cast<ui32>(0xeb4ae138u), static_cast<ui32>(0x3e3au), static_cast<ui32>(0x0u)}, // 62 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x7fffffffu), static_cast<ui32>(0xacbdcfe6u), static_cast<ui32>(0x57d27e23u), static_cast<ui32>(0x15dd8f31u), static_cast<ui32>(0x30eccc32u), static_cast<ui32>(0x26e4du), static_cast<ui32>(0x0u)}, // 63 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xbf6a1f00u), static_cast<ui32>(0x6e38ed64u), static_cast<ui32>(0xdaa797edu), static_cast<ui32>(0xe93ff9f4u), static_cast<ui32>(0x184f03u), static_cast<ui32>(0x0u)}, // 64 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x7a253609u), static_cast<ui32>(0x4e3945efu), static_cast<ui32>(0x8a8bef46u), static_cast<ui32>(0x1c7fc390u), static_cast<ui32>(0xf31627u), static_cast<ui32>(0x0u)}, // 65 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xc5741c63u), static_cast<ui32>(0x0e3cbb5au), static_cast<ui32>(0x697758bfu), static_cast<ui32>(0x1cfda3a5u), static_cast<ui32>(0x97edd87u), static_cast<ui32>(0x0u)}, // 66 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xb6891be7u), static_cast<ui32>(0x8e5f518bu), static_cast<ui32>(0x1ea97776u), static_cast<ui32>(0x21e86476u), static_cast<ui32>(0x5ef4a747u), static_cast<ui32>(0x0u)}, // 67 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x215b170fu), static_cast<ui32>(0x8fb92f75u), static_cast<ui32>(0x329eaaa1u), static_cast<ui32>(0x5313ec9du), static_cast<ui32>(0xb58e88c7u), static_cast<ui32>(0x3u)}, // 68 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x4d8ee69fu), static_cast<ui32>(0x9d3bda93u), static_cast<ui32>(0xfa32aa4fu), static_cast<ui32>(0x3ec73e23u), static_cast<ui32>(0x179157c9u), static_cast<ui32>(0x25u)}, // 69 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x0795023fu), static_cast<ui32>(0x245689c1u), static_cast<ui32>(0xc5faa71cu), static_cast<ui32>(0x73c86d67u), static_cast<ui32>(0xebad6ddcu), static_cast<ui32>(0x172u)}, // 70 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x4bd2167fu), static_cast<ui32>(0x6b61618au), static_cast<ui32>(0xbbca8719u), static_cast<ui32>(0x85d4460du), static_cast<ui32>(0x34c64a9cu), static_cast<ui32>(0xe7du)}, // 71 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xf634e0ffu), static_cast<ui32>(0x31cdcf66u), static_cast<ui32>(0x55e946feu), static_cast<ui32>(0x3a4abc89u), static_cast<ui32>(0x0fbeea1du), static_cast<ui32>(0x90e4u)}, // 72 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x9e10c9ffu), static_cast<ui32>(0xf20a1a05u), static_cast<ui32>(0x5b1cc5edu), static_cast<ui32>(0x46eb5d5du), static_cast<ui32>(0x9d752524u), static_cast<ui32>(0x5a8e8u)}, // 73 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x2ca7e3ffu), static_cast<ui32>(0x74650438u), static_cast<ui32>(0x8f1fbb4bu), static_cast<ui32>(0xc531a5a5u), static_cast<ui32>(0x2693736au), static_cast<ui32>(0x389916u)}, // 74 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xbe8ee7ffu), static_cast<ui32>(0x8bf22a31u), static_cast<ui32>(0x973d50f2u), static_cast<ui32>(0xb3f07877u), static_cast<ui32>(0x81c2822bu), static_cast<ui32>(0x235faddu)}, // 75 + {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x71950fffu), static_cast<ui32>(0x7775a5f1u), static_cast<ui32>(0xe8652979u), static_cast<ui32>(0x0764b4abu), static_cast<ui32>(0x119915b5u), static_cast<ui32>(0x161bcca7u)}, // 76 }; template<typename T> -Y_FORCE_INLINE constexpr T GetDecimalMaxIntegerValue(int precision) +Y_FORCE_INLINE constexpr auto GetDecimalMaxIntegerValue(int precision) { static_assert(ValidDecimalUnderlyingInteger<T>); - if (TDecimal::GetValueBinarySize(precision) <= sizeof(T)) { - return DecimalIntegerMaxValueTable[precision]; - } else { + if (TDecimal::GetValueBinarySize(precision) > static_cast<int>(sizeof(T))) { YT_ABORT(); } -} -template <typename T> -Y_FORCE_INLINE constexpr auto DecimalIntegerToUnsigned(T value) -{ - static_assert(ValidDecimalUnderlyingInteger<T>); - - if constexpr (std::is_same_v<T, i128>) { - return ui128(value); + if constexpr (std::is_same_v<T, i32>) { + YT_VERIFY(precision <= 9); + return Decimal32IntegerMaxValueTable[precision]; + } else if constexpr (std::is_same_v<T, i64>) { + YT_VERIFY(precision >= 10 && precision <= 18); + return Decimal64IntegerMaxValueTable[precision - 10]; + } else if constexpr (std::is_same_v<T, i128>) { + YT_VERIFY(precision >= 19 && precision <= 38); + return Decimal128IntegerMaxValueTable[precision - 19]; + } else if constexpr (std::is_same_v<T, i256>) { + YT_VERIFY(precision >= 39 && precision <= 76); + return Decimal256IntegerMaxValueTable[precision - 39]; } else { - using TU = std::make_unsigned_t<T>; - return static_cast<TU>(value); + YT_ABORT(); } } template <typename T> static Y_FORCE_INLINE T DecimalHostToInet(T value) { - if constexpr (std::is_same_v<T, i128> || std::is_same_v<T, ui128>) { + if constexpr (std::is_same_v<T, i256> || std::is_same_v<T, ui256>) { + for (int partIndex = 0; partIndex < std::ssize(value.Parts) / 2; ++partIndex) { + value.Parts[partIndex] = ::HostToInet(value.Parts[partIndex]); + value.Parts[std::size(value.Parts) - 1 - partIndex] = ::HostToInet(value.Parts[std::size(value.Parts) - 1 - partIndex]); + std::swap(value.Parts[partIndex], value.Parts[std::size(value.Parts) - 1 - partIndex]); + } + return value; + } else if constexpr (std::is_same_v<T, i128> || std::is_same_v<T, ui128>) { return T(::HostToInet(GetLow(value)), ::HostToInet(GetHigh(value))); } else { return ::HostToInet(value); @@ -139,7 +454,14 @@ static Y_FORCE_INLINE T DecimalHostToInet(T value) template <typename T> static Y_FORCE_INLINE T DecimalInetToHost(T value) { - if constexpr (std::is_same_v<T, i128> || std::is_same_v<T, ui128>) { + if constexpr (std::is_same_v<T, i256> || std::is_same_v<T, ui256>) { + for (int partIndex = 0; partIndex < std::ssize(value.Parts) / 2; ++partIndex) { + value.Parts[partIndex] = ::InetToHost(value.Parts[partIndex]); + value.Parts[std::size(value.Parts) - 1 - partIndex] = ::InetToHost(value.Parts[std::size(value.Parts) - 1 - partIndex]); + std::swap(value.Parts[partIndex], value.Parts[std::size(value.Parts) - 1 - partIndex]); + } + return value; + } else if constexpr (std::is_same_v<T, i128> || std::is_same_v<T, ui128>) { return T(::InetToHost(GetLow(value)), ::InetToHost(GetHigh(value))); } else { return ::InetToHost(value); @@ -151,22 +473,14 @@ static T DecimalBinaryToIntegerUnchecked(TStringBuf binaryValue) { T result; memcpy(&result, binaryValue.data(), sizeof(result)); - result = DecimalInetToHost(result); - - constexpr auto one = DecimalIntegerToUnsigned(T{1}); - result = static_cast<T>(DecimalIntegerToUnsigned(result) ^ (one << (sizeof(T) * 8 - 1))); - - return result; + return FlipMSB(DecimalInetToHost(result)); } template<typename T> static void DecimalIntegerToBinaryUnchecked(T decodedValue, void* buf) { - auto unsignedValue = DecimalIntegerToUnsigned(decodedValue); - constexpr auto one = DecimalIntegerToUnsigned(T{1}); - unsignedValue ^= (one << (sizeof(T) * 8 - 1)); - unsignedValue = DecimalHostToInet(unsignedValue); - memcpy(buf, &unsignedValue, sizeof(unsignedValue)); + auto preparedValue = DecimalHostToInet(FlipMSB(decodedValue)); + memcpy(buf, &preparedValue, sizeof(preparedValue)); } static void CheckDecimalValueSize(TStringBuf value, int precision, int scale) @@ -192,12 +506,6 @@ static Y_FORCE_INLINE TStringBuf PlaceOnBuffer(TStringBuf value, char* buffer) template<typename T> static TStringBuf WriteTextDecimalUnchecked(T decodedValue, int scale, char* buffer) { - i8 digits[std::numeric_limits<T>::digits + 1] = {0,}; - static constexpr auto ten = DecimalIntegerToUnsigned(T{10}); - - const bool negative = decodedValue < 0; - auto absValue = DecimalIntegerToUnsigned(negative ? -decodedValue : decodedValue); - if (decodedValue == TDecimalTraits<T>::MinusInf) { static constexpr TStringBuf minusInf = "-inf"; return PlaceOnBuffer(minusInf, buffer); @@ -209,10 +517,14 @@ static TStringBuf WriteTextDecimalUnchecked(T decodedValue, int scale, char* buf return PlaceOnBuffer(nan, buffer); } + i8 digits[TDecimal::MaxTextSize] = {0,}; + + bool negative = IsNegativeInteger(decodedValue); + auto absValue = DecimalIntegerToUnsigned(negative ? -decodedValue : decodedValue); + auto* curDigit = digits; - while (absValue > 0) { - *curDigit = static_cast<int>(absValue % ten); - absValue = absValue / ten; + while (absValue != DecimalIntegerToUnsigned(T{0})) { + *curDigit = GetNextDigit(absValue, &absValue); curDigit++; } YT_VERIFY(curDigit <= digits + std::size(digits)); @@ -313,35 +625,40 @@ T DecimalTextToInteger(TStringBuf textValue, int precision, int scale) break; } - T result = 0; + // This value can overflow in the process of parsing the text value. + // We do throw an exception later, but UB is not good for your health. + auto result = DecimalIntegerToUnsigned(T{0}); int beforePoint = 0; int afterPoint = 0; + + auto addDigit = [&] (auto digit) { + // We use this type to avoid warnings about casting signed types to unsigned/narrowing ints. + // Ugly, but this way we don't need to define cumbersome constructors for TValue256. + ui16 currentDigit = *digit - '0'; + if (currentDigit < 0 || currentDigit > 9) { + ThrowInvalidDecimal(textValue, precision, scale); + } + + result = MultiplyByTen(result); + result = result + DecimalIntegerToUnsigned(T{currentDigit}); + }; + for (; cur != end; ++cur) { if (*cur == '.') { ++cur; for (; cur != end; ++cur) { - int currentDigit = *cur - '0'; - result *= 10; - result += currentDigit; + addDigit(cur); ++afterPoint; - if (currentDigit < 0 || currentDigit > 9) { - ThrowInvalidDecimal(textValue, precision, scale); - } } break; } - int currentDigit = *cur - '0'; - result *= 10; - result += currentDigit; + addDigit(cur); ++beforePoint; - if (currentDigit < 0 || currentDigit > 9) { - ThrowInvalidDecimal(textValue, precision, scale); - } } for (; afterPoint < scale; ++afterPoint) { - result *= 10; + result = MultiplyByTen(result); } if (afterPoint > scale) { @@ -352,7 +669,10 @@ T DecimalTextToInteger(TStringBuf textValue, int precision, int scale) ThrowInvalidDecimal(textValue, precision, scale, "too many digits before decimal point"); } - return negative ? -result : result; + // This cast is guaranteed by the checks above to be correct. + auto signedResult = DecimalIntegerToSigned(result); + // This is safe: the range of representable values fits into [-signed_max, signed_max] for each underlying type. + return negative ? -signedResult : signedResult; } template<typename T> @@ -374,6 +694,8 @@ TStringBuf TDecimal::BinaryToText(TStringBuf binaryDecimal, int precision, int s return DecimalBinaryToTextUncheckedImpl<i64>(binaryDecimal, scale, buffer); case 16: return DecimalBinaryToTextUncheckedImpl<i128>(binaryDecimal, scale, buffer); + case 32: + return DecimalBinaryToTextUncheckedImpl<i256>(binaryDecimal, scale, buffer); } CheckDecimalValueSize(binaryDecimal, precision, scale); YT_ABORT(); @@ -410,8 +732,10 @@ TStringBuf TDecimal::TextToBinary(TStringBuf textValue, int precision, int scale return TextToBinaryImpl<i64>(textValue, precision, scale, buffer); case 16: return TextToBinaryImpl<i128>(textValue, precision, scale, buffer); + case 32: + return TextToBinaryImpl<i256>(textValue, precision, scale, buffer); default: - static_assert(GetDecimalBinaryValueSize(TDecimal::MaxPrecision) == 16); + static_assert(GetDecimalBinaryValueSize(TDecimal::MaxPrecision) == 32); YT_ABORT(); } } @@ -444,7 +768,7 @@ static void ValidateDecimalBinaryValueImpl(TStringBuf binaryDecimal, int precisi { T decoded = DecimalBinaryToIntegerUnchecked<T>(binaryDecimal); - const T maxValue = static_cast<T>(DecimalIntegerMaxValueTable[precision]); + auto maxValue = GetDecimalMaxIntegerValue<T>(precision); if (-maxValue <= decoded && decoded <= maxValue) { return; @@ -478,11 +802,14 @@ void TDecimal::ValidateBinaryValue(TStringBuf binaryDecimal, int precision, int return ValidateDecimalBinaryValueImpl<i64>(binaryDecimal, precision, scale); case 16: return ValidateDecimalBinaryValueImpl<i128>(binaryDecimal, precision, scale); + case 32: + return ValidateDecimalBinaryValueImpl<i256>(binaryDecimal, precision, scale); default: - static_assert(GetDecimalBinaryValueSize(TDecimal::MaxPrecision) == 16); + static_assert(GetDecimalBinaryValueSize(TDecimal::MaxPrecision) == 32); YT_ABORT(); } } + template <typename T> Y_FORCE_INLINE void CheckDecimalIntBits(int precision) { @@ -495,6 +822,14 @@ Y_FORCE_INLINE void CheckDecimalIntBits(int precision) } } +Y_FORCE_INLINE void CheckDecimalFitsInto128Bits(int precision) +{ + if (precision > 38) { + THROW_ERROR_EXCEPTION("Decimal<%v, ?> does not fit into int128", + precision); + } +} + int TDecimal::GetValueBinarySize(int precision) { const auto result = GetDecimalBinaryValueSize(precision); @@ -535,7 +870,7 @@ TStringBuf TDecimal::WriteBinary128(int precision, TValue128 value, char* buffer return TStringBuf{buffer, sizeof(TValue128)}; } -TStringBuf TDecimal::WriteBinaryVariadic(int precision, TValue128 value, char* buffer, size_t bufferLength) +TStringBuf TDecimal::WriteBinary128Variadic(int precision, TValue128 value, char* buffer, size_t bufferLength) { const size_t resultLength = GetValueBinarySize(precision); switch (resultLength) { @@ -550,6 +885,16 @@ TStringBuf TDecimal::WriteBinaryVariadic(int precision, TValue128 value, char* b } } +TStringBuf TDecimal::WriteBinary256(int precision, TValue256 value, char* buffer, size_t bufferLength) +{ + const size_t resultLength = GetValueBinarySize(precision); + CheckDecimalIntBits<TValue256>(precision); + YT_VERIFY(bufferLength >= resultLength); + + DecimalIntegerToBinaryUnchecked(std::move(value), buffer); + return TStringBuf{buffer, sizeof(TValue256)}; +} + template <typename T> Y_FORCE_INLINE void CheckBufferLength(int precision, size_t bufferLength) { @@ -581,6 +926,12 @@ TDecimal::TValue128 TDecimal::ParseBinary128(int precision, TStringBuf buffer) return {GetLow(result), static_cast<i64>(GetHigh(result))}; } +TDecimal::TValue256 TDecimal::ParseBinary256(int precision, TStringBuf buffer) +{ + CheckBufferLength<i256>(precision, buffer.Size()); + return DecimalBinaryToIntegerUnchecked<i256>(buffer); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDecimal diff --git a/yt/yt/library/decimal/decimal.h b/yt/yt/library/decimal/decimal.h index 8a6cf34a8f..27375d3904 100644 --- a/yt/yt/library/decimal/decimal.h +++ b/yt/yt/library/decimal/decimal.h @@ -5,6 +5,8 @@ #include <util/system/defaults.h> #include <util/generic/string.h> +#include <array> + namespace NYT::NDecimal { //////////////////////////////////////////////////////////////////////////////// @@ -12,6 +14,9 @@ namespace NYT::NDecimal { class TDecimal { public: + //! Both types defined below represent signed values. They are only intended + //! to be used for manipulating binary data, since they don't actually expose + //! any arithmetic operations. You should avoid dealing with individual parts. struct TValue128 { ui64 Low; @@ -19,15 +24,21 @@ public: }; static_assert(sizeof(TValue128) == 2 * sizeof(ui64)); + struct TValue256 + { + std::array<ui32, 8> Parts; + }; + static_assert(sizeof(TValue256) == 4 * sizeof(ui64)); + public: - // Maximum precision supported by YT - static constexpr int MaxPrecision = 35; - static constexpr int MaxBinarySize = 16; + //! Maximum precision supported by YT. + static constexpr int MaxPrecision = 76; + static constexpr int MaxBinarySize = 32; // NB. Sometimes we print values that exceed MaxPrecision (e.g. in error messages) - // MaxTextSize is chosen so we can print ANY i128 number as decimal. + // MaxTextSize is chosen so we can print ANY i256 number as decimal. static constexpr int MaxTextSize = - std::numeric_limits<ui128>::digits + 1 // max number of digits in ui128 number + 77 // length of 2^63 in decimal form + 1 // possible decimal point + 1; // possible minus sign @@ -49,13 +60,15 @@ public: static TStringBuf WriteBinary32(int precision, i32 value, char* buffer, size_t bufferLength); static TStringBuf WriteBinary64(int precision, i64 value, char* buffer, size_t bufferLength); static TStringBuf WriteBinary128(int precision, TValue128 value, char* buffer, size_t bufferLength); + static TStringBuf WriteBinary256(int precision, TValue256 value, char* buffer, size_t bufferLength); // Writes either 32-bit, 64-bit or 128-bit binary value depending on precision, provided a TValue128. - static TStringBuf WriteBinaryVariadic(int precision, TValue128 value, char* buffer, size_t bufferLength); + static TStringBuf WriteBinary128Variadic(int precision, TValue128 value, char* buffer, size_t bufferLength); static i32 ParseBinary32(int precision, TStringBuf buffer); static i64 ParseBinary64(int precision, TStringBuf buffer); static TValue128 ParseBinary128(int precision, TStringBuf buffer); + static TValue256 ParseBinary256(int precision, TStringBuf buffer); }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/library/decimal/unittests/decimal_ut.cpp b/yt/yt/library/decimal/unittests/decimal_ut.cpp index ba8be4ab15..50c550707c 100644 --- a/yt/yt/library/decimal/unittests/decimal_ut.cpp +++ b/yt/yt/library/decimal/unittests/decimal_ut.cpp @@ -98,6 +98,150 @@ TEST(TDecimal, TestTextBinaryConversion) "61449825198266175750309883089040771", "800BD5B5D5F0C73E0C9CD4943298B583"); + // A few more test cases with big numbers for various precisions generated by python snippet: + // import random + // def print_test_case(plus, binary_size, precision): + // textval = "".join(random.choice("0123456789") for _ in range(precision)) + // if not plus: + // textval = "-" + textval + // binval = hex(2 ** (binary_size * 8 - 1) + int(textval)) + // binval = binval[2:].strip('L') # strip 0x and final 'L' + // binval = binval.upper() + // print( + // "TEST_TEXT_BINARY_CONVERSION(\n" + // " {precision}, 0,\n" + // " \"{text}\",\n" + // " \"{binary}\");\n" + // .format(precision=precision, text=textval.lstrip("0"), binary=binval) + // ) + // random.seed(42) + // + // for binary_size, precision in (4, 9), (8, 18), (16, 35), (16, 38), (32, 39), (32, 76): + // print_test_case(False, binary_size, precision) + // print_test_case(False, binary_size, precision) + // print_test_case(True, binary_size, precision) + // print_test_case(True, binary_size, precision) + + TEST_TEXT_BINARY_CONVERSION( + 9, 0, + "-104332181", + "79C8046B"); + + TEST_TEXT_BINARY_CONVERSION( + 9, 0, + "-960013389", + "46C75BB3"); + + TEST_TEXT_BINARY_CONVERSION( + 9, 0, + "83863794", + "84FFA8F2"); + + TEST_TEXT_BINARY_CONVERSION( + 9, 0, + "26542351", + "8195010F"); + + TEST_TEXT_BINARY_CONVERSION( + 18, 0, + "-161559407816184959", + "7DC2069316FE6B81"); + + TEST_TEXT_BINARY_CONVERSION( + 18, 0, + "-310341316475255341", + "7BB17237885D85D3"); + + TEST_TEXT_BINARY_CONVERSION( + 18, 0, + "928327648350305641", + "8CE21513E317FD69"); + + TEST_TEXT_BINARY_CONVERSION( + 18, 0, + "395376724238849696", + "857CA90930B6D6A0"); + + TEST_TEXT_BINARY_CONVERSION( + 35, 0, + "-53287101226916697848018451462704828", + "7FF5BCBE39B5F1A05ED5F0135FD2B144"); + + TEST_TEXT_BINARY_CONVERSION( + 35, 0, + "-14893252880957015430391171822782489", + "7FFD21B4B88705D84E5A0ECF560DF7E7"); + + TEST_TEXT_BINARY_CONVERSION( + 35, 0, + "63834657871331509839301031051834738", + "800C4B4AA7E81EF6726D934AA12AB572"); + + TEST_TEXT_BINARY_CONVERSION( + 35, 0, + "29973763116566701065133387262473178", + "8005C5D2141747EF1198888FB96033DA"); + + TEST_TEXT_BINARY_CONVERSION( + 38, 0, + "-10801326773602606474687234309805009788", + "77DFBD795731FA57AD40A913BEB49484"); + + TEST_TEXT_BINARY_CONVERSION( + 38, 0, + "-20812191361939909169985435346247510799", + "7057B7BE1F13B6F1D6DB2F569BA994F1"); + + TEST_TEXT_BINARY_CONVERSION( + 38, 0, + "11838425135427849808412411824493534874", + "88E7FF6C4CB25E5053056568535B9E9A"); + + TEST_TEXT_BINARY_CONVERSION( + 38, 0, + "1640052427868011280598262045053315869", + "813BDCD3E2BEF11A4624071A88406B1D"); + + TEST_TEXT_BINARY_CONVERSION( + 39, 0, + "-232260256342160733754330365414586850142", + "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF51444D3EF60040D9A177B4EBDCD1E4A2"); + + TEST_TEXT_BINARY_CONVERSION( + 39, 0, + "-940196556981693406088356159514846564823", + "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFD3CACBACF9473427D637FA1697111EE29"); + + TEST_TEXT_BINARY_CONVERSION( + 39, 0, + "662994680443699577738721489513433200379", + "80000000000000000000000000000001F2C8217C5832413CEDC2D8713542F6FB"); + + TEST_TEXT_BINARY_CONVERSION( + 39, 0, + "176936763201632870831727889579868727743", + "80000000000000000000000000000000851CC7F2FA54AF2AC8A1868AB524A9BF"); + + TEST_TEXT_BINARY_CONVERSION( + 76, 0, + "-4873471434558122362316658760366909670546688937346706562729806990162720465375", + "7539B681CC20CEEDAB96C358A49B105EB5C51C958B66D69F946E52B2C4CBDE21"); + + TEST_TEXT_BINARY_CONVERSION( + 76, 0, + "-5646417080531003309232719374529912419049663193149190586518506716572628498776", + "73843DBE5B00AA3065C94D18C3DE5B54ABBCB7AD4AB9775BD7308BA12A14C6A8"); + + TEST_TEXT_BINARY_CONVERSION( + 76, 0, + "9453147379965075273545494808313678377701436349578856855744431351823374989413", + "94E64AB40D1C30A9A2BFEA9C96B7584F559D5572641F0C90E86764F3187BB865"); + + TEST_TEXT_BINARY_CONVERSION( + 76, 0, + "4352408240084271094777520471167190229413186999386774964990913341232812067974", + "899F603224EC624E2351628FF4637023C84FDBB0E2BBCBBB9395D8DD19B30C86"); + #undef TEST_TEXT_BINARY_CONVERSION } @@ -127,20 +271,27 @@ TEST(TDecimal, TestPrecisionScaleLimits) EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("314.15", 5, 3), "too many digits before decimal point"); EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("-314.15", 5, 3), "too many digits before decimal point"); + // This group of tests checks that text values which cause signed overflow throw valid exceptions. + // The values for each precision are equal to 2^(binary_size - 1) + 0.11. + EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("2147483647.11", 9, 2), "too many digits before decimal point"); + EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("9223372036854775807.11", 18, 2), "too many digits before decimal point"); + EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("170141183460469231731687303715884105727.11", 35, 2), "too many digits before decimal point"); + EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("57896044618658097711785492504343953926634992332820282019728792003956564819967.11", TDecimal::MaxPrecision, 2), "too many digits before decimal point"); + // Sometimes we want to print values that are not representable with given precision // (e.g. in error messages we sometimes want to print text value of invalid decimal to explain that it has // more digits than allowed by precision). // // Here we test that extreme values are printed ok. - auto maxBinaryDecimal = HexDecode("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFD"); - auto minBinaryDecimal1 = HexDecode("00000000000000000000000000000000"); - auto minBinaryDecimal2 = HexDecode("00000000000000000000000000000003"); + auto maxBinaryDecimal = HexDecode("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFD"); + auto minBinaryDecimal1 = HexDecode("0000000000000000000000000000000000000000000000000000000000000000"); + auto minBinaryDecimal2 = HexDecode("0000000000000000000000000000000000000000000000000000000000000003"); EXPECT_EQ(TDecimal::MaxBinarySize, std::ssize(maxBinaryDecimal)); // If max TDecimal::MaxBinarySize ever increases EXPECT_EQ(TDecimal::MaxBinarySize, std::ssize(minBinaryDecimal1)); // please update this test EXPECT_EQ(TDecimal::MaxBinarySize, std::ssize(minBinaryDecimal2)); // with better values. - EXPECT_EQ("1701411834604692317316873037158841057.25", TDecimal::BinaryToText(maxBinaryDecimal, TDecimal::MaxPrecision, 2)); - EXPECT_EQ("-1701411834604692317316873037158841057.28", TDecimal::BinaryToText(minBinaryDecimal1, TDecimal::MaxPrecision, 2)); - EXPECT_EQ("-1701411834604692317316873037158841057.25", TDecimal::BinaryToText(minBinaryDecimal2, TDecimal::MaxPrecision, 2)); + EXPECT_EQ("578960446186580977117854925043439539266349923328202820197287920039565648199.65", TDecimal::BinaryToText(maxBinaryDecimal, TDecimal::MaxPrecision, 2)); + EXPECT_EQ("-578960446186580977117854925043439539266349923328202820197287920039565648199.68", TDecimal::BinaryToText(minBinaryDecimal1, TDecimal::MaxPrecision, 2)); + EXPECT_EQ("-578960446186580977117854925043439539266349923328202820197287920039565648199.65", TDecimal::BinaryToText(minBinaryDecimal2, TDecimal::MaxPrecision, 2)); } TEST(TDecimal, TestValidation) @@ -148,6 +299,7 @@ TEST(TDecimal, TestValidation) EXPECT_NO_THROW(TDecimal::ValidateBinaryValue(HexDecode("8000013A"), 3, 2)); EXPECT_NO_THROW(TDecimal::ValidateBinaryValue(HexDecode("80000000" "0000013A"), 10, 2)); EXPECT_NO_THROW(TDecimal::ValidateBinaryValue(HexDecode("80000000" "00000000" "00000000" "0000013A"), 35, 2)); + EXPECT_NO_THROW(TDecimal::ValidateBinaryValue(HexDecode("80000000" "00000000" "00000000" "00000000" "00000000" "00000000" "00000000" "0000013A"), 76, 2)); } class TDecimalWithPrecisionTest diff --git a/yt/yt/library/formats/arrow_parser.cpp b/yt/yt/library/formats/arrow_parser.cpp index cc31e25335..f34127990a 100644 --- a/yt/yt/library/formats/arrow_parser.cpp +++ b/yt/yt/library/formats/arrow_parser.cpp @@ -163,28 +163,17 @@ public: return ParseNull(); } - // Decimal types. For now, YT natively supports only Decimal128 with scale up to 35. - // Thus, we represent short enough decimals as native YT decimals, and wider decimals as - // their decimal string representation; but the latter is subject to change whenever we - // get the native support for Decimal128 with scale up to 38 or Decimal256 with scale up to 76. arrow::Status Visit(const arrow::Decimal128Type& type) override { - constexpr int MaximumYTDecimalPrecision = 35; - if (type.precision() <= MaximumYTDecimalPrecision) { - return ParseStringLikeArray<arrow::Decimal128Array>([&] (const TStringBuf& value, i64 columnId) { - return MakeDecimalBinaryValue(value, columnId, type.precision()); - }); - } else { - return ParseStringLikeArray<arrow::Decimal128Array>([&] (const TStringBuf& value, i64 columnId) { - return MakeDecimalTextValue<arrow::Decimal128>(value, columnId, type.scale()); - }); - } + return ParseStringLikeArray<arrow::Decimal128Array>([&] (const TStringBuf& value, i64 columnId) { + return MakeDecimalBinaryValue<TDecimal::TValue128>(value, columnId, type.precision()); + }); } arrow::Status Visit(const arrow::Decimal256Type& type) override { return ParseStringLikeArray<arrow::Decimal256Array>([&] (const TStringBuf& value, i64 columnId) { - return MakeDecimalTextValue<arrow::Decimal256>(value, columnId, type.scale()); + return MakeDecimalBinaryValue<TDecimal::TValue256>(value, columnId, type.precision()); }); } @@ -294,33 +283,31 @@ private: return arrow::Status::OK(); } + template <class TUnderlyingValueType> TUnversionedValue MakeDecimalBinaryValue(const TStringBuf& value, i64 columnId, int precision) { - // NB: arrow wire representation of Decimal128 is little-endian and (obviously) 128 bit, + // NB: Arrow wire representation of Decimal128 is little-endian and (obviously) 128 bit, // while YT in-memory representation of Decimal is big-endian, variadic-length of either 32 bit, 64 bit or 128 bit, // and MSB-flipped to ensure lexical sorting order. - TDecimal::TValue128 value128; - YT_VERIFY(value.size() == sizeof(value128)); - std::memcpy(&value128, value.data(), value.size()); + // Representation of Decimal256 is similar, but only 256 bits. + TUnderlyingValueType decimalValue; + YT_VERIFY(value.size() == sizeof(decimalValue)); + std::memcpy(&decimalValue, value.data(), value.size()); - const auto maxByteCount = sizeof(value128); + const auto maxByteCount = sizeof(decimalValue); char* buffer = BufferForStringLikeValues_->Preallocate(maxByteCount); - auto decimalBinary = TDecimal::WriteBinaryVariadic(precision, value128, buffer, maxByteCount); + TStringBuf decimalBinary; + if constexpr (std::is_same_v<TUnderlyingValueType, TDecimal::TValue128>) { + decimalBinary = TDecimal::WriteBinary128Variadic(precision, decimalValue, buffer, maxByteCount); + } else if constexpr (std::is_same_v<TUnderlyingValueType, TDecimal::TValue256>) { + decimalBinary = TDecimal::WriteBinary256(precision, decimalValue, buffer, maxByteCount); + } else { + static_assert(std::is_same_v<TUnderlyingValueType, TDecimal::TValue256>, "Unexpected decimal type"); + } BufferForStringLikeValues_->Advance(decimalBinary.size()); return MakeUnversionedStringValue(decimalBinary, columnId); } - - template <class TArrowDecimalType> - TUnversionedValue MakeDecimalTextValue(const TStringBuf& value, i64 columnId, int scale) - { - TArrowDecimalType decimal(reinterpret_cast<const uint8_t*>(value.data())); - auto string = decimal.ToString(scale); - char* buffer = BufferForStringLikeValues_->Preallocate(string.size()); - std::memcpy(buffer, string.data(), string.size()); - BufferForStringLikeValues_->Advance(string.size()); - return MakeUnversionedStringValue(TStringBuf(buffer, string.size()), columnId); - } }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/library/formats/skiff_parser.cpp b/yt/yt/library/formats/skiff_parser.cpp index edff9ffa0b..febde44f42 100644 --- a/yt/yt/library/formats/skiff_parser.cpp +++ b/yt/yt/library/formats/skiff_parser.cpp @@ -300,6 +300,7 @@ const auto precision = denullifiedType.GetPrecision(); CASE(EWireType::Int32); CASE(EWireType::Int64); CASE(EWireType::Int128); + CASE(EWireType::Int256); #undef CASE case EWireType::Yson32: return CreatePrimitiveTypeConverter(wireType, fieldDescription.IsRequired(), columnId, ysonConverter); diff --git a/yt/yt/library/formats/skiff_writer.cpp b/yt/yt/library/formats/skiff_writer.cpp index 3bc6965219..0984e72c8f 100644 --- a/yt/yt/library/formats/skiff_writer.cpp +++ b/yt/yt/library/formats/skiff_writer.cpp @@ -506,6 +506,10 @@ TUnversionedValueToSkiffConverter CreateDecimalValueConverter( return CreatePrimitiveValueConverter<EValueType::String>( isRequired, TDecimalSkiffWriter<EWireType::Int128>(precision)); + case EWireType::Int256: + return CreatePrimitiveValueConverter<EValueType::String>( + isRequired, + TDecimalSkiffWriter<EWireType::Int256>(precision)); case EWireType::Yson32: return CreatePrimitiveValueConverter(wireType, isRequired); default: diff --git a/yt/yt/library/formats/skiff_yson_converter-inl.h b/yt/yt/library/formats/skiff_yson_converter-inl.h index 43dabc63a5..8d5a2f8efd 100644 --- a/yt/yt/library/formats/skiff_yson_converter-inl.h +++ b/yt/yt/library/formats/skiff_yson_converter-inl.h @@ -70,8 +70,18 @@ Y_FORCE_INLINE TStringBuf TDecimalSkiffParser<SkiffWireType>::operator() (NSkiff TDecimal::TValue128{skiffValue.Low, skiffValue.High}, Buffer_, sizeof(Buffer_)); + } else if constexpr (SkiffWireType == EWireType::Int256) { + const auto skiffValue = parser->ParseInt256(); + TDecimal::TValue256 decimalValue; + static_assert(sizeof(decimalValue) == sizeof(skiffValue)); + std::memcpy(&decimalValue, &skiffValue, sizeof(decimalValue)); + return TDecimal::WriteBinary256( + Precision_, + std::move(decimalValue), + Buffer_, + sizeof(Buffer_)); } else { - static_assert(SkiffWireType == EWireType::Int128); + static_assert(SkiffWireType == EWireType::Int256); } } @@ -99,9 +109,15 @@ void TDecimalSkiffWriter<SkiffWireType>::operator()(TStringBuf value, NSkiff::TC } else if constexpr (SkiffWireType == EWireType::Int128) { auto intValue = TDecimal::ParseBinary128(Precision_, value); writer->WriteInt128(TInt128{intValue.Low, intValue.High}); + } else if constexpr (SkiffWireType == EWireType::Int256) { + auto intValue = TDecimal::ParseBinary256(Precision_, value); + TInt256 skiffValue; + static_assert(sizeof(skiffValue) == sizeof(intValue)); + std::memcpy(&skiffValue, &intValue, sizeof(skiffValue)); + writer->WriteInt256(std::move(skiffValue)); } else { // poor man's static_assert(false) - static_assert(SkiffWireType == EWireType::Int128); + static_assert(SkiffWireType == EWireType::Int256); } } diff --git a/yt/yt/library/formats/skiff_yson_converter.cpp b/yt/yt/library/formats/skiff_yson_converter.cpp index 82fac3ef1b..ebde4dcb3d 100644 --- a/yt/yt/library/formats/skiff_yson_converter.cpp +++ b/yt/yt/library/formats/skiff_yson_converter.cpp @@ -748,6 +748,10 @@ TYsonToSkiffConverter CreateDecimalYsonToSkiffConverter( return CreatePrimitiveTypeYsonToSkiffConverter<EYsonItemType::StringValue>( std::move(descriptor), TDecimalSkiffWriter<EWireType::Int128>(precision)); + case EWireType::Int256: + return CreatePrimitiveTypeYsonToSkiffConverter<EYsonItemType::StringValue>( + std::move(descriptor), + TDecimalSkiffWriter<EWireType::Int256>(precision)); case EWireType::Yson32: return CreatePrimitiveTypeYsonToSkiffConverter(std::move(descriptor), wireType); default: @@ -1815,6 +1819,8 @@ TSkiffToYsonConverter CreateDecimalSkiffToYsonConverter( return TPrimitiveTypeSkiffToYsonConverter(TDecimalSkiffParser<EWireType::Int64>(precision)); case EWireType::Int128: return TPrimitiveTypeSkiffToYsonConverter(TDecimalSkiffParser<EWireType::Int128>(precision)); + case EWireType::Int256: + return TPrimitiveTypeSkiffToYsonConverter(TDecimalSkiffParser<EWireType::Int256>(precision)); case EWireType::Yson32: return CreatePrimitiveTypeSkiffToYsonConverter(wireType); default: @@ -1899,6 +1905,8 @@ void CheckSkiffWireTypeForDecimal(int precision, NSkiff::EWireType wireType) skiffBinarySize = sizeof(i64); } else if (wireType == NSkiff::EWireType::Int128) { skiffBinarySize = 2 * sizeof(i64); + } else if (wireType == NSkiff::EWireType::Int256) { + skiffBinarySize = 4 * sizeof(i64); } if (decimalBinarySize != skiffBinarySize) { diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp index 52aa9e5d8f..74b9d61fdd 100644 --- a/yt/yt/library/program/config.cpp +++ b/yt/yt/library/program/config.cpp @@ -187,6 +187,8 @@ void TSingletonsDynamicConfig::Register(TRegistrar registrar) .Optional(); registrar.Parameter("tcmalloc", &TThis::TCMalloc) .Optional(); + registrar.Parameter("stockpile", &TThis::Stockpile) + .Optional(); registrar.Parameter("protobuf_interop", &TThis::ProtobufInterop) .DefaultNew(); } diff --git a/yt/yt/library/tracing/jaeger/tracer.h b/yt/yt/library/tracing/jaeger/tracer.h index ed4850b1e9..387b3a8360 100644 --- a/yt/yt/library/tracing/jaeger/tracer.h +++ b/yt/yt/library/tracing/jaeger/tracer.h @@ -10,7 +10,7 @@ #include <yt/yt/library/tvm/service/public.h> #include <yt/yt/core/misc/mpsc_stack.h> -#include <yt/yt/core/misc/atomic_object.h> +#include <library/cpp/yt/threading/atomic_object.h> #include <yt/yt/core/rpc/grpc/config.h> diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 54321bcc96..9c9e018be9 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -1445,6 +1445,7 @@ message TIndexInfo required NYT.NProto.TGuid index_table_id = 1; required int32 index_kind = 2; // NTableClient::ESecondaryIndexKind optional string predicate = 3; + optional string unfolded_column = 4; } message TReqGetTableMountInfo @@ -2484,6 +2485,27 @@ message TRspGetJobStderr //////////////////////////////////////////////////////////////////////////////// +message TReqGetJobTrace +{ + oneof operation_id_or_alias { + NYT.NProto.TGuid operation_id = 1; + string operation_alias = 2; + } + optional NYT.NProto.TGuid job_id = 3; + optional NYT.NProto.TGuid trace_id = 4; + optional int64 from_event_index = 5; + optional int64 to_event_index = 6; + optional int64 from_time = 7; + optional int64 to_time = 8; +} + +message TRspGetJobTrace +{ + repeated TJobTraceEvent events = 1; +} + +//////////////////////////////////////////////////////////////////////////////// + message TReqGetJobFailContext { oneof operation_id_or_alias { @@ -3156,6 +3178,16 @@ message TListJobsResult repeated NYT.NProto.TError errors = 6; } +message TJobTraceEvent +{ + required NYT.NProto.TGuid operation_id = 1; + required NYT.NProto.TGuid job_id = 2; + required NYT.NProto.TGuid trace_id = 3; + required int64 event_index = 4; + required string event = 5; + required int64 event_time = 6; // TInstant +} + //////////////////////////////////////////////////////////////////////////////// message TGetFileFromCacheResult @@ -3377,15 +3409,14 @@ message TRspGetQueryTrackerInfo } //////////////////////////////////////////////////////////////////////////////// -// NB(arkady-e1ppa): Under construction. // Distributed table client //////////////////////////////////////////////////////////////////////////////// message TReqStartDistributedWriteSession { - optional string path = 1; - - // TDistributedWriteSessionStartOptions contents... + required string path = 1; + + optional TTransactionalOptions transactional_options = 100; } message TRspStartDistributedWriteSession @@ -3395,29 +3426,27 @@ message TRspStartDistributedWriteSession message TReqFinishDistributedWriteSession { - required bytes session = 2; // YSON-serialized TDistributedWriteSession + required bytes session = 1; // YSON-serialized TDistributedWriteSession - // TDistributedWriteSessionFinishOptions contents... + required int32 max_children_per_attach_request = 2; } message TRspFinishDistributedWriteSession { } -message TReqParticipantWriteTable +message TReqWriteTableFragment { optional bytes config = 1; // YSON-serialized TTableWriterConfig optional bytes format = 2; // YSON-serialized TFormat - required bytes cookie = 3; // YSON-serialized TDistributedWriteCookie - - // TParticipantTableWriterOptions contents... + required bytes cookie = 3; // YSON-serialized TFragmentWriteCookie } -message TRspParticipantWriteTable +message TRspWriteTableFragment { - required bytes cookie = 1; // YSON-serialized TDistributedWriteCookie + required bytes cookie = 1; // YSON-serialized TFragmentWriteCookie } /////////////////////////////////////////////////////////////////////////////// |