aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2024-10-16 12:11:24 +0000
committerAlexander Smirnov <alex@ydb.tech>2024-10-16 12:11:24 +0000
commit40811e93f3fdf9342a9295369994012420fac548 (patch)
treea8d85e094a9c21e10aa250f537c101fc2016a049 /yt
parent30ebe5357bb143648c6be4d151ecd4944af81ada (diff)
parent28a0c4a9f297064538a018c512cd9bbd00a1a35d (diff)
downloadydb-40811e93f3fdf9342a9295369994012420fac548.tar.gz
Merge branch 'rightlib' into mergelibs-241016-1210
Diffstat (limited to 'yt')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp8
-rw-r--r--yt/cpp/mapreduce/client/client.h4
-rw-r--r--yt/cpp/mapreduce/interface/client.h12
-rw-r--r--yt/cpp/mapreduce/interface/fwd.h3
-rw-r--r--yt/cpp/mapreduce/interface/operation.h64
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp49
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h6
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp9
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h4
-rw-r--r--yt/yt/client/api/delegating_client.h11
-rw-r--r--yt/yt/client/api/distributed_table_client.h23
-rw-r--r--yt/yt/client/api/distributed_table_session.cpp197
-rw-r--r--yt/yt/client/api/distributed_table_session.h158
-rw-r--r--yt/yt/client/api/distributed_table_sessions.cpp17
-rw-r--r--yt/yt/client/api/distributed_table_sessions.h41
-rw-r--r--yt/yt/client/api/operation_client.cpp13
-rw-r--r--yt/yt/client/api/operation_client.h28
-rw-r--r--yt/yt/client/api/public.h3
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h3
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.cpp2
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp53
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h10
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.h2
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp118
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.h53
-rw-r--r--yt/yt/client/api/rpc_proxy/table_mount_cache.cpp5
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp2
-rw-r--r--yt/yt/client/chunk_client/public.h7
-rw-r--r--yt/yt/client/driver/distributed_table_commands.cpp56
-rw-r--r--yt/yt/client/driver/distributed_table_commands.h6
-rw-r--r--yt/yt/client/driver/driver.cpp7
-rw-r--r--yt/yt/client/driver/scheduler_commands.cpp46
-rw-r--r--yt/yt/client/driver/scheduler_commands.h14
-rw-r--r--yt/yt/client/federated/client.cpp126
-rw-r--r--yt/yt/client/federated/config.cpp3
-rw-r--r--yt/yt/client/federated/config.h3
-rw-r--r--yt/yt/client/federated/unittests/client_ut.cpp14
-rw-r--r--yt/yt/client/federated/unittests/connection_ut.cpp87
-rw-r--r--yt/yt/client/hedging/hedging.cpp3
-rw-r--r--yt/yt/client/logging/dynamic_table_log_writer.cpp2
-rw-r--r--yt/yt/client/object_client/helpers.cpp5
-rw-r--r--yt/yt/client/object_client/helpers.h3
-rw-r--r--yt/yt/client/queue_client/producer_client.cpp180
-rw-r--r--yt/yt/client/queue_client/producer_client.h23
-rw-r--r--yt/yt/client/scheduler/public.h7
-rw-r--r--yt/yt/client/table_client/helpers-inl.h42
-rw-r--r--yt/yt/client/table_client/helpers.h14
-rw-r--r--yt/yt/client/table_client/logical_type.h2
-rw-r--r--yt/yt/client/table_client/table_upload_options.cpp385
-rw-r--r--yt/yt/client/table_client/table_upload_options.h87
-rw-r--r--yt/yt/client/tablet_client/table_mount_cache.h1
-rw-r--r--yt/yt/client/transaction_client/remote_timestamp_provider.cpp8
-rw-r--r--yt/yt/client/unittests/mock/client.h13
-rw-r--r--yt/yt/client/unittests/mock/transaction.h2
-rw-r--r--yt/yt/client/ya.make3
-rw-r--r--yt/yt/core/bus/tcp/connection.h2
-rw-r--r--yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp17
-rw-r--r--yt/yt/core/crypto/tls.cpp10
-rw-r--r--yt/yt/core/http/config.cpp4
-rw-r--r--yt/yt/core/http/connection_reuse_helpers.cpp2
-rw-r--r--yt/yt/core/http/server.cpp27
-rw-r--r--yt/yt/core/http/server.h6
-rw-r--r--yt/yt/core/http/unittests/http_ut.cpp5
-rw-r--r--yt/yt/core/https/server.cpp6
-rw-r--r--yt/yt/core/https/server.h5
-rw-r--r--yt/yt/core/logging/log_manager.cpp272
-rw-r--r--yt/yt/core/logging/log_manager.h2
-rw-r--r--yt/yt/core/misc/async_expiring_cache-inl.h21
-rw-r--r--yt/yt/core/misc/async_expiring_cache.h3
-rw-r--r--yt/yt/core/misc/atomic_object-inl.h97
-rw-r--r--yt/yt/core/misc/atomic_object.h63
-rw-r--r--yt/yt/core/misc/hazard_ptr.cpp16
-rw-r--r--yt/yt/core/misc/hazard_ptr.h2
-rw-r--r--yt/yt/core/misc/ring_queue.h10
-rw-r--r--yt/yt/core/misc/stripped_error.cpp2
-rw-r--r--yt/yt/core/net/connection.cpp10
-rw-r--r--yt/yt/core/net/connection.h9
-rw-r--r--yt/yt/core/rpc/bus/channel.cpp2
-rw-r--r--yt/yt/core/rpc/per_user_request_queue_provider.h2
-rw-r--r--yt/yt/core/rpc/stream-inl.h25
-rw-r--r--yt/yt/core/rpc/stream.h7
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.cpp7
-rw-r--r--yt/yt/core/rpc/unittests/rpc_ut.cpp10
-rw-r--r--yt/yt/core/test_framework/test_proxy_service.h2
-rw-r--r--yt/yt/core/yson/pull_parser_deserialize-inl.h14
-rw-r--r--yt/yt/core/yson/pull_parser_deserialize.h3
-rw-r--r--yt/yt/core/ytree/fluent.h11
-rw-r--r--yt/yt/core/ytree/serialize-inl.h25
-rw-r--r--yt/yt/core/ytree/serialize.h6
-rw-r--r--yt/yt/core/ytree/tree_builder.h1
-rw-r--r--yt/yt/core/ytree/unittests/serialize_ut.cpp13
-rw-r--r--yt/yt/core/ytree/unittests/ytree_fluent_ut.cpp39
-rw-r--r--yt/yt/core/ytree/ypath_service.cpp2
-rw-r--r--yt/yt/library/decimal/decimal.cpp515
-rw-r--r--yt/yt/library/decimal/decimal.h25
-rw-r--r--yt/yt/library/decimal/unittests/decimal_ut.cpp164
-rw-r--r--yt/yt/library/formats/arrow_parser.cpp51
-rw-r--r--yt/yt/library/formats/skiff_parser.cpp1
-rw-r--r--yt/yt/library/formats/skiff_writer.cpp4
-rw-r--r--yt/yt/library/formats/skiff_yson_converter-inl.h20
-rw-r--r--yt/yt/library/formats/skiff_yson_converter.cpp8
-rw-r--r--yt/yt/library/program/config.cpp2
-rw-r--r--yt/yt/library/tracing/jaeger/tracer.h2
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto53
104 files changed, 2867 insertions, 785 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 9e2885bbe0..9dab176bde 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -1243,6 +1243,14 @@ IFileReaderPtr TClient::GetJobStderr(
return NRawClient::GetJobStderr(Context_, operationId, jobId, options);
}
+std::vector<TJobTraceEvent> TClient::GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options)
+{
+ CheckShutdown();
+ return NRawClient::GetJobTrace(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
+}
+
TNode::TListType TClient::SkyShareTable(
const std::vector<TYPath>& tablePaths,
const TSkyShareTableOptions& options)
diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h
index 5de00285ef..32c458316d 100644
--- a/yt/cpp/mapreduce/client/client.h
+++ b/yt/cpp/mapreduce/client/client.h
@@ -449,6 +449,10 @@ public:
const TJobId& jobId,
const TGetJobStderrOptions& options = TGetJobStderrOptions()) override;
+ std::vector<TJobTraceEvent> GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options = TGetJobTraceOptions()) override;
+
TNode::TListType SkyShareTable(
const std::vector<TYPath>& tablePaths,
const TSkyShareTableOptions& options = TSkyShareTableOptions()) override;
diff --git a/yt/cpp/mapreduce/interface/client.h b/yt/cpp/mapreduce/interface/client.h
index 56efa3c23c..3032025140 100644
--- a/yt/cpp/mapreduce/interface/client.h
+++ b/yt/cpp/mapreduce/interface/client.h
@@ -493,6 +493,18 @@ public:
const TGetJobStderrOptions& options = TGetJobStderrOptions()) = 0;
///
+ /// @brief Get trace of a job.
+ ///
+ /// @ref NYT::TErrorResponse exception is thrown if it is missing.
+ ///
+ /// @note YT doesn't store all job traces.
+ ///
+ /// @see [YT doc](https://ytsaurus.tech/docs/en/api/commands.html#get_job_trace)
+ virtual std::vector<TJobTraceEvent> GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options = TGetJobTraceOptions()) = 0;
+
+ ///
/// @brief Create one or several rbtorrents for files in a blob table.
///
/// If specified, one torrent is created for each value of `KeyColumns` option.
diff --git a/yt/cpp/mapreduce/interface/fwd.h b/yt/cpp/mapreduce/interface/fwd.h
index 0434c03d8b..485b45129a 100644
--- a/yt/cpp/mapreduce/interface/fwd.h
+++ b/yt/cpp/mapreduce/interface/fwd.h
@@ -157,6 +157,7 @@ namespace NYT {
using TTabletCellId = TGUID;
using TReplicaId = TGUID;
using TJobId = TGUID;
+ using TJobTraceId = TGUID;
using TYPath = TString;
using TLocalFilePath = TString;
@@ -370,6 +371,8 @@ namespace NYT {
struct TListJobsOptions;
+ struct TGetJobTraceOptions;
+
struct IOperationClient;
enum class EFinishedJobState : int;
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h
index 9a85049886..f2de3ea3bd 100644
--- a/yt/cpp/mapreduce/interface/operation.h
+++ b/yt/cpp/mapreduce/interface/operation.h
@@ -3048,6 +3048,70 @@ struct TGetFailedJobInfoOptions
////////////////////////////////////////////////////////////////////////////////
///
+/// @brief Options for @ref NYT::IClient::GetJobTrace.
+struct TGetJobTraceOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TGetJobTraceOptions;
+ /// @endcond
+
+ ///
+ /// @brief Id of the job.
+ FLUENT_FIELD_OPTION(TJobId, JobId);
+
+ ///
+ /// @brief Id of the trace.
+ FLUENT_FIELD_OPTION(TJobTraceId, TraceId);
+
+ ///
+ /// @brief Search for traces with time >= `FromTime`.
+ FLUENT_FIELD_OPTION(i64, FromTime);
+
+ ///
+ /// @brief Search for traces with time <= `ToTime`.
+ FLUENT_FIELD_OPTION(i64, ToTime);
+
+ ///
+ /// @brief Search for traces with event index >= `FromEventIndex`.
+ FLUENT_FIELD_OPTION(i64, FromEventIndex);
+
+ ///
+ /// @brief Search for traces with event index >= `ToEventIndex`.
+ FLUENT_FIELD_OPTION(i64, ToEventIndex);
+};
+
+///
+/// @brief Response for @ref NYT::IOperation::GetJobTrace.
+struct TJobTraceEvent
+{
+ ///
+ /// @brief Id of the operation.
+ TOperationId OperationId;
+
+ ///
+ /// @brief Id of the job.
+ TJobId JobId;
+
+ ///
+ /// @brief Id of the trace.
+ TJobTraceId TraceId;
+
+ ///
+ /// @brief Index of the trace event.
+ i64 EventIndex;
+
+ ///
+ /// @brief Raw evenr in json format.
+ TString Event;
+
+ ///
+ /// @brief Time of the event.
+ TInstant EventTime;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
/// @brief Interface representing an operation.
struct IOperation
: public TThrRefBase
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index 2f9610a1ca..59868c599e 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -783,6 +783,55 @@ IFileReaderPtr GetJobStderr(
return new TResponseReader(context, std::move(header));
}
+TJobTraceEvent ParseJobTraceEvent(const TNode& node)
+{
+ const auto& mapNode = node.AsMap();
+ TJobTraceEvent result;
+
+ if (auto idNode = mapNode.FindPtr("operation_id")) {
+ result.OperationId = GetGuid(idNode->AsString());
+ }
+ if (auto idNode = mapNode.FindPtr("job_id")) {
+ result.JobId = GetGuid(idNode->AsString());
+ }
+ if (auto idNode = mapNode.FindPtr("trace_id")) {
+ result.TraceId = GetGuid(idNode->AsString());
+ }
+ if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
+ result.EventIndex = eventIndexNode->AsInt64();
+ }
+ if (auto eventNode = mapNode.FindPtr("event")) {
+ result.Event = eventNode->AsString();
+ }
+ if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
+ result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());;
+ }
+
+ return result;
+}
+
+std::vector<TJobTraceEvent> GetJobTrace(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options)
+{
+ THttpHeader header("GET", "get_job_trace");
+ header.MergeParameters(SerializeParamsForGetJobTrace(operationId, options));
+ auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
+ auto resultNode = NodeFromYsonString(responseInfo.Response);
+
+ std::vector<TJobTraceEvent> result;
+
+ const auto& traceEventNodesList = resultNode.AsList();
+ result.reserve(traceEventNodesList.size());
+ for (const auto& traceEventNode : traceEventNodesList) {
+ result.push_back(ParseJobTraceEvent(traceEventNode));
+ }
+
+ return result;
+}
+
TMaybe<TYPath> GetFileFromCache(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index c2d1a53b51..0a183d617c 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -257,6 +257,12 @@ TString GetJobStderrWithRetries(
const TJobId& jobId,
const TGetJobStderrOptions& options = TGetJobStderrOptions());
+std::vector<TJobTraceEvent> GetJobTrace(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options = TGetJobTraceOptions());
+
//
// File cache
//
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
index a638ac581d..bebc59ec91 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
@@ -525,6 +525,15 @@ TNode SerializeParamsForGetJob(
return result;
}
+TNode SerializeParamsForGetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& /* options */)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+ return result;
+}
+
TNode SerializeParamsForListJobs(
const TOperationId& operationId,
const TListJobsOptions& options)
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
index a7ab35d91d..69a4888267 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
@@ -126,6 +126,10 @@ TNode SerializeParamsForListJobs(
const TOperationId& operationId,
const TListJobsOptions& options);
+TNode SerializeParamsForGetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options);
+
TNode SerializeParametersForInsertRows(
const TString& pathPrefix,
const TYPath& path,
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index c14f2d49d6..6f85657139 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -516,6 +516,11 @@ public:
const TGetJobStderrOptions& options),
(operationIdOrAlias, jobId, options))
+ DELEGATE_METHOD(TFuture<std::vector<TJobTraceEvent>>, GetJobTrace, (
+ const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
+ const TGetJobTraceOptions& options),
+ (operationIdOrAlias, options))
+
DELEGATE_METHOD(TFuture<TSharedRef>, GetJobFailContext, (
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
@@ -851,9 +856,9 @@ public:
const TDistributedWriteSessionFinishOptions& options),
(std::move(session), options))
- DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options),
+ DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options),
(cookie, options))
// Shuffle Service
diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h
index 9ac983371c..d1bca5bc3a 100644
--- a/yt/yt/client/api/distributed_table_client.h
+++ b/yt/yt/client/api/distributed_table_client.h
@@ -4,6 +4,8 @@
#include <yt/yt/client/table_client/config.h>
+#include <library/cpp/yt/misc/non_null_ptr.h>
+
namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
@@ -13,10 +15,11 @@ struct TDistributedWriteSessionStartOptions
{ };
struct TDistributedWriteSessionFinishOptions
- : public TTransactionalOptions
-{ };
+{
+ int MaxChildrenPerAttachRequest = 10'000;
+};
-struct TParticipantTableWriterOptions
+struct TFragmentTableWriterOptions
: public TTableWriterOptions
{ };
@@ -24,6 +27,7 @@ struct TParticipantTableWriterOptions
struct IDistributedTableClientBase
{
+public:
virtual ~IDistributedTableClientBase() = default;
virtual TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
@@ -33,6 +37,13 @@ struct IDistributedTableClientBase
virtual TFuture<void> FinishDistributedWriteSession(
TDistributedWriteSessionPtr session,
const TDistributedWriteSessionFinishOptions& options = {}) = 0;
+
+ // Helper used to implement FinishDistributedWriteSession efficiently
+ // without compromising privacy of session fields.
+ // defined in yt/yt/client/api/distributed_table_session.cpp.
+ void* GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session);
+ // Used in chunk writer for results recording
+ void RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult);
};
////////////////////////////////////////////////////////////////////////////////
@@ -41,9 +52,9 @@ struct IDistributedTableClient
{
virtual ~IDistributedTableClient() = default;
- virtual TFuture<ITableWriterPtr> CreateParticipantTableWriter(
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options = {}) = 0;
+ virtual TFuture<ITableWriterPtr> CreateFragmentTableWriter(
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/distributed_table_session.cpp b/yt/yt/client/api/distributed_table_session.cpp
new file mode 100644
index 0000000000..d67998b9d2
--- /dev/null
+++ b/yt/yt/client/api/distributed_table_session.cpp
@@ -0,0 +1,197 @@
+#include "distributed_table_session.h"
+
+#include "client.h"
+#include "transaction.h"
+
+namespace NYT::NApi {
+
+using namespace NYPath;
+using namespace NObjectClient;
+using namespace NTableClient;
+using namespace NTransactionClient;
+using namespace NYTree;
+using namespace NCypressClient;
+using namespace NChunkClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+TTableWriterPatchInfo::TTableWriterPatchInfo(
+ TRichYPath richPath,
+ TObjectId objectId,
+ TCellTag externalCellTag,
+ TMasterTableSchemaId chunkSchemaId,
+ TTableSchemaPtr chunkSchema,
+ std::optional<TLegacyOwningKey> writerLastKey,
+ int maxHeavyColumns,
+ TTimestamp timestamp,
+ const IAttributeDictionary& tableAttributes)
+ : TTableWriterPatchInfo()
+{
+ ObjectId = objectId;
+ RichPath = std::move(richPath);
+
+ ChunkSchemaId = chunkSchemaId;
+ ChunkSchema = std::move(chunkSchema);
+
+ WriterLastKey = writerLastKey;
+ MaxHeavyColumns = maxHeavyColumns;
+
+ TableAttributes = tableAttributes.ToMap();
+
+ ExternalCellTag = externalCellTag;
+
+ Timestamp = timestamp;
+}
+
+void TTableWriterPatchInfo::Register(TRegistrar registrar)
+{
+ registrar.Parameter("table_id", &TThis::ObjectId);
+ registrar.Parameter("table_path", &TThis::RichPath);
+
+ registrar.Parameter("chunk_schema_id", &TThis::ChunkSchemaId);
+ registrar.Parameter("chunk_schema", &TThis::ChunkSchema);
+
+ registrar.Parameter("writer_last_key", &TThis::WriterLastKey);
+ registrar.Parameter("max_heavy_columns", &TThis::MaxHeavyColumns);
+
+ registrar.Parameter("table_attributes", &TThis::TableAttributes);
+
+ registrar.Parameter("external_cell_tag", &TThis::ExternalCellTag);
+
+ registrar.Parameter("timestamp", &TThis::Timestamp);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TFragmentWriteResult::Register(TRegistrar registrar)
+{
+ registrar.Parameter("min_boundary_key", &TThis::MinBoundaryKey);
+ registrar.Parameter("max_boundary_key", &TThis::MaxBoundaryKey);
+ registrar.Parameter("chunk_list_id", &TThis::ChunkListId);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+const TTableWriterPatchInfo& TFragmentWriteCookie::GetPatchInfo() const
+{
+ return PatchInfo_;
+}
+
+TTransactionId TFragmentWriteCookie::GetMainTransactionId() const
+{
+ return MainTxId_;
+}
+
+TTransactionId TFragmentWriteCookie::GetUploadTransactionId() const
+{
+ return UploadTxId_;
+}
+
+void TFragmentWriteCookie::Register(TRegistrar registrar)
+{
+ registrar.Parameter("session_id", &TThis::Id_);
+
+ registrar.Parameter("tx_id", &TThis::MainTxId_);
+ registrar.Parameter("upload_tx_id", &TThis::UploadTxId_);
+
+ registrar.Parameter("patch_info", &TThis::PatchInfo_);
+
+ registrar.Parameter("writer_results", &TThis::WriteResults_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TDistributedWriteSession::TDistributedWriteSession(
+ TTransactionId mainTxId,
+ TTransactionId uploadTxId,
+ TChunkListId rootChunkListId,
+ TTableWriterPatchInfo patchInfo)
+ : TDistributedWriteSession()
+{
+ Id_ = TDistributedWriteSessionId(TGuid::Create());
+ MainTxId_ = mainTxId;
+ UploadTxId_ = uploadTxId;
+
+ RootChunkListId_ = rootChunkListId;
+
+ PatchInfo_ = std::move(patchInfo);
+
+ // Signatures?
+}
+
+TTransactionId TDistributedWriteSession::GetMainTransactionId() const
+{
+ return MainTxId_;
+}
+
+TTransactionId TDistributedWriteSession::GetUploadTransactionId() const
+{
+ return UploadTxId_;
+}
+
+const TTableWriterPatchInfo& TDistributedWriteSession::GetPatchInfo() const Y_LIFETIME_BOUND
+{
+ return PatchInfo_;
+}
+
+TChunkListId TDistributedWriteSession::GetRootChunkListId() const
+{
+ return RootChunkListId_;
+}
+
+TFragmentWriteCookiePtr TDistributedWriteSession::GiveCookie()
+{
+ auto cookie = New<TFragmentWriteCookie>();
+ cookie->Id_ = Id_;
+ cookie->MainTxId_ = MainTxId_;
+ cookie->UploadTxId_ = UploadTxId_;
+ cookie->PatchInfo_ = PatchInfo_;
+
+ return cookie;
+}
+
+void TDistributedWriteSession::TakeCookie(TFragmentWriteCookiePtr cookie)
+{
+ // Verify cookie signature?
+ WriteResults_.reserve(std::ssize(WriteResults_) + std::ssize(cookie->WriteResults_));
+ for (const auto& writeResult : cookie->WriteResults_) {
+ WriteResults_.push_back(writeResult);
+ }
+}
+
+TFuture<void> TDistributedWriteSession::Ping(IClientPtr client)
+{
+ // NB(arkady-e1ppa): AutoAbort = false by default.
+ auto mainTx = client->AttachTransaction(MainTxId_);
+
+ return mainTx->Ping();
+}
+
+void TDistributedWriteSession::Register(TRegistrar registrar)
+{
+ registrar.Parameter("session_id", &TThis::Id_);
+ registrar.Parameter("tx_id", &TThis::MainTxId_);
+ registrar.Parameter("upload_tx_id", &TThis::UploadTxId_);
+
+ registrar.Parameter("root_chunk_list_id", &TThis::RootChunkListId_);
+
+ registrar.Parameter("patch_info", &TThis::PatchInfo_);
+
+ registrar.Parameter("write_results", &TThis::WriteResults_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void* IDistributedTableClientBase::GetOpaqueDistributedWriteResults(Y_LIFETIME_BOUND const TDistributedWriteSessionPtr& session)
+{
+ return static_cast<void*>(&session->WriteResults_);
+}
+
+void IDistributedTableClientBase::RecordOpaqueWriteResult(const TFragmentWriteCookiePtr& cookie, void* opaqueWriteResult)
+{
+ auto* concrete = static_cast<TFragmentWriteResult*>(opaqueWriteResult);
+ YT_ASSERT(concrete);
+ cookie->WriteResults_.push_back(*concrete);
+}
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_session.h b/yt/yt/client/api/distributed_table_session.h
new file mode 100644
index 0000000000..c8c074c84c
--- /dev/null
+++ b/yt/yt/client/api/distributed_table_session.h
@@ -0,0 +1,158 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/client/table_client/table_upload_options.h>
+
+#include <yt/yt/client/chunk_client/public.h>
+
+#include <yt/yt/client/table_client/key.h>
+
+#include <yt/yt/client/ypath/rich.h>
+
+#include <yt/yt/core/ytree/yson_struct.h>
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid);
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Used by distributed table writer to patch
+// its config based on table data.
+// NB(arkady-e1ppa): TableUploadOptions are
+// encoded in RichPath + TableAttributes and thus
+// are not required to be stored directly.
+struct TTableWriterPatchInfo
+ : public NYTree::TYsonStructLite
+{
+public:
+ NObjectClient::TObjectId ObjectId;
+
+ NYPath::TRichYPath RichPath;
+
+ // NB(arkady-e1ppa): Empty writer last key Serialization into Deserialization
+ // somehow results in a []; row which is considered non-empty for some reason.
+ // This matters too little for me to bother saving up the std::optional wrapper.
+ std::optional<NTableClient::TLegacyOwningKey> WriterLastKey;
+ int MaxHeavyColumns;
+
+ NTableClient::TMasterTableSchemaId ChunkSchemaId;
+ NTableClient::TTableSchemaPtr ChunkSchema;
+
+ NYTree::INodePtr TableAttributes;
+
+ NObjectClient::TCellTag ExternalCellTag;
+
+ NTransactionClient::TTimestamp Timestamp;
+
+ TTableWriterPatchInfo(
+ NYPath::TRichYPath richPath,
+ NObjectClient::TObjectId objectId,
+ NObjectClient::TCellTag externalCellTag,
+ NTableClient::TMasterTableSchemaId chunkSchemaId,
+ NTableClient::TTableSchemaPtr chunkSchema,
+ std::optional<NTableClient::TLegacyOwningKey> writerLastKey,
+ int maxHeavyColumns,
+ NTransactionClient::TTimestamp timestamp,
+ const NYTree::IAttributeDictionary& tableAttributes);
+
+ REGISTER_YSON_STRUCT_LITE(TTableWriterPatchInfo)
+
+ static void Register(TRegistrar registrar);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TFragmentWriteResult
+ : public NYTree::TYsonStructLite
+{
+ NTableClient::TLegacyOwningKey MinBoundaryKey;
+ NTableClient::TLegacyOwningKey MaxBoundaryKey;
+ NChunkClient::TChunkListId ChunkListId;
+
+ REGISTER_YSON_STRUCT_LITE(TFragmentWriteResult)
+
+ static void Register(TRegistrar registrar);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TFragmentWriteCookie
+ : public NYTree::TYsonStruct
+{
+public:
+ const TTableWriterPatchInfo& GetPatchInfo() const;
+
+ NCypressClient::TTransactionId GetMainTransactionId() const;
+ NCypressClient::TTransactionId GetUploadTransactionId() const;
+
+private:
+ TDistributedWriteSessionId Id_;
+
+ NCypressClient::TTransactionId MainTxId_;
+ NCypressClient::TTransactionId UploadTxId_;
+
+ TTableWriterPatchInfo PatchInfo_;
+
+ std::vector<TFragmentWriteResult> WriteResults_;
+
+ REGISTER_YSON_STRUCT(TFragmentWriteCookie);
+
+ static void Register(TRegistrar registrar);
+
+ friend class TDistributedWriteSession;
+ friend struct IDistributedTableClientBase;
+};
+
+DEFINE_REFCOUNTED_TYPE(TFragmentWriteCookie);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDistributedWriteSession
+ : public NYTree::TYsonStruct
+{
+public:
+ TDistributedWriteSession(
+ NCypressClient::TTransactionId mainTxId,
+ NCypressClient::TTransactionId uploadTxId,
+ NChunkClient::TChunkListId rootChunkListId,
+ TTableWriterPatchInfo patchInfo);
+
+ NCypressClient::TTransactionId GetMainTransactionId() const;
+ NCypressClient::TTransactionId GetUploadTransactionId() const;
+ const TTableWriterPatchInfo& GetPatchInfo() const Y_LIFETIME_BOUND;
+ NChunkClient::TChunkListId GetRootChunkListId() const;
+
+ TFragmentWriteCookiePtr GiveCookie();
+ void TakeCookie(TFragmentWriteCookiePtr cookie);
+
+ TFuture<void> Ping(IClientPtr client);
+
+ REGISTER_YSON_STRUCT(TDistributedWriteSession);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ TDistributedWriteSessionId Id_;
+
+ NCypressClient::TTransactionId MainTxId_;
+ NCypressClient::TTransactionId UploadTxId_;
+
+ NChunkClient::TChunkListId RootChunkListId_;
+
+ TTableWriterPatchInfo PatchInfo_;
+
+ // This is used to commit changes when session is over.
+ std::vector<TFragmentWriteResult> WriteResults_;
+
+ friend struct IDistributedTableClientBase;
+};
+
+DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_sessions.cpp b/yt/yt/client/api/distributed_table_sessions.cpp
deleted file mode 100644
index b454ad476b..0000000000
--- a/yt/yt/client/api/distributed_table_sessions.cpp
+++ /dev/null
@@ -1,17 +0,0 @@
-#include "distributed_table_sessions.h"
-
-namespace NYT::NApi {
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TDistributedWriteCookie::Register(TRegistrar /*registrar*/)
-{ }
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TDistributedWriteSession::Register(TRegistrar /*registrar*/)
-{ }
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_sessions.h b/yt/yt/client/api/distributed_table_sessions.h
deleted file mode 100644
index 19b3c0f5c0..0000000000
--- a/yt/yt/client/api/distributed_table_sessions.h
+++ /dev/null
@@ -1,41 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/ytree/yson_struct.h>
-
-namespace NYT::NApi {
-
-////////////////////////////////////////////////////////////////////////////////
-
-YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid);
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TDistributedWriteCookie
- : public NYTree::TYsonStruct
-{
-public:
- REGISTER_YSON_STRUCT(TDistributedWriteCookie);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TDistributedWriteCookie);
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TDistributedWriteSession
- : public NYTree::TYsonStruct
-{
-public:
- REGISTER_YSON_STRUCT(TDistributedWriteSession);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NApi
diff --git a/yt/yt/client/api/operation_client.cpp b/yt/yt/client/api/operation_client.cpp
index 4e1816a6e7..8f8dbb355b 100644
--- a/yt/yt/client/api/operation_client.cpp
+++ b/yt/yt/client/api/operation_client.cpp
@@ -217,6 +217,19 @@ void Serialize(const TJob& job, NYson::IYsonConsumer* consumer, TStringBuf idKey
.EndMap();
}
+void Serialize(const TJobTraceEvent& traceEvent, NYson::IYsonConsumer* consumer)
+{
+ NYTree::BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("operation_id").Value(traceEvent.OperationId)
+ .Item("job_id").Value(traceEvent.JobId)
+ .Item("trace_id").Value(traceEvent.TraceId)
+ .Item("event_index").Value(traceEvent.EventIndex)
+ .Item("event").Value(traceEvent.Event)
+ .Item("event_time").Value(traceEvent.EventTime.MicroSeconds())
+ .EndMap();
+}
+
////////////////////////////////////////////////////////////////////////////////
void TListOperationsAccessFilter::Register(TRegistrar registrar)
diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h
index 48687497da..d5072db1fc 100644
--- a/yt/yt/client/api/operation_client.h
+++ b/yt/yt/client/api/operation_client.h
@@ -89,6 +89,18 @@ struct TGetJobStderrOptions
std::optional<i64> Offset;
};
+struct TGetJobTraceOptions
+ : public TTimeoutOptions
+ , public TMasterReadOptions
+{
+ std::optional<NJobTrackerClient::TJobId> JobId;
+ std::optional<NScheduler::TJobTraceId> TraceId;
+ std::optional<i64> FromTime;
+ std::optional<i64> ToTime;
+ std::optional<i64> FromEventIndex;
+ std::optional<i64> ToEventIndex;
+};
+
struct TGetJobFailContextOptions
: public TTimeoutOptions
, public TMasterReadOptions
@@ -346,6 +358,18 @@ struct TJob
void Serialize(const TJob& job, NYson::IYsonConsumer* consumer, TStringBuf idKey);
+struct TJobTraceEvent
+{
+ NJobTrackerClient::TOperationId OperationId;
+ NJobTrackerClient::TJobId JobId;
+ NScheduler::TJobTraceId TraceId;
+ i64 EventIndex;
+ TString Event;
+ TInstant EventTime;
+};
+
+void Serialize(const TJobTraceEvent& traceEvent, NYson::IYsonConsumer* consumer);
+
struct TListJobsStatistics
{
TEnumIndexedArray<NJobTrackerClient::EJobState, i64> StateCounts;
@@ -443,6 +467,10 @@ struct IOperationClient
NJobTrackerClient::TJobId jobId,
const TGetJobStderrOptions& options = {}) = 0;
+ virtual TFuture<std::vector<TJobTraceEvent>> GetJobTrace(
+ const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
+ const TGetJobTraceOptions& options = {}) = 0;
+
virtual TFuture<TSharedRef> GetJobFailContext(
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h
index be710a04ce..47eea56fb8 100644
--- a/yt/yt/client/api/public.h
+++ b/yt/yt/client/api/public.h
@@ -188,7 +188,8 @@ DECLARE_REFCOUNTED_STRUCT(TBackupManifest)
DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter)
DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession)
-DECLARE_REFCOUNTED_CLASS(TDistributedWriteCookie)
+DECLARE_REFCOUNTED_CLASS(TFragmentWriteCookie)
+struct IDistributedTableClientBase;
DECLARE_REFCOUNTED_STRUCT(TShuffleHandle)
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index 6b00c7470a..5f1c945e3f 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -121,6 +121,7 @@ public:
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobInputPaths);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobSpec);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobStderr);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobTrace);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetJobFailContext);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AbandonJob);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PollJobShell);
@@ -207,7 +208,7 @@ public:
// Distributed table client
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, StartDistributedWriteSession);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, FinishDistributedWriteSession);
- DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ParticipantWriteTable,
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, WriteTableFragment,
.SetStreamingEnabled(true));
// Shuffle service
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index c86a1305af..6ad3c152b4 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -12,7 +12,7 @@
#include "table_writer.h"
#include "transaction.h"
-#include <yt/yt/client/api/distributed_table_sessions.h>
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/file_reader.h>
#include <yt/yt/client/api/file_writer.h>
#include <yt/yt/client/api/journal_reader.h>
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index b9e4944381..430b366714 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -8,6 +8,7 @@
#include "timestamp_provider.h"
#include "transaction.h"
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/helpers.h>
#include <yt/yt/client/api/rowset.h>
#include <yt/yt/client/api/transaction.h>
@@ -752,12 +753,14 @@ TFuture<void> TClient::AlterReplicationCard(
return req->Invoke().As<void>();
}
-TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter(
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options)
+TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter(
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options)
{
+ using TRspPtr = TIntrusivePtr<NRpc::TTypedClientResponse<NProto::TRspWriteTableFragment>>;
+
auto proxy = CreateApiServiceProxy();
- auto req = proxy.ParticipantWriteTable();
+ auto req = proxy.WriteTableFragment();
InitStreamingRequest(*req);
FillRequest(req.Get(), cookie, options);
@@ -768,12 +771,15 @@ TFuture<ITableWriterPtr> TClient::CreateParticipantTableWriter(
BIND ([=] (const TSharedRef& metaRef) {
NApi::NRpcProxy::NProto::TWriteTableMeta meta;
if (!TryDeserializeProto(&meta, metaRef)) {
- THROW_ERROR_EXCEPTION("Failed to deserialize schema for participant table writer");
+ THROW_ERROR_EXCEPTION("Failed to deserialize schema for fragment table writer");
}
FromProto(schema.Get(), meta.schema());
+ }),
+ BIND([=] (TRspPtr&& rsp) mutable {
+ Deserialize(*cookie, ConvertToNode(TYsonString(rsp->cookie())));
}))
- .Apply(BIND([=] (IAsyncZeroCopyOutputStreamPtr outputStream) {
+ .ApplyUnique(BIND([=] (IAsyncZeroCopyOutputStreamPtr&& outputStream) {
return NRpcProxy::CreateTableWriter(std::move(outputStream), std::move(schema));
})).As<ITableWriterPtr>();
}
@@ -1328,6 +1334,41 @@ TFuture<TGetJobStderrResponse> TClient::GetJobStderr(
}));
}
+TFuture<std::vector<TJobTraceEvent>> TClient::GetJobTrace(
+ const TOperationIdOrAlias& operationIdOrAlias,
+ const TGetJobTraceOptions& options)
+{
+ auto proxy = CreateApiServiceProxy();
+
+ auto req = proxy.GetJobTrace();
+ SetTimeoutOptions(*req, options);
+
+ NScheduler::ToProto(req, operationIdOrAlias);
+ if (options.JobId) {
+ ToProto(req->mutable_job_id(), *options.JobId);
+ }
+ if (options.TraceId) {
+ ToProto(req->mutable_trace_id(), *options.TraceId);
+ }
+ if (options.FromTime) {
+ req->set_from_time(*options.FromTime);
+ }
+ if (options.ToTime) {
+ req->set_to_time(*options.ToTime);
+ }
+ if (options.FromEventIndex) {
+ req->set_from_event_index(*options.FromEventIndex);
+ }
+ if (options.ToEventIndex) {
+ req->set_to_event_index(*options.ToEventIndex);
+ }
+
+ return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspGetJobTracePtr& rsp) {
+ return FromProto<std::vector<TJobTraceEvent>>(rsp->events());
+ }));
+}
+
+
TFuture<TSharedRef> TClient::GetJobFailContext(
const TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index d3905300e3..16e9400391 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -130,9 +130,9 @@ public:
const TAlterReplicationCardOptions& options = {}) override;
// Distributed table client
- TFuture<ITableWriterPtr> CreateParticipantTableWriter(
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options) override;
+ TFuture<ITableWriterPtr> CreateFragmentTableWriter(
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options) override;
// Queues.
TFuture<NQueueClient::IQueueRowsetPtr> PullQueue(
@@ -278,6 +278,10 @@ public:
NJobTrackerClient::TJobId jobId,
const NApi::TGetJobStderrOptions& options) override;
+ TFuture<std::vector<TJobTraceEvent>> GetJobTrace(
+ const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
+ const NApi::TGetJobTraceOptions& options) override;
+
TFuture<TSharedRef> GetJobFailContext(
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.h b/yt/yt/client/api/rpc_proxy/connection_impl.h
index 576b655870..92d1f5b4ec 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.h
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.h
@@ -9,7 +9,7 @@
#include <yt/yt/core/rpc/public.h>
// TODO(prime@): Create HTTP endpoint for discovery that works without authentication.
-#include <yt/yt/core/misc/atomic_object.h>
+#include <library/cpp/yt/threading/atomic_object.h>
#include <yt/yt/core/service_discovery/public.h>
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index a89368573c..3a6a3650be 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -1,5 +1,6 @@
#include "helpers.h"
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/rowset.h>
#include <yt/yt/client/api/table_client.h>
@@ -58,6 +59,17 @@ void ToProto(
proto->set_suppress_upstream_sync(options.SuppressUpstreamSync);
}
+void FromProto(
+ NApi::TTransactionalOptions* options,
+ const NProto::TTransactionalOptions& proto)
+{
+ FromProto(&options->TransactionId, proto.transaction_id());
+ options->Ping = proto.ping();
+ options->PingAncestors = proto.ping_ancestors();
+ options->SuppressTransactionCoordinatorSync = proto.suppress_transaction_coordinator_sync();
+ options->SuppressUpstreamSync = proto.suppress_upstream_sync();
+}
+
void ToProto(
NProto::TPrerequisiteOptions* proto,
const NApi::TPrerequisiteOptions& options)
@@ -426,6 +438,30 @@ void FromProto(
FromProto(&result->Errors, proto.errors());
}
+void ToProto(
+ NProto::TJobTraceEvent* proto,
+ const NApi::TJobTraceEvent& result)
+{
+ ToProto(proto->mutable_operation_id(), result.OperationId);
+ ToProto(proto->mutable_job_id(), result.JobId);
+ ToProto(proto->mutable_trace_id(), result.TraceId);
+ proto->set_event_index(result.EventIndex);
+ proto->set_event(result.Event);
+ proto->set_event_time(ToProto<i64>(result.EventTime));
+}
+
+void FromProto(
+ NApi::TJobTraceEvent* result,
+ const NProto::TJobTraceEvent& proto)
+{
+ FromProto(&result->OperationId, proto.operation_id());
+ FromProto(&result->JobId, proto.job_id());
+ FromProto(&result->TraceId, proto.trace_id());
+ result->EventIndex = proto.event_index();
+ result->Event = proto.event();
+ result->EventTime = TInstant::FromValue(proto.event_time());
+}
+
////////////////////////////////////////////////////////////////////////////////
// MISC
////////////////////////////////////////////////////////////////////////////////
@@ -1922,28 +1958,22 @@ void FillRequest(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options)
{
- Y_UNUSED(req, path, options);
+ ToProto(req->mutable_path(), path);
+
+ if (options.TransactionId) {
+ ToProto(req->mutable_transactional_options(), options);
+ }
}
-void FromProto(
+void ParseRequest(
+ NYPath::TRichYPath* mutablePath,
TDistributedWriteSessionStartOptions* mutableOptions,
const TReqStartDistributedWriteSession& req)
{
- Y_UNUSED(req, mutableOptions);
-}
-
-void FromProto(
- TDistributedWriteSession* mutableSession,
- TRspStartDistributedWriteSession&& rsp)
-{
- Y_UNUSED(mutableSession, rsp);
-}
-
-void ToProto(
- TRspStartDistributedWriteSession* rsp,
- const TDistributedWriteSessionPtr& session)
-{
- Y_UNUSED(rsp, session);
+ *mutablePath = FromProto<NYPath::TRichYPath>(req.path());
+ if (req.has_transactional_options()) {
+ FromProto(mutableOptions, req.transactional_options());
+ }
}
////////////////////////////////////////////////////////////////////////////////
@@ -1953,52 +1983,44 @@ void FillRequest(
TDistributedWriteSessionPtr session,
const TDistributedWriteSessionFinishOptions& options)
{
- Y_UNUSED(req, session, options);
+ req->set_session(ConvertToYsonString(session).ToString());
+ req->set_max_children_per_attach_request(options.MaxChildrenPerAttachRequest);
}
-void FromProto(
+void ParseRequest(
+ TDistributedWriteSessionPtr* mutableSession,
TDistributedWriteSessionFinishOptions* mutableOptions,
const TReqFinishDistributedWriteSession& req)
{
- Y_UNUSED(req, mutableOptions);
-}
-
-void FromProto(
- TDistributedWriteSession* mutableSession,
- const TReqFinishDistributedWriteSession& req)
-{
- Y_UNUSED(mutableSession, req);
+ *mutableSession = ConvertTo<TDistributedWriteSessionPtr>(TYsonString(req.session()));
+ mutableOptions->MaxChildrenPerAttachRequest = req.max_children_per_attach_request();
}
////////////////////////////////////////////////////////////////////////////////
void FillRequest(
- TReqParticipantWriteTable* req,
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options)
+ TReqWriteTableFragment* req,
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options)
{
- Y_UNUSED(req, cookie, options);
-}
+ req->set_cookie(ConvertToYsonString(cookie).ToString());
-void FromProto(
- TParticipantTableWriterOptions* mutableOptions,
- const TReqParticipantWriteTable& req)
-{
- Y_UNUSED(req, mutableOptions);
-}
-
-void FromProto(
- TDistributedWriteCookie* cookie,
- const TReqParticipantWriteTable& req)
-{
- Y_UNUSED(cookie, req);
+ if (options.Config) {
+ req->set_config(ConvertToYsonString(*options.Config).ToString());
+ }
}
-void ToProto(
- TRspParticipantWriteTable* rsp,
- const TDistributedWriteCookiePtr& cookie)
+void ParseRequest(
+ TFragmentWriteCookiePtr* mutableCookie,
+ TFragmentTableWriterOptions* mutableOptions,
+ const TReqWriteTableFragment& req)
{
- Y_UNUSED(rsp, cookie);
+ *mutableCookie = ConvertTo<TFragmentWriteCookiePtr>(TYsonString(req.cookie()));
+ if (req.has_config()) {
+ mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(req.config()));
+ } else {
+ mutableOptions->Config = ConvertTo<TTableWriterConfigPtr>(TYsonString(TStringBuf("{}")));
+ }
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h
index 9027337359..8849b7be44 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.h
+++ b/yt/yt/client/api/rpc_proxy/helpers.h
@@ -30,6 +30,10 @@ void ToProto(
NProto::TTransactionalOptions* proto,
const NApi::TTransactionalOptions& options);
+void FromProto(
+ NApi::TTransactionalOptions* options,
+ const NProto::TTransactionalOptions& proto);
+
void ToProto(
NProto::TPrerequisiteOptions* proto,
const NApi::TPrerequisiteOptions& options);
@@ -106,6 +110,14 @@ void FromProto(
NApi::TListJobsResult* result,
const NProto::TListJobsResult& proto);
+void ToProto(
+ NProto::TJobTraceEvent* proto,
+ const NApi::TJobTraceEvent& result);
+
+void FromProto(
+ NApi::TJobTraceEvent* result,
+ const NProto::TJobTraceEvent& proto);
+
void ToProto(NProto::TColumnSchema* protoSchema, const NTableClient::TColumnSchema& schema);
void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema& protoSchema);
@@ -279,18 +291,11 @@ void FillRequest(
const NYPath::TRichYPath& path,
const TDistributedWriteSessionStartOptions& options);
-void FromProto(
+void ParseRequest(
+ NYPath::TRichYPath* mutablePath,
TDistributedWriteSessionStartOptions* mutableOptions,
const TReqStartDistributedWriteSession& req);
-void FromProto(
- TDistributedWriteSession* mutableSession,
- TRspStartDistributedWriteSession&& rsp);
-
-void ToProto(
- TRspStartDistributedWriteSession* rsp,
- const TDistributedWriteSessionPtr& session);
-
////////////////////////////////////////////////////////////////////////////////
void FillRequest(
@@ -298,32 +303,22 @@ void FillRequest(
TDistributedWriteSessionPtr session,
const TDistributedWriteSessionFinishOptions& options);
-void FromProto(
+void ParseRequest(
+ TDistributedWriteSessionPtr* mutableSession,
TDistributedWriteSessionFinishOptions* mutableOptions,
const TReqFinishDistributedWriteSession& req);
-void FromProto(
- TDistributedWriteSession* mutableSession,
- const TReqFinishDistributedWriteSession& req);
-
////////////////////////////////////////////////////////////////////////////////
void FillRequest(
- TReqParticipantWriteTable* req,
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options);
-
-void FromProto(
- TParticipantTableWriterOptions* mutableOptions,
- const TReqParticipantWriteTable& req);
-
-void FromProto(
- TDistributedWriteCookie* cookie,
- const TReqParticipantWriteTable& req);
-
-void ToProto(
- TRspParticipantWriteTable* rsp,
- const TDistributedWriteCookiePtr& cookie);
+ TReqWriteTableFragment* req,
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options);
+
+void ParseRequest(
+ TFragmentWriteCookiePtr* mutableCookie,
+ TFragmentTableWriterOptions* mutableOptions,
+ const TReqWriteTableFragment& req);
} // namespace NProto
diff --git a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp
index 392312d511..571d0901b6 100644
--- a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp
+++ b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp
@@ -103,9 +103,8 @@ private:
TIndexInfo indexInfo{
.TableId = FromProto<NObjectClient::TObjectId>(protoIndexInfo.index_table_id()),
.Kind = FromProto<ESecondaryIndexKind>(protoIndexInfo.index_kind()),
- .Predicate = protoIndexInfo.has_predicate()
- ? std::make_optional(FromProto<TString>(protoIndexInfo.predicate()))
- : std::nullopt,
+ .Predicate = YT_PROTO_OPTIONAL(protoIndexInfo, predicate),
+ .UnfoldedColumn = YT_PROTO_OPTIONAL(protoIndexInfo, unfolded_column),
};
THROW_ERROR_EXCEPTION_UNLESS(TEnumTraits<ESecondaryIndexKind>::FindLiteralByValue(indexInfo.Kind).has_value(),
"Unsupported secondary index kind %Qlv (client not up-to-date)",
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index e719d41f8c..e52fdf7acb 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -938,7 +938,7 @@ TFuture<void> TTransaction::FinishDistributedWriteSession(
ValidateActive();
return Client_->FinishDistributedWriteSession(
std::move(session),
- PatchTransactionId(options));
+ options);
}
TFuture<void> TTransaction::DoAbort(
diff --git a/yt/yt/client/chunk_client/public.h b/yt/yt/client/chunk_client/public.h
index e38cbb567f..5c93ee55fd 100644
--- a/yt/yt/client/chunk_client/public.h
+++ b/yt/yt/client/chunk_client/public.h
@@ -88,6 +88,13 @@ YT_DEFINE_ERROR_ENUM(
((DiskHealthCheckFailed) (759))
((TooManyChunksToFetch) (760))
((TotalMemoryLimitExceeded) (761))
+ ((ForbiddenErasureCodec) (762))
+);
+
+DEFINE_ENUM(EUpdateMode,
+ ((None) (0))
+ ((Append) (1))
+ ((Overwrite) (2))
);
using TChunkId = NObjectClient::TObjectId;
diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp
index 5d41f1abc8..7bd88c5279 100644
--- a/yt/yt/client/driver/distributed_table_commands.cpp
+++ b/yt/yt/client/driver/distributed_table_commands.cpp
@@ -2,34 +2,16 @@
#include "config.h"
#include "helpers.h"
-#include <yt/yt/client/api/distributed_table_sessions.h>
-
-// #include <yt/yt/client/api/rowset.h>
-// #include <yt/yt/client/api/skynet.h>
-
-// #include <yt/yt/client/chaos_client/replication_card_serialization.h>
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/formats/config.h>
#include <yt/yt/client/formats/parser.h>
-// #include <yt/yt/client/table_client/adapters.h>
-// #include <yt/yt/client/table_client/blob_reader.h>
-// #include <yt/yt/client/table_client/columnar_statistics.h>
-// #include <yt/yt/client/table_client/row_buffer.h>
-// #include <yt/yt/client/table_client/table_consumer.h>
-// #include <yt/yt/client/table_client/table_output.h>
-// #include <yt/yt/client/table_client/unversioned_writer.h>
-// #include <yt/yt/client/table_client/versioned_writer.h>
-// #include <yt/yt/client/table_client/wire_protocol.h>
-
-// #include <yt/yt/client/tablet_client/table_mount_cache.h>
-
#include <yt/yt/client/ypath/public.h>
#include <yt/yt/library/formats/format.h>
#include <yt/yt/core/concurrency/scheduler_api.h>
-// #include <yt/yt/core/misc/finally.h>
#include <yt/yt/core/ytree/convert.h>
@@ -45,20 +27,6 @@ using namespace NYPath;
////////////////////////////////////////////////////////////////////////////////
-// namespace {
-
-// NLogging::TLogger WithCommandTag(
-// const NLogging::TLogger& logger,
-// const ICommandContextPtr& context)
-// {
-// return logger.WithTag("Command: %v",
-// context->Request().CommandName);
-// }
-
-// } // namespace
-
-////////////////////////////////////////////////////////////////////////////////
-
void TStartDistributedWriteSessionCommand::Register(TRegistrar registrar)
{
registrar.Parameter("path", &TThis::Path);
@@ -88,8 +56,6 @@ void TFinishDistributedWriteSessionCommand::Register(TRegistrar registrar)
// -> Nothing
void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context)
{
- auto transaction = AttachTransaction(context, /*required*/ false);
-
auto session = ConvertTo<TDistributedWriteSessionPtr>(Session);
WaitFor(context->GetClient()->FinishDistributedWriteSession(
@@ -102,35 +68,35 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context
////////////////////////////////////////////////////////////////////////////////
-void TParticipantWriteTableCommand::Execute(ICommandContextPtr context)
+void TWriteTableFragmentCommand::Execute(ICommandContextPtr context)
{
- TTypedCommand<NApi::TParticipantTableWriterOptions>::Execute(std::move(context));
+ TTypedCommand<NApi::TFragmentTableWriterOptions>::Execute(std::move(context));
}
-void TParticipantWriteTableCommand::Register(TRegistrar /*registrar*/)
+void TWriteTableFragmentCommand::Register(TRegistrar /*registrar*/)
{ }
-TFuture<NApi::ITableWriterPtr> TParticipantWriteTableCommand::CreateTableWriter(
+TFuture<NApi::ITableWriterPtr> TWriteTableFragmentCommand::CreateTableWriter(
const ICommandContextPtr& context) const
{
PutMethodInfoInTraceContext("participant_write_table");
return context
->GetClient()
- ->CreateParticipantTableWriter(
- StaticPointerCast<TDistributedWriteCookie>(ResultingCookie),
- TTypedCommand<TParticipantTableWriterOptions>::Options);
+ ->CreateFragmentTableWriter(
+ StaticPointerCast<TFragmentWriteCookie>(ResultingCookie),
+ TTypedCommand<TFragmentTableWriterOptions>::Options);
}
// -> Cookie
-void TParticipantWriteTableCommand::DoExecute(ICommandContextPtr context)
+void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context)
{
- auto cookie = ConvertTo<TDistributedWriteCookiePtr>(Cookie);
+ auto cookie = ConvertTo<TFragmentWriteCookiePtr>(Cookie);
ResultingCookie = StaticPointerCast<TRefCounted>(std::move(cookie));
DoExecuteImpl(context);
ProduceOutput(context, [cookie = std::move(ResultingCookie)] (IYsonConsumer* consumer) {
- Serialize(StaticPointerCast<TDistributedWriteCookie>(cookie), consumer);
+ Serialize(StaticPointerCast<TFragmentWriteCookie>(cookie), consumer);
});
}
diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h
index ea3f7a9b7b..fa3d6bb4e5 100644
--- a/yt/yt/client/driver/distributed_table_commands.h
+++ b/yt/yt/client/driver/distributed_table_commands.h
@@ -45,8 +45,8 @@ private:
////////////////////////////////////////////////////////////////////////////////
// -> Cookie
-class TParticipantWriteTableCommand
- : public TTypedCommand<NApi::TParticipantTableWriterOptions>
+class TWriteTableFragmentCommand
+ : public TTypedCommand<NApi::TFragmentTableWriterOptions>
, private TWriteTableCommand
{
public:
@@ -54,7 +54,7 @@ public:
// ambiguity in dispatch.
void Execute(ICommandContextPtr context) override;
- REGISTER_YSON_STRUCT_LITE(TParticipantWriteTableCommand);
+ REGISTER_YSON_STRUCT_LITE(TWriteTableFragmentCommand);
static void Register(TRegistrar registrar);
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index 19a0f3bdc5..e13634f539 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -292,6 +292,7 @@ public:
REGISTER_ALL(TGetJobInputCommand, "get_job_input", Null, Binary, false, true );
REGISTER_ALL(TGetJobInputPathsCommand, "get_job_input_paths", Null, Structured, false, true );
REGISTER_ALL(TGetJobStderrCommand, "get_job_stderr", Null, Binary, false, true );
+ REGISTER_ALL(TGetJobTraceCommand, "get_job_trace", Null, Structured, false, true );
REGISTER_ALL(TGetJobFailContextCommand, "get_job_fail_context", Null, Binary, false, true );
REGISTER_ALL(TGetJobSpecCommand, "get_job_spec", Null, Structured, false, true );
REGISTER_ALL(TListOperationsCommand, "list_operations", Null, Structured, false, false);
@@ -395,9 +396,9 @@ public:
REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false);
REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false);
REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false);
- REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false);
- REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false);
- REGISTER_ALL(TParticipantWriteTableCommand, "participant_write_table", Tabular, Structured, true, true );
+ REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false);
+ REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false);
+ REGISTER_ALL(TWriteTableFragmentCommand, "distributed_write_table_partition", Tabular, Structured, true, true );
}
#undef REGISTER
diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp
index 9cb7396725..53f82c7bf2 100644
--- a/yt/yt/client/driver/scheduler_commands.cpp
+++ b/yt/yt/client/driver/scheduler_commands.cpp
@@ -174,6 +174,52 @@ void TGetJobStderrCommand::DoExecute(ICommandContextPtr context)
////////////////////////////////////////////////////////////////////////////////
+void TGetJobTraceCommand::Register(TRegistrar registrar)
+{
+ registrar.ParameterWithUniversalAccessor<std::optional<TJobId>>(
+ "job_id",
+ [] (TThis* command) -> auto& {return command->Options.JobId; })
+ .Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<TJobTraceId>>(
+ "trace_id",
+ [] (TThis* command) -> auto& {return command->Options.TraceId; })
+ .Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
+ "from_event_index",
+ [] (TThis* command) -> auto& {return command->Options.FromEventIndex; })
+ .Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
+ "to_event_index",
+ [] (TThis* command) -> auto& {return command->Options.ToEventIndex; })
+ .Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
+ "from_time",
+ [] (TThis* command) -> auto& {return command->Options.FromTime; })
+ .Optional(/*init*/ false);
+
+ registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
+ "to_time",
+ [] (TThis* command) -> auto& {return command->Options.ToTime; })
+ .Optional(/*init*/ false);
+}
+
+void TGetJobTraceCommand::DoExecute(ICommandContextPtr context)
+{
+ auto result = WaitFor(context->GetClient()->GetJobTrace(OperationIdOrAlias, Options))
+ .ValueOrThrow();
+
+ context->ProduceOutputValue(BuildYsonStringFluently()
+ .DoListFor(result, [&] (TFluentList fluent, const TJobTraceEvent& traceEvent) {
+ Serialize(traceEvent, fluent.GetConsumer());
+ }));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TGetJobFailContextCommand::Register(TRegistrar registrar)
{
registrar.Parameter("job_id", &TThis::JobId);
diff --git a/yt/yt/client/driver/scheduler_commands.h b/yt/yt/client/driver/scheduler_commands.h
index 280d55e331..59d1e9d21e 100644
--- a/yt/yt/client/driver/scheduler_commands.h
+++ b/yt/yt/client/driver/scheduler_commands.h
@@ -137,6 +137,20 @@ private:
////////////////////////////////////////////////////////////////////////////////
+class TGetJobTraceCommand
+ : public TSimpleOperationCommandBase<NApi::TGetJobTraceOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TGetJobTraceCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
class TGetJobFailContextCommand
: public TSimpleOperationCommandBase<NApi::TGetJobFailContextOptions>
{
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index a3410ff50a..f43f76958d 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -15,6 +15,8 @@
#include <yt/yt/client/object_client/helpers.h>
+#include <yt/yt/client/security_client/public.h>
+
#include <yt/yt/core/concurrency/periodic_executor.h>
#include <yt/yt/core/net/address.h>
@@ -40,19 +42,20 @@ DECLARE_REFCOUNTED_CLASS(TClient)
////////////////////////////////////////////////////////////////////////////////
-std::optional<TString> GetDataCenterByClient(const IClientPtr& client)
+TFuture<std::optional<TString>> GetDataCenterByClient(const IClientPtr& client)
{
TListNodeOptions options;
options.MaxSize = 1;
- auto items = NConcurrency::WaitFor(client->ListNode(RpcProxiesPath, options))
- .ValueOrThrow();
- auto itemsList = NYTree::ConvertTo<NYTree::IListNodePtr>(items);
- if (!itemsList->GetChildCount()) {
- return std::nullopt;
- }
- auto host = itemsList->GetChildren()[0];
- return NNet::InferYPClusterFromHostName(host->GetValue<TString>());
+ return client->ListNode(RpcProxiesPath, options)
+ .Apply(BIND([] (const NYson::TYsonString& items) {
+ auto itemsList = NYTree::ConvertTo<NYTree::IListNodePtr>(items);
+ if (!itemsList->GetChildCount()) {
+ return std::optional<TString>();
+ }
+ auto host = itemsList->GetChildren()[0];
+ return NNet::InferYPClusterFromHostName(host->GetValue<TString>());
+ }));
}
class TTransaction
@@ -248,17 +251,24 @@ DECLARE_REFCOUNTED_TYPE(TTransaction)
////////////////////////////////////////////////////////////////////////////////
+enum class EClientPriority : ui8
+{
+ Local,
+ Remote,
+ Undefined,
+};
+
DECLARE_REFCOUNTED_STRUCT(TClientDescription)
struct TClientDescription final
{
- TClientDescription(IClientPtr client, int priority)
+ TClientDescription(IClientPtr client, EClientPriority priority)
: Client(std::move(client))
, Priority(priority)
{ }
IClientPtr Client;
- int Priority;
+ EClientPriority Priority;
std::atomic<bool> HasErrors{false};
};
@@ -407,6 +417,7 @@ public:
UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&));
UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&));
UNIMPLEMENTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<std::vector<TJobTraceEvent>>, GetJobTrace, (const NScheduler::TOperationIdOrAlias&, const TGetJobTraceOptions&));
UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&));
UNIMPLEMENTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&));
UNIMPLEMENTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&));
@@ -475,7 +486,7 @@ public:
UNIMPLEMENTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&));
UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
- UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&));
UNIMPLEMENTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const TString& , int, const TStartShuffleOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, FinishShuffle, (const TShuffleHandlePtr&, const TFinishShuffleOptions&));
UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, const NTableClient::TTableReaderConfigPtr&));
@@ -502,6 +513,7 @@ private:
private:
const TFederationConfigPtr Config_;
const NConcurrency::TPeriodicExecutorPtr Executor_;
+ const TString LocalDatacenter_;
std::vector<TClientDescriptionPtr> UnderlyingClients_;
IClientPtr ActiveClient_;
@@ -592,19 +604,14 @@ TClient::TClient(const std::vector<IClientPtr>& underlyingClients, TFederationCo
NRpc::TDispatcher::Get()->GetLightInvoker(),
BIND(&TClient::CheckClustersHealth, MakeWeak(this)),
Config_->ClusterHealthCheckPeriod))
+ , LocalDatacenter_(NNet::GetLocalYPCluster())
{
YT_VERIFY(!underlyingClients.empty());
UnderlyingClients_.reserve(underlyingClients.size());
- const auto& localDatacenter = NNet::GetLocalYPCluster();
for (const auto& client : underlyingClients) {
- int priority = GetDataCenterByClient(client) == localDatacenter ? 1 : 0;
- UnderlyingClients_.push_back(New<TClientDescription>(client, priority));
+ UnderlyingClients_.push_back(New<TClientDescription>(client, EClientPriority::Undefined));
}
- std::stable_sort(UnderlyingClients_.begin(), UnderlyingClients_.end(), [] (const auto& lhs, const auto& rhs) {
- return lhs->Priority > rhs->Priority;
- });
-
ActiveClient_ = UnderlyingClients_[0]->Client;
ActiveClientIndex_ = 0;
@@ -614,43 +621,54 @@ TClient::TClient(const std::vector<IClientPtr>& underlyingClients, TFederationCo
void TClient::CheckClustersHealth()
{
TCheckClusterLivenessOptions options;
- options.CheckCypressRoot = true;
+ options.CheckCypressRoot = Config_->CheckCypressRoot;
options.CheckTabletCellBundle = Config_->BundleName;
- int activeClientIndex = ActiveClientIndex_.load();
- std::optional<int> betterClientIndex;
-
std::vector<TFuture<void>> checks;
checks.reserve(UnderlyingClients_.size());
-
for (const auto& clientDescription : UnderlyingClients_) {
checks.emplace_back(clientDescription->Client->CheckClusterLiveness(options));
}
- for (int index = 0; index < std::ssize(checks); ++index) {
+ for (int index = 0; index != std::ssize(checks); ++index) {
const auto& check = checks[index];
- bool hasErrors = !NConcurrency::WaitFor(check).IsOK();
- UnderlyingClients_[index]->HasErrors = hasErrors;
- if (!betterClientIndex && !hasErrors && index < activeClientIndex) {
- betterClientIndex = index;
- }
+ auto error = NConcurrency::WaitFor(check);
+ YT_LOG_DEBUG_UNLESS(error.IsOK(), error, "Cluster %Qv is marked as unhealthy",
+ UnderlyingClients_[index]->Client->GetClusterName(/*fetchIfNull*/ false));
+ UnderlyingClients_[index]->HasErrors = !error.IsOK()
+ && !error.FindMatching(NSecurityClient::EErrorCode::AuthorizationError); // Ignore authorization errors.
}
- if (betterClientIndex && ActiveClientIndex_ == activeClientIndex) {
- int newClientIndex = *betterClientIndex;
- auto guard = NThreading::WriterGuard(Lock_);
- ActiveClient_ = UnderlyingClients_[newClientIndex]->Client;
- ActiveClientIndex_ = newClientIndex;
- return;
+ for (int index = 0; index != std::ssize(UnderlyingClients_); ++index) {
+ auto& client = UnderlyingClients_[index];
+ // `Priority` accessed only from this thread so it is not require synchronization.
+ if (client->Priority == EClientPriority::Undefined) {
+ auto clientDatacenter = NConcurrency::WaitFor(GetDataCenterByClient(client->Client));
+ if (clientDatacenter.IsOK()) {
+ client->Priority = clientDatacenter.Value() == LocalDatacenter_
+ ? EClientPriority::Local
+ : EClientPriority::Remote;
+ }
+ }
+ }
+ // Compute better activeClientIndex.
+ int betterClientIndex = ActiveClientIndex_.load();
+ auto betterPriority = UnderlyingClients_[betterClientIndex]->HasErrors
+ ? EClientPriority::Undefined
+ : UnderlyingClients_[betterClientIndex]->Priority;
+
+ for (int index = 0; index != std::ssize(UnderlyingClients_); ++index) {
+ const auto& client = UnderlyingClients_[index];
+ if (!client->HasErrors && client->Priority < betterPriority) {
+ betterClientIndex = index;
+ betterPriority = client->Priority;
+ }
}
- // If active cluster is not healthy, try changing it.
- if (UnderlyingClients_[activeClientIndex]->HasErrors) {
+ if (ActiveClientIndex_ != betterClientIndex) {
auto guard = NThreading::WriterGuard(Lock_);
- // Check that active client wasn't changed.
- if (ActiveClientIndex_ == activeClientIndex && UnderlyingClients_[activeClientIndex]->HasErrors) {
- UpdateActiveClient();
- }
+ ActiveClient_ = UnderlyingClients_[betterClientIndex]->Client;
+ ActiveClientIndex_ = betterClientIndex;
}
}
@@ -751,42 +769,34 @@ void TClient::HandleError(const TErrorOr<void>& error, int clientIndex)
if (ActiveClientIndex_ != clientIndex) {
return;
}
-
- auto guard = WriterGuard(Lock_);
- if (ActiveClientIndex_ != clientIndex) {
- return;
- }
-
UpdateActiveClient();
}
void TClient::UpdateActiveClient()
{
- VERIFY_WRITER_SPINLOCK_AFFINITY(Lock_);
-
- int activeClientIndex = ActiveClientIndex_.load();
+ VERIFY_THREAD_AFFINITY_ANY();
for (int index = 0; index < std::ssize(UnderlyingClients_); ++index) {
const auto& clientDescription = UnderlyingClients_[index];
if (!clientDescription->HasErrors) {
- if (activeClientIndex != index) {
+ if (ActiveClientIndex_ != index) {
YT_LOG_DEBUG("Active client was changed (PreviousClientIndex: %v, NewClientIndex: %v)",
- activeClientIndex,
+ ActiveClientIndex_.load(),
index);
+ auto guard = NThreading::WriterGuard(Lock_);
+ ActiveClientIndex_ = index;
+ ActiveClient_ = clientDescription->Client;
}
-
- ActiveClient_ = clientDescription->Client;
- ActiveClientIndex_ = index;
- break;
+ return;
}
}
}
TClient::TActiveClientInfo TClient::GetActiveClient()
{
- auto guard = ReaderGuard(Lock_);
YT_LOG_TRACE("Request will be send to the active client (ClientIndex: %v)",
ActiveClientIndex_.load());
+ auto guard = ReaderGuard(Lock_);
return {ActiveClient_, ActiveClientIndex_.load()};
}
diff --git a/yt/yt/client/federated/config.cpp b/yt/yt/client/federated/config.cpp
index edba2d190e..f5381e890a 100644
--- a/yt/yt/client/federated/config.cpp
+++ b/yt/yt/client/federated/config.cpp
@@ -15,6 +15,9 @@ void TFederationConfig::Register(TRegistrar registrar)
.GreaterThan(TDuration::Zero())
.Default(TDuration::Seconds(60));
+ registrar.Parameter("check_cypress_root", &TThis::CheckCypressRoot)
+ .Default(true);
+
registrar.Parameter("cluster_retry_attempts", &TThis::ClusterRetryAttempts)
.GreaterThanOrEqual(0)
.Default(3);
diff --git a/yt/yt/client/federated/config.h b/yt/yt/client/federated/config.h
index 115d0450dc..d092c3f019 100644
--- a/yt/yt/client/federated/config.h
+++ b/yt/yt/client/federated/config.h
@@ -22,6 +22,9 @@ public:
//! How often cluster liveness should be checked on the background.
TDuration ClusterHealthCheckPeriod;
+ //! Checks Cypress root availability in liveness check.
+ bool CheckCypressRoot;
+
//! Maximum number of retry attempts to make.
int ClusterRetryAttempts;
diff --git a/yt/yt/client/federated/unittests/client_ut.cpp b/yt/yt/client/federated/unittests/client_ut.cpp
index 0bc009fb6f..169c1c5039 100644
--- a/yt/yt/client/federated/unittests/client_ut.cpp
+++ b/yt/yt/client/federated/unittests/client_ut.cpp
@@ -119,13 +119,13 @@ TEST(TFederatedClientTest, Basic)
.WillRepeatedly(Return(VoidFuture));
// Creation of federated client.
- std::vector<IClientPtr> clients{mockClientSas, mockClientVla};
+ std::vector<IClientPtr> clients{mockClientVla, mockClientSas};
auto config = New<TFederationConfig>();
- config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
+ config->ClusterHealthCheckPeriod = TDuration::Seconds(3);
config->ClusterRetryAttempts = 1;
auto federatedClient = CreateClient(clients, config);
- // 1. `vla` client should be used as closest cluster.
+ // 1. `vla` client should be used as first cluster.
// 2. error from `vla` cluster should be received.
// 3. `sas` client should be used as other cluster.
@@ -213,6 +213,9 @@ TEST(TFederatedClientTest, CheckHealth)
EXPECT_CALL(*mockClientSas, LookupRows(data.Path, _, _, _))
.WillOnce(Return(MakeFuture(data.LookupResult2)));
+ // Wait initialization and choose `local` cluster.
+ Sleep(TDuration::Seconds(2));
+
// From `vla`.
{
auto result = federatedClient->LookupRows(data.Path, data.NameTable, data.Keys);
@@ -462,8 +465,6 @@ TEST(TFederatedClientTest, AttachTransaction)
auto mockConnectionVla = New<TStrictMockConnection>();
EXPECT_CALL(*mockConnectionVla, GetClusterTag())
.WillRepeatedly(Return(NObjectClient::TCellTag(456)));
- EXPECT_CALL(*mockClientVla, GetConnection())
- .WillOnce(Return(mockConnectionVla));
// Creation of federated client.
std::vector<IClientPtr> clients{mockClientSas, mockClientVla};
@@ -471,6 +472,9 @@ TEST(TFederatedClientTest, AttachTransaction)
config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
auto federatedClient = CreateClient(clients, config);
+ // Wait initialization.
+ Sleep(TDuration::Seconds(2));
+
auto mockTransactionSas = New<TStrictMockTransaction>();
auto transactionId = TGuid(0, 123 << 16, 0, 0);
EXPECT_CALL(*mockTransactionSas, GetId())
diff --git a/yt/yt/client/federated/unittests/connection_ut.cpp b/yt/yt/client/federated/unittests/connection_ut.cpp
index 7f7473d019..e512087a14 100644
--- a/yt/yt/client/federated/unittests/connection_ut.cpp
+++ b/yt/yt/client/federated/unittests/connection_ut.cpp
@@ -35,11 +35,19 @@ TEST(TFederatedConnectionTest, CreateClient)
// To identify best (closest) cluster.
NYson::TYsonString nodesYsonSas(TStringBuf(R"(["a-rpc-proxy-a.sas.yp-c.yandex.net:9013"])"));
EXPECT_CALL(*mockClientSas, ListNode("//sys/rpc_proxies", _))
- .WillRepeatedly(Return(MakeFuture(nodesYsonSas)));
+ .WillOnce(Return(MakeFuture(nodesYsonSas)));
+ EXPECT_CALL(*mockClientSas, GetNode("//test/node", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) {
+ return MakeFuture(nodesYsonSas);
+ });
NYson::TYsonString nodesYsonVla(TStringBuf(R"(["a-rpc-proxy-a.vla.yp-c.yandex.net:9013"])"));
EXPECT_CALL(*mockClientVla, ListNode("//sys/rpc_proxies", _))
- .WillRepeatedly(Return(MakeFuture(nodesYsonVla)));
+ .WillOnce(Return(MakeFuture(nodesYsonVla)));
+ EXPECT_CALL(*mockClientVla, GetNode("//test/node", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) {
+ return MakeFuture(nodesYsonVla);
+ });
EXPECT_CALL(*mockClientSas, CheckClusterLiveness(_))
.WillRepeatedly(Return(VoidFuture));
@@ -65,8 +73,81 @@ TEST(TFederatedConnectionTest, CreateClient)
auto connection = CreateConnection({mockConnectionSas, mockConnectionVla}, config);
EXPECT_THAT(connection->GetLoggingTag(), testing::HasSubstr("Clusters: (sas; vla)"));
auto client = connection->CreateClient(clientOptions);
- auto nodes = client->ListNode("//sys/rpc_proxies").Get().ValueOrThrow();
+ auto nodes = client->GetNode("//test/node").Get().ValueOrThrow();
EXPECT_EQ(nodesYsonSas, nodes);
+
+ Sleep(TDuration::Seconds(2));
+ auto nodes2 = client->GetNode("//test/node").Get().ValueOrThrow();
+ EXPECT_EQ(nodesYsonSas, nodes2);
+}
+
+TEST(TFederatedConnectionTest, CreateClientWhenOneClusterUnavailable)
+{
+ auto config = New<TFederationConfig>();
+ config->BundleName = "my_bundle";
+ config->ClusterHealthCheckPeriod = TDuration::Seconds(5);
+
+ auto mockConnectionSas = New<TStrictMockConnection>();
+ auto mockConnectionVla = New<TStrictMockConnection>();
+ auto mockClientSas = New<TStrictMockClient>();
+ auto mockClientVla = New<TStrictMockClient>();
+
+ // To identify best (closest) cluster.
+ NYson::TYsonString nodesYsonSas(TStringBuf(R"(["a-rpc-proxy-a.sas.yp-c.yandex.net:9013"])"));
+ EXPECT_CALL(*mockClientSas, ListNode("//sys/rpc_proxies", _))
+ .WillOnce(Return(MakeFuture<NYson::TYsonString>(TError("Failure"))))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TListNodeOptions&) {
+ return MakeFuture(nodesYsonSas);
+ });
+ EXPECT_CALL(*mockClientSas, GetNode("//test/node", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) {
+ return MakeFuture(nodesYsonSas);
+ });
+
+ NYson::TYsonString nodesYsonVla(TStringBuf(R"(["a-rpc-proxy-a.vla.yp-c.yandex.net:9013"])"));
+ EXPECT_CALL(*mockClientVla, ListNode("//sys/rpc_proxies", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TListNodeOptions&) {
+ return MakeFuture(nodesYsonVla);
+ });
+ EXPECT_CALL(*mockClientVla, GetNode("//test/node", _))
+ .WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TGetNodeOptions&) {
+ return MakeFuture(nodesYsonVla);
+ });
+
+ EXPECT_CALL(*mockClientSas, CheckClusterLiveness(_))
+ .WillRepeatedly(Return(VoidFuture));
+ EXPECT_CALL(*mockClientVla, CheckClusterLiveness(_))
+ .WillRepeatedly(Return(VoidFuture));
+
+ NApi::TClientOptions clientOptions;
+ EXPECT_CALL(*mockConnectionSas, CreateClient(::testing::Ref(clientOptions)))
+ .WillOnce(Return(mockClientSas));
+ EXPECT_CALL(*mockConnectionVla, CreateClient(::testing::Ref(clientOptions)))
+ .WillOnce(Return(mockClientVla));
+
+ EXPECT_CALL(*mockConnectionSas, GetLoggingTag())
+ .WillOnce(ReturnRefOfCopy(TString("sas")));
+ EXPECT_CALL(*mockConnectionVla, GetLoggingTag())
+ .WillOnce(ReturnRefOfCopy(TString("vla")));
+
+ auto finally = Finally([oldLocalHostName = NNet::GetLocalHostName()] {
+ NNet::WriteLocalHostName(oldLocalHostName);
+ });
+ NNet::WriteLocalHostName("a-rpc-proxy.sas.yp-c.yandex.net");
+
+ auto connection = CreateConnection({mockConnectionSas, mockConnectionVla}, config);
+ EXPECT_THAT(connection->GetLoggingTag(), testing::HasSubstr("Clusters: (sas; vla)"));
+ auto client = connection->CreateClient(clientOptions);
+
+ Sleep(TDuration::Seconds(2));
+
+ auto nodes1 = client->GetNode("//test/node").Get().ValueOrThrow();
+ EXPECT_EQ(nodesYsonVla, nodes1);
+
+ Sleep(TDuration::Seconds(6));
+
+ auto nodes2 = client->GetNode("//test/node").Get().ValueOrThrow();
+ EXPECT_EQ(nodesYsonSas, nodes2);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index ae0376bcda..3fb36f8560 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -111,7 +111,7 @@ public:
UNSUPPORTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const TYPath&, const TJournalWriterOptions&));
UNSUPPORTED_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
UNSUPPORTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (TDistributedWriteSessionPtr, const TDistributedWriteSessionFinishOptions&));
- UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (const TDistributedWriteCookiePtr&, const TParticipantTableWriterOptions&));
+ UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (const TFragmentWriteCookiePtr&, const TFragmentTableWriterOptions&));
// IClient methods.
// Unsupported methods.
@@ -164,6 +164,7 @@ public:
UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&));
UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&));
UNSUPPORTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
+ UNSUPPORTED_METHOD(TFuture<std::vector<TJobTraceEvent>>, GetJobTrace, (const NScheduler::TOperationIdOrAlias&, const TGetJobTraceOptions&));
UNSUPPORTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&));
UNSUPPORTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&));
UNSUPPORTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&));
diff --git a/yt/yt/client/logging/dynamic_table_log_writer.cpp b/yt/yt/client/logging/dynamic_table_log_writer.cpp
index 1ff8ff680b..c6c731f2dd 100644
--- a/yt/yt/client/logging/dynamic_table_log_writer.cpp
+++ b/yt/yt/client/logging/dynamic_table_log_writer.cpp
@@ -24,7 +24,7 @@
#include <yt/yt/core/logging/formatter.h>
#include <yt/yt/core/logging/system_log_event_provider.h>
-#include <yt/yt/core/misc/atomic_object.h>
+#include <library/cpp/yt/threading/atomic_object.h>
#include <yt/yt/core/misc/proc.h>
#include <library/cpp/yt/memory/atomic_intrusive_ptr.h>
diff --git a/yt/yt/client/object_client/helpers.cpp b/yt/yt/client/object_client/helpers.cpp
index c4d02fa6d0..795c0fb34c 100644
--- a/yt/yt/client/object_client/helpers.cpp
+++ b/yt/yt/client/object_client/helpers.cpp
@@ -330,6 +330,11 @@ bool IsSchemaType(EObjectType type)
return (static_cast<ui32>(type) & SchemaObjectTypeMask) != 0;
}
+TString FormatObjectType(EObjectType type)
+{
+ return IsSchemaType(type) ? Format("schema:%v", TypeFromSchemaType(type)) : FormatEnum(type);
+}
+
bool IsGlobalCellId(TCellId cellId)
{
auto type = TypeFromId(cellId);
diff --git a/yt/yt/client/object_client/helpers.h b/yt/yt/client/object_client/helpers.h
index 3139e5481b..1c668033c9 100644
--- a/yt/yt/client/object_client/helpers.h
+++ b/yt/yt/client/object_client/helpers.h
@@ -119,6 +119,9 @@ EObjectType SchemaTypeFromType(EObjectType type);
//! Returns the regular type for a given schema #type.
EObjectType TypeFromSchemaType(EObjectType type);
+//! Formats object type into string (taking schemas into account).
+TString FormatObjectType(EObjectType type);
+
//! Constructs the id from its parts.
TObjectId MakeId(
EObjectType type,
diff --git a/yt/yt/client/queue_client/producer_client.cpp b/yt/yt/client/queue_client/producer_client.cpp
index 2115805de0..4040bd337c 100644
--- a/yt/yt/client/queue_client/producer_client.cpp
+++ b/yt/yt/client/queue_client/producer_client.cpp
@@ -1,5 +1,7 @@
#include "producer_client.h"
+#include "private.h"
+
#include <yt/yt/client/api/client.h>
#include <yt/yt/client/api/transaction.h>
@@ -10,6 +12,9 @@
#include <yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.pb.h>
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/periodic_executor.h>
+
namespace NYT::NQueueClient {
using namespace NApi;
@@ -23,6 +28,10 @@ using namespace NYTree;
////////////////////////////////////////////////////////////////////////////////
+static constexpr auto& Logger = QueueClientLogger;
+
+////////////////////////////////////////////////////////////////////////////////
+
class TProducerSession
: public IProducerSession
{
@@ -34,24 +43,42 @@ public:
TNameTablePtr nameTable,
TQueueProducerSessionId sessionId,
TCreateQueueProducerSessionResult createSessionResult,
- TProducerSessionOptions options)
+ TProducerSessionOptions options,
+ IInvokerPtr invoker)
: Client_(std::move(client))
, ProducerPath_(std::move(producerPath))
, QueuePath_(std::move(queuePath))
, NameTable_(std::move(nameTable))
, SessionId_(std::move(sessionId))
+ , Invoker_(std::move(invoker))
, Options_(std::move(options))
, Epoch_(createSessionResult.Epoch)
, LastSequenceNumber_(createSessionResult.SequenceNumber)
, UserMeta_(std::move(createSessionResult.UserMeta))
, BufferedRowWriter_(CreateWireProtocolWriter())
- { }
+ {
+ if (Options_.BackgroundFlushPeriod) {
+ if (!Invoker_) {
+ THROW_ERROR_EXCEPTION("Cannot create producer session with background flush without invoker");
+ }
+
+ FlushExecutor_ = New<TPeriodicExecutor>(
+ Invoker_,
+ BIND(&TProducerSession::OnFlush, NYT::MakeWeak(this)),
+ *Options_.BackgroundFlushPeriod);
+ } else {
+ if (!Options_.BatchOptions.RowCount && !Options_.BatchOptions.ByteSize) {
+ YT_LOG_DEBUG("None of batch row count or batch byte size are specified, batch byte size will be equal to 16 MB");
+ Options_.BatchOptions.ByteSize = 16_MB;
+ }
+ }
+ }
// TODO(nadya73): add possibility to pass user meta.
TQueueProducerSequenceNumber GetLastSequenceNumber() const override
{
- return LastSequenceNumber_;
+ return LastSequenceNumber_.load();
}
const INodePtr& GetUserMeta() const override
@@ -61,23 +88,87 @@ public:
bool Write(TRange<TUnversionedRow> rows) override
{
+ auto guard = Guard(SpinLock_);
+
+ if (!Started_) {
+ if (FlushExecutor_) {
+ FlushExecutor_->Start();
+ }
+ Started_ = true;
+ }
+
+ if (Closed_) {
+ return false;
+ }
+
for (const auto& row : rows) {
BufferedRowWriter_->WriteUnversionedRow(row);
++BufferedRowCount_;
}
- return BufferedRowWriter_->GetByteSize() < Options_.MaxBufferSize;
+ if (IsFlushNeeded()) {
+ if (!FlushExecutor_) {
+ return false;
+ }
+ FlushExecutor_->ScheduleOutOfBand();
+ }
+ return true;
}
TFuture<void> GetReadyEvent() override
{
+ if (FlushExecutor_) {
+ return VoidFuture;
+ }
+
+ {
+ auto guard = Guard(SpinLock_);
+ if (Closed_) {
+ return MakeFuture<void>(TError("Producer session was closed"));
+ }
+ }
return TryToFlush();
}
- TFuture<void> Close() override
+ TFuture<void> Flush() override
{
- return Flush();
+ if (!FlushExecutor_) {
+ std::vector<TSharedRef> serializedRows;
+ {
+ auto guard = Guard(SpinLock_);
+ YT_LOG_DEBUG("Flushing rows (RowCount: %v)", BufferedRowCount_);
+ serializedRows = GetRowsToFlushAndResetBuffer();
+ }
+
+ return FlushImpl(serializedRows);
+ }
+
+ FlushExecutor_->ScheduleOutOfBand();
+ return FlushExecutor_->GetExecutedEvent();
}
+ TFuture<void> Close() override
+ {
+ {
+ auto guard = Guard(SpinLock_);
+ Closed_ = true;
+ }
+
+ // Run one last flush will finish writing the remaining items and
+ // eventually lead to the stop promise being set.
+ // A single flush is enough since it is guaranteed that no new messages are added to the queue after the
+ // critical section above.
+
+ if (!FlushExecutor_) {
+ return TryToFlush();
+ }
+
+ FlushExecutor_->ScheduleOutOfBand();
+
+ return StoppedPromise_.ToFuture()
+ .Apply(BIND([this, this_ = MakeStrong(this)] {
+ return FlushExecutor_->Stop();
+ }));
+ }
std::optional<TMD5Hash> GetDigest() const override
{
@@ -90,44 +181,94 @@ private:
const TRichYPath QueuePath_;
const TNameTablePtr NameTable_;
const TQueueProducerSessionId SessionId_;
- const TProducerSessionOptions Options_;
+ const IInvokerPtr Invoker_;
+
+ TProducerSessionOptions Options_;
TQueueProducerEpoch Epoch_ = TQueueProducerEpoch{0};
- TQueueProducerSequenceNumber LastSequenceNumber_ = TQueueProducerSequenceNumber{0};
+ std::atomic<TQueueProducerSequenceNumber> LastSequenceNumber_ = TQueueProducerSequenceNumber{0};
INodePtr UserMeta_;
std::unique_ptr<IWireProtocolWriter> BufferedRowWriter_;
i64 BufferedRowCount_ = 0;
+ bool Started_ = false;
+ TPeriodicExecutorPtr FlushExecutor_;
+
+ bool Closed_ = false;
+ TPromise<void> StoppedPromise_ = NewPromise<void>();
+
YT_DECLARE_SPIN_LOCK(TSpinLock, SpinLock_);
+ bool IsFlushNeeded() const
+ {
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
+ return (Options_.BatchOptions.ByteSize && static_cast<i64>(BufferedRowWriter_->GetByteSize()) >= *Options_.BatchOptions.ByteSize)
+ || (Options_.BatchOptions.RowCount && BufferedRowCount_ >= *Options_.BatchOptions.RowCount);
+ }
+
TFuture<void> TryToFlush()
{
- if (BufferedRowWriter_->GetByteSize() >= Options_.MaxBufferSize) {
- return Flush();
+ std::vector<TSharedRef> serializedRows;
+ {
+ auto guard = Guard(SpinLock_);
+ if (!IsFlushNeeded() && !Closed_) {
+ return VoidFuture;
+ }
+ serializedRows = GetRowsToFlushAndResetBuffer();
}
- return VoidFuture;
+ return FlushImpl(std::move(serializedRows));
}
- TFuture<void> Flush()
+ std::vector<TSharedRef> GetRowsToFlushAndResetBuffer()
{
- auto guard = Guard(SpinLock_);
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
auto writer = CreateWireProtocolWriter();
writer->WriteSerializedRowset(BufferedRowCount_, BufferedRowWriter_->Finish());
BufferedRowWriter_ = CreateWireProtocolWriter();
BufferedRowCount_ = 0;
+ return writer->Finish();
+ }
+
+ void OnFlush()
+ {
+ std::vector<TSharedRef> serializedRows;
+ {
+ auto guard = Guard(SpinLock_);
+ YT_LOG_DEBUG("Flushing rows (RowCount: %v)", BufferedRowCount_);
+ serializedRows = GetRowsToFlushAndResetBuffer();
+ }
+
+ WaitFor(FlushImpl(std::move(serializedRows)))
+ .ThrowOnError();
+ bool isStopped = false;
+ {
+ auto guard = Guard(SpinLock_);
+ if (Closed_ && BufferedRowCount_ == 0) {
+ isStopped = true;
+ }
+ }
+
+ if (isStopped) {
+ StoppedPromise_.TrySet();
+ }
+ }
+
+ TFuture<void> FlushImpl(std::vector<TSharedRef> serializedRows)
+ {
return Client_->StartTransaction(ETransactionType::Tablet)
- .Apply(BIND([writer = std::move(writer), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) {
+ .Apply(BIND([serializedRows = std::move(serializedRows), this, this_ = MakeStrong(this)] (const ITransactionPtr& transaction) {
TPushQueueProducerOptions pushQueueProducerOptions;
if (Options_.AutoSequenceNumber) {
- pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.Underlying() + 1};
+ pushQueueProducerOptions.SequenceNumber = TQueueProducerSequenceNumber{LastSequenceNumber_.load().Underlying() + 1};
}
- return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, writer->Finish(), pushQueueProducerOptions)
+ return transaction->PushQueueProducer(ProducerPath_, QueuePath_, SessionId_, Epoch_, NameTable_, serializedRows, pushQueueProducerOptions)
.Apply(BIND([=, this, this_ = MakeStrong(this)] (const TPushQueueProducerResult& pushQueueProducerResult) {
- LastSequenceNumber_ = pushQueueProducerResult.LastSequenceNumber;
+ LastSequenceNumber_.store(pushQueueProducerResult.LastSequenceNumber);
return transaction->Commit();
}));
})).AsVoid();
@@ -149,12 +290,13 @@ public:
const TRichYPath& queuePath,
const TNameTablePtr& nameTable,
const TQueueProducerSessionId& sessionId,
- const TProducerSessionOptions& options) override
+ const TProducerSessionOptions& options,
+ const IInvokerPtr& invoker) override
{
return Client_->CreateQueueProducerSession(ProducerPath_, queuePath, sessionId)
.Apply(BIND([=, this, this_ = MakeStrong(this)]
(const TCreateQueueProducerSessionResult& createSessionResult) -> IProducerSessionPtr {
- return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options);
+ return New<TProducerSession>(Client_, ProducerPath_, queuePath, nameTable, sessionId, createSessionResult, options, invoker);
}));
}
diff --git a/yt/yt/client/queue_client/producer_client.h b/yt/yt/client/queue_client/producer_client.h
index 4e1b46756a..3fc73ed3a8 100644
--- a/yt/yt/client/queue_client/producer_client.h
+++ b/yt/yt/client/queue_client/producer_client.h
@@ -17,6 +17,14 @@ namespace NYT::NQueueClient {
////////////////////////////////////////////////////////////////////////////////
+struct TProducerSessionBatchOptions
+{
+ //! Weight of serialized buffered rows when flush will be called regardless of the background flush period.
+ std::optional<i64> ByteSize;
+ //! Buffered rows count when flush will be called regardless of the background flush period.
+ std::optional<i64> RowCount;
+};
+
struct TProducerSessionOptions
{
//! If true, sequence numbers will be incremented automatically,
@@ -24,8 +32,12 @@ struct TProducerSessionOptions
//! If false, each row should contain value of $sequnce_number column.
bool AutoSequenceNumber = false;
- //! Size of buffer when rows will be flushed to server.
- size_t MaxBufferSize = 1_MB;
+ //! Batch sizes when rows will be flushed to server regardless of the background flush period (if it is specified).
+ //! If there is no background flush, rows will be flush when `BatchOptions::ByteSize` or `BatchOptions::RowCount` is reached. If none of them are specified, `BatchOptions::ByteSize` will be equal to 16 MB.
+ TProducerSessionBatchOptions BatchOptions;
+
+ //! If set, rows will be flushed in background with this period.
+ std::optional<TDuration> BackgroundFlushPeriod;
};
struct IProducerSession
@@ -38,6 +50,9 @@ struct IProducerSession
//! Get user meta saved in the producer session.
virtual const NYTree::INodePtr& GetUserMeta() const = 0;
+
+ //! Flush all written rows.
+ virtual TFuture<void> Flush() = 0;
};
DEFINE_REFCOUNTED_TYPE(IProducerSession)
@@ -47,12 +62,12 @@ struct IProducerClient
: public virtual TRefCounted
{
//! Create a session (or increase its epoch) and return session writer.
- //! NB: Session writer return by this method is NOT thread-safe.
virtual TFuture<IProducerSessionPtr> CreateSession(
const NYPath::TRichYPath& queuePath,
const NTableClient::TNameTablePtr& nameTable,
const NQueueClient::TQueueProducerSessionId& sessionId,
- const TProducerSessionOptions& options = {}) = 0;
+ const TProducerSessionOptions& options = {},
+ const IInvokerPtr& invoker = nullptr) = 0;
};
DEFINE_REFCOUNTED_TYPE(IProducerClient)
diff --git a/yt/yt/client/scheduler/public.h b/yt/yt/client/scheduler/public.h
index d92ae73f78..ac7afba09b 100644
--- a/yt/yt/client/scheduler/public.h
+++ b/yt/yt/client/scheduler/public.h
@@ -8,6 +8,12 @@ namespace NYT::NScheduler {
////////////////////////////////////////////////////////////////////////////////
+YT_DEFINE_STRONG_TYPEDEF(TJobTraceId, TGuid);
+
+extern const TJobTraceId NullJobTraceId;
+
+////////////////////////////////////////////////////////////////////////////////
+
using NJobTrackerClient::TJobId;
using NJobTrackerClient::TOperationId;
@@ -61,6 +67,7 @@ YT_DEFINE_ERROR_ENUM(
((MasterDisconnected) (218))
((NoSuchJobShell) (219))
((JobResourceLimitsRestrictionsViolated) (220))
+ ((CannotUseBothAclAndAco) (221))
);
DEFINE_ENUM(EUnavailableChunkAction,
diff --git a/yt/yt/client/table_client/helpers-inl.h b/yt/yt/client/table_client/helpers-inl.h
index a4e39d7a6c..86cc5a793c 100644
--- a/yt/yt/client/table_client/helpers-inl.h
+++ b/yt/yt/client/table_client/helpers-inl.h
@@ -14,6 +14,7 @@
#include <yt/yt/core/concurrency/scheduler.h>
#include <library/cpp/yt/misc/strong_typedef.h>
+#include <library/cpp/yt/misc/cast.h>
#include <array>
@@ -142,8 +143,10 @@ void ToUnversionedValue(
if constexpr (TEnumTraits<T>::IsStringSerializableEnum) {
ToUnversionedValue(unversionedValue, NYT::FormatEnum(value), rowBuffer, id, flags);
} else if constexpr (TEnumTraits<T>::IsBitEnum) {
+ static_assert(CanFitSubtype<ui64, std::underlying_type_t<T>>());
ToUnversionedValue(unversionedValue, static_cast<ui64>(value), rowBuffer, id, flags);
} else {
+ static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>());
ToUnversionedValue(unversionedValue, static_cast<i64>(value), rowBuffer, id, flags);
}
}
@@ -154,12 +157,16 @@ void FromUnversionedValue(
T* value,
TUnversionedValue unversionedValue)
{
+ static_assert(TEnumTraits<T>::IsStringSerializableEnum ||
+ TEnumTraits<T>::IsBitEnum && CanFitSubtype<ui64, std::underlying_type_t<T>>() ||
+ !TEnumTraits<T>::IsBitEnum && CanFitSubtype<i64, std::underlying_type_t<T>>());
+
switch (unversionedValue.Type) {
case EValueType::Int64:
- *value = static_cast<T>(unversionedValue.Data.Int64);
+ *value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>(unversionedValue.Data.Int64));
break;
case EValueType::Uint64:
- *value = static_cast<T>(unversionedValue.Data.Uint64);
+ *value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>(unversionedValue.Data.Uint64));
break;
case EValueType::String:
*value = NYT::ParseEnum<T>(unversionedValue.AsStringBuf());
@@ -170,6 +177,37 @@ void FromUnversionedValue(
}
}
+template <class T>
+ requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void ToUnversionedValue(
+ TUnversionedValue* unversionedValue,
+ T value,
+ const TRowBufferPtr& rowBuffer,
+ int id,
+ EValueFlags flags)
+{
+ static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>());
+ ToUnversionedValue(unversionedValue, static_cast<i64>(value), rowBuffer, id, flags);
+}
+
+template <class T>
+ requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void FromUnversionedValue(
+ T* value,
+ TUnversionedValue unversionedValue)
+{
+ static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>());
+
+ switch (unversionedValue.Type) {
+ case EValueType::Int64:
+ *value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>(unversionedValue.Data.Int64));
+ break;
+ default:
+ THROW_ERROR_EXCEPTION("Cannot parse enum value from %Qlv",
+ unversionedValue.Type);
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
void ProtobufToUnversionedValueImpl(
diff --git a/yt/yt/client/table_client/helpers.h b/yt/yt/client/table_client/helpers.h
index b86124557e..d1c5d17325 100644
--- a/yt/yt/client/table_client/helpers.h
+++ b/yt/yt/client/table_client/helpers.h
@@ -148,6 +148,20 @@ void FromUnversionedValue(
TUnversionedValue unversionedValue);
template <class T>
+ requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void ToUnversionedValue(
+ TUnversionedValue* unversionedValue,
+ T value,
+ const TRowBufferPtr& rowBuffer,
+ int id = 0,
+ EValueFlags flags = EValueFlags::None);
+template <class T>
+ requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void FromUnversionedValue(
+ T* value,
+ TUnversionedValue unversionedValue);
+
+template <class T>
TUnversionedValue ToUnversionedValue(
T&& value,
const TRowBufferPtr& rowBuffer,
diff --git a/yt/yt/client/table_client/logical_type.h b/yt/yt/client/table_client/logical_type.h
index 904ff9708b..bd482df984 100644
--- a/yt/yt/client/table_client/logical_type.h
+++ b/yt/yt/client/table_client/logical_type.h
@@ -170,7 +170,7 @@ class TDecimalLogicalType
{
public:
static constexpr int MinPrecision = 1;
- static constexpr int MaxPrecision = 35;
+ static constexpr int MaxPrecision = 76;
public:
TDecimalLogicalType(int precision, int scale);
diff --git a/yt/yt/client/table_client/table_upload_options.cpp b/yt/yt/client/table_client/table_upload_options.cpp
new file mode 100644
index 0000000000..06c4d9c064
--- /dev/null
+++ b/yt/yt/client/table_client/table_upload_options.cpp
@@ -0,0 +1,385 @@
+#include "table_upload_options.h"
+#include "helpers.h"
+
+#include <yt/yt/client/table_client/helpers.h>
+
+#include <yt/yt/client/ypath/rich.h>
+
+#include <yt/yt/core/ytree/helpers.h>
+
+namespace NYT::NTableClient {
+
+using namespace NChunkClient;
+using namespace NCompression;
+using namespace NCypressClient;
+using namespace NYPath;
+using namespace NYTree;
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEpochSchema::TEpochSchema(const TEpochSchema& other)
+{
+ *this = other;
+}
+
+TEpochSchema& TEpochSchema::operator=(const TEpochSchema& other)
+{
+ TableSchema_ = other.TableSchema_;
+ Revision_ += other.Revision_ + 1;
+ return *this;
+}
+
+TEpochSchema& TEpochSchema::operator=(TTableSchemaPtr schema)
+{
+ Set(schema);
+ return *this;
+}
+
+const TTableSchema* TEpochSchema::operator->() const
+{
+ return TableSchema_.Get();
+}
+
+const TTableSchemaPtr& TEpochSchema::operator*() const
+{
+ return TableSchema_;
+}
+
+const TTableSchemaPtr& TEpochSchema::Get() const
+{
+ return TableSchema_;
+}
+
+ui64 TEpochSchema::GetRevision() const
+{
+ return Revision_;
+}
+
+ui64 TEpochSchema::Set(const TTableSchemaPtr& schema)
+{
+ TableSchema_ = schema;
+ return ++Revision_;
+}
+
+void TEpochSchema::Persist(const NPhoenix::TPersistenceContext& context)
+{
+ using NYT::Persist;
+
+ Persist(context, Revision_);
+ Persist<TNonNullableIntrusivePtrSerializer<>>(context, TableSchema_);
+}
+
+ui64 TEpochSchema::Reset()
+{
+ TableSchema_ = New<TTableSchema>();
+ return ++Revision_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TTableSchemaPtr TTableUploadOptions::GetUploadSchema() const
+{
+ switch (SchemaModification) {
+ case ETableSchemaModification::None:
+ return TableSchema.Get();
+
+ case ETableSchemaModification::UnversionedUpdate:
+ return TableSchema->ToUnversionedUpdate(/*sorted*/ true);
+
+ default:
+ YT_ABORT();
+ }
+}
+
+void TTableUploadOptions::Persist(const NPhoenix::TPersistenceContext& context)
+{
+ using NYT::Persist;
+
+ Persist(context, UpdateMode);
+ Persist(context, LockMode);
+ // COMPAT(h0pless): NControllerAgent::ESnapshotVersion::AddChunkSchemas
+ if (context.GetVersion() >= 301300) {
+ Persist(context, TableSchema);
+ } else {
+ TTableSchemaPtr schema;
+ Persist<TNonNullableIntrusivePtrSerializer<>>(context, schema);
+ TableSchema.Set(schema);
+ }
+ Persist(context, SchemaId);
+ Persist(context, SchemaModification);
+ // COMPAT(dave11ar): NControllerAgent::ESnapshotVersion::VersionedMapReduceWrite
+ if (context.GetVersion() >= 301602) {
+ Persist(context, VersionedWriteOptions);
+ }
+ Persist(context, SchemaMode);
+ Persist(context, OptimizeFor);
+ // COMPAT(babenko): NControllerAgent::ESnapshotVersion::ChunkFormat
+ if (context.GetVersion() >= 301103) {
+ Persist(context, ChunkFormat);
+ }
+ Persist(context, CompressionCodec);
+ Persist(context, ErasureCodec);
+ Persist(context, EnableStripedErasure);
+ Persist(context, SecurityTags);
+ Persist(context, PartiallySorted);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void ValidateSortColumnsEqual(const TSortColumns& sortColumns, const TTableSchema& schema)
+{
+ if (sortColumns != schema.GetSortColumns()) {
+ THROW_ERROR_EXCEPTION("YPath attribute \"sorted_by\" must be compatible with table schema for a \"strong\" schema mode")
+ << TErrorAttribute("sort_columns", sortColumns)
+ << TErrorAttribute("table_schema", schema);
+ }
+}
+
+static void ValidateAppendKeyColumns(const TSortColumns& sortColumns, const TTableSchema& schema, i64 rowCount)
+{
+ ValidateSortColumns(sortColumns);
+
+ if (rowCount == 0) {
+ return;
+ }
+
+ auto tableSortColumns = schema.GetSortColumns();
+ bool areKeyColumnsCompatible = true;
+ if (tableSortColumns.size() < sortColumns.size()) {
+ areKeyColumnsCompatible = false;
+ } else {
+ for (int i = 0; i < std::ssize(sortColumns); ++i) {
+ if (tableSortColumns[i] != sortColumns[i]) {
+ areKeyColumnsCompatible = false;
+ break;
+ }
+ }
+ }
+
+ if (!areKeyColumnsCompatible) {
+ THROW_ERROR_EXCEPTION("Sort columns mismatch while trying to append sorted data into a non-empty table")
+ << TErrorAttribute("append_sort_columns", sortColumns)
+ << TErrorAttribute("table_sort_columns", tableSortColumns);
+ }
+}
+
+const std::vector<TString>& GetTableUploadOptionsAttributeKeys()
+{
+ static const std::vector<TString> Result{
+ "schema_mode",
+ "optimize_for",
+ "chunk_format",
+ "compression_codec",
+ "erasure_codec",
+ "enable_striped_erasure",
+ "dynamic"
+ };
+ return Result;
+}
+
+TTableUploadOptions GetTableUploadOptions(
+ const TRichYPath& path,
+ const IAttributeDictionary& cypressTableAttributes,
+ const TTableSchemaPtr& schema,
+ i64 rowCount)
+{
+ auto schemaMode = cypressTableAttributes.Get<ETableSchemaMode>("schema_mode");
+ auto optimizeFor = cypressTableAttributes.Get<EOptimizeFor>("optimize_for");
+ auto chunkFormat = cypressTableAttributes.Find<EChunkFormat>("chunk_format");
+ auto compressionCodec = cypressTableAttributes.Get<NCompression::ECodec>("compression_codec");
+ auto erasureCodec = cypressTableAttributes.Get<NErasure::ECodec>("erasure_codec", NErasure::ECodec::None);
+ auto enableStripedErasure = cypressTableAttributes.Get<bool>("enable_striped_erasure", false);
+ auto dynamic = cypressTableAttributes.Get<bool>("dynamic");
+
+ // Validate "optimize_for" and "chunk_format" compatibility.
+ if (chunkFormat) {
+ ValidateTableChunkFormatAndOptimizeFor(*chunkFormat, optimizeFor);
+ }
+
+ // Some ypath attributes are not compatible with attribute "schema".
+ if (path.GetAppend() && path.GetSchema()) {
+ THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"schema\" are not compatible")
+ << TErrorAttribute("path", path);
+ }
+
+ if (!path.GetSortedBy().empty() && path.GetSchema()) {
+ THROW_ERROR_EXCEPTION("YPath attributes \"sorted_by\" and \"schema\" are not compatible")
+ << TErrorAttribute("path", path);
+ }
+
+ // Dynamic tables have their own requirements as well.
+ if (dynamic) {
+ if (path.GetSchema()) {
+ THROW_ERROR_EXCEPTION("YPath attribute \"schema\" cannot be set on a dynamic table")
+ << TErrorAttribute("path", path);
+ }
+
+ if (!path.GetSortedBy().empty()) {
+ THROW_ERROR_EXCEPTION("YPath attribute \"sorted_by\" cannot be set on a dynamic table")
+ << TErrorAttribute("path", path);
+ }
+ }
+
+ TTableUploadOptions result;
+ // NB: Saving schema to make sure that if changes are applied to it the schema revision also changes.
+ result.TableSchema = schema;
+ auto pathSchema = path.GetSchema();
+ if (path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) {
+ ValidateSortColumnsEqual(path.GetSortedBy(), *schema);
+
+ result.LockMode = ELockMode::Exclusive;
+ result.UpdateMode = EUpdateMode::Append;
+ result.SchemaMode = ETableSchemaMode::Strong;
+ } else if (path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) {
+ // Old behaviour.
+ ValidateAppendKeyColumns(path.GetSortedBy(), *schema, rowCount);
+
+ result.LockMode = ELockMode::Exclusive;
+ result.UpdateMode = EUpdateMode::Append;
+ result.SchemaMode = ETableSchemaMode::Weak;
+ result.TableSchema = TTableSchema::FromSortColumns(path.GetSortedBy());
+ } else if (path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) {
+ result.LockMode = (schema->IsSorted() && !dynamic) ? ELockMode::Exclusive : ELockMode::Shared;
+ result.UpdateMode = EUpdateMode::Append;
+ result.SchemaMode = ETableSchemaMode::Strong;
+ } else if (path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) {
+ // Old behaviour - reset key columns if there were any.
+ result.LockMode = ELockMode::Shared;
+ result.UpdateMode = EUpdateMode::Append;
+ result.SchemaMode = ETableSchemaMode::Weak;
+ result.TableSchema.Reset();
+ } else if (!path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) {
+ ValidateSortColumnsEqual(path.GetSortedBy(), *schema);
+
+ result.LockMode = ELockMode::Exclusive;
+ result.UpdateMode = EUpdateMode::Overwrite;
+ result.SchemaMode = ETableSchemaMode::Strong;
+ } else if (!path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) {
+ result.LockMode = ELockMode::Exclusive;
+ result.UpdateMode = EUpdateMode::Overwrite;
+ result.SchemaMode = ETableSchemaMode::Weak;
+ result.TableSchema = TTableSchema::FromSortColumns(path.GetSortedBy());
+ } else if (!path.GetAppend() && pathSchema && (schemaMode == ETableSchemaMode::Strong)) {
+ result.LockMode = ELockMode::Exclusive;
+ result.UpdateMode = EUpdateMode::Overwrite;
+ result.SchemaMode = ETableSchemaMode::Strong;
+ result.TableSchema = pathSchema;
+ } else if (!path.GetAppend() && pathSchema && (schemaMode == ETableSchemaMode::Weak)) {
+ // Change from Weak to Strong schema mode.
+ result.LockMode = ELockMode::Exclusive;
+ result.UpdateMode = EUpdateMode::Overwrite;
+ result.SchemaMode = ETableSchemaMode::Strong;
+ result.TableSchema = pathSchema;
+ } else if (!path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) {
+ result.LockMode = ELockMode::Exclusive;
+ result.UpdateMode = EUpdateMode::Overwrite;
+ result.SchemaMode = ETableSchemaMode::Strong;
+ } else if (!path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) {
+ result.LockMode = ELockMode::Exclusive;
+ result.UpdateMode = EUpdateMode::Overwrite;
+ result.SchemaMode = ETableSchemaMode::Weak;
+ result.TableSchema.Reset();
+ } else {
+ // Do not use YT_ABORT here, since this code is executed inside scheduler.
+ THROW_ERROR_EXCEPTION("Failed to define upload parameters")
+ << TErrorAttribute("path", path)
+ << TErrorAttribute("schema_mode", schemaMode)
+ << TErrorAttribute("schema", *schema);
+ }
+
+ if (path.GetAppend() && path.GetOptimizeFor()) {
+ THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"optimize_for\" are not compatible")
+ << TErrorAttribute("path", path);
+ }
+
+ result.OptimizeFor = path.GetOptimizeFor() ? *path.GetOptimizeFor() : optimizeFor;
+ result.ChunkFormat = path.GetChunkFormat() ? *path.GetChunkFormat() : chunkFormat;
+
+ if (path.GetAppend() && path.GetCompressionCodec()) {
+ THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"compression_codec\" are not compatible")
+ << TErrorAttribute("path", path);
+ }
+
+ if (path.GetCompressionCodec()) {
+ result.CompressionCodec = *path.GetCompressionCodec();
+ } else {
+ result.CompressionCodec = compressionCodec;
+ }
+
+ if (path.GetAppend() && path.GetErasureCodec()) {
+ THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"erasure_codec\" are not compatible")
+ << TErrorAttribute("path", path);
+ }
+
+ result.ErasureCodec = path.GetErasureCodec().value_or(erasureCodec);
+ result.EnableStripedErasure = enableStripedErasure;
+
+ if (path.GetSchemaModification() == ETableSchemaModification::UnversionedUpdateUnsorted) {
+ THROW_ERROR_EXCEPTION("YPath attribute \"schema_modification\" cannot have value %Qlv for output tables",
+ path.GetSchemaModification())
+ << TErrorAttribute("path", path);
+ } else if (!dynamic && path.GetSchemaModification() != ETableSchemaModification::None) {
+ THROW_ERROR_EXCEPTION("YPath attribute \"schema_modification\" can have value %Qlv only for dynamic tables",
+ path.GetSchemaModification())
+ << TErrorAttribute("path", path);
+ }
+ result.SchemaModification = path.GetSchemaModification();
+
+ auto versionedWriteOptions = path.GetVersionedWriteOptions();
+ if (!dynamic && versionedWriteOptions.WriteMode != EVersionedIOMode::Default) {
+ THROW_ERROR_EXCEPTION("YPath attribute \"versioned_write_options/write_mode\" can have value %Qlv only for dynamic tables",
+ versionedWriteOptions.WriteMode)
+ << TErrorAttribute("path", path);
+ }
+ if (versionedWriteOptions.WriteMode != EVersionedIOMode::Default && path.GetSchemaModification() != ETableSchemaModification::None) {
+ THROW_ERROR_EXCEPTION("YPath attributes \"versioned_write_options/write_mode\" and \"schema_modification\""
+ "can not be set in non-trivial state together: \"versioned_write_options/write_mode\" is %Qlv, \"schema_modification\" is %Qlv",
+ versionedWriteOptions.WriteMode,
+ path.GetSchemaModification())
+ << TErrorAttribute("path", path);
+ }
+ result.VersionedWriteOptions = versionedWriteOptions;
+
+ if (!dynamic && path.GetPartiallySorted()) {
+ THROW_ERROR_EXCEPTION("YPath attribute \"partially_sorted\" can be set only for dynamic tables")
+ << TErrorAttribute("path", path);
+ }
+ result.PartiallySorted = path.GetPartiallySorted();
+
+ result.SecurityTags = path.GetSecurityTags();
+
+ return result;
+}
+
+TTableUploadOptions GetFileUploadOptions(
+ const TRichYPath& path,
+ const IAttributeDictionary& cypressTableAttributes)
+{
+ auto compressionCodec = cypressTableAttributes.Get<NCompression::ECodec>("compression_codec");
+ auto enableStripedErasure = cypressTableAttributes.Get<bool>("enable_striped_erasure", false);
+ auto erasureCodec = cypressTableAttributes.Get<NErasure::ECodec>("erasure_codec", NErasure::ECodec::None);
+
+ if (path.GetAppend()) {
+ THROW_ERROR_EXCEPTION("Attribute \"append\" is not supported for files")
+ << TErrorAttribute("path", path);
+ }
+
+ // NB(coteeq): Fill for sanity. They should not have impact on behaviour, because
+ // RichYPath's compression_codec & erasure_codec are disallowed in remote copy.
+ // TODO(coteeq): Make it YT_VERIFY
+ if (path.GetCompressionCodec() || path.GetErasureCodec()) {
+ THROW_ERROR_EXCEPTION("\"compression_codec\" and \"erasure_codec\" are disallowed for files")
+ << TErrorAttribute("path", path);
+ }
+
+ TTableUploadOptions result;
+ result.CompressionCodec = compressionCodec;
+ result.ErasureCodec = erasureCodec;
+ result.EnableStripedErasure = enableStripedErasure;
+ result.SecurityTags = path.GetSecurityTags();
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/table_upload_options.h b/yt/yt/client/table_client/table_upload_options.h
new file mode 100644
index 0000000000..24e19766fd
--- /dev/null
+++ b/yt/yt/client/table_client/table_upload_options.h
@@ -0,0 +1,87 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/client/chunk_client/public.h>
+
+#include <yt/yt/client/table_client/schema.h>
+#include <yt/yt/client/table_client/versioned_io_options.h>
+
+#include <yt/yt/client/security_client/public.h>
+
+#include <yt/yt/library/erasure/public.h>
+
+#include <yt/yt/core/compression/public.h>
+
+#include <yt/yt/core/misc/phoenix.h>
+
+namespace NYT::NTableClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TEpochSchema
+{
+public:
+ TEpochSchema() = default;
+ TEpochSchema(const TEpochSchema& other);
+ TEpochSchema& operator=(const TEpochSchema& other);
+
+ TEpochSchema(const TEpochSchema&& other) = delete;
+ TEpochSchema& operator=(TEpochSchema&& other) = delete;
+
+ TEpochSchema& operator=(TTableSchemaPtr schema);
+
+ const TTableSchema* operator->() const;
+ const TTableSchemaPtr& operator*() const;
+
+ const TTableSchemaPtr& Get() const;
+ ui64 GetRevision() const;
+
+ ui64 Set(const TTableSchemaPtr& schema);
+
+ void Persist(const NPhoenix::TPersistenceContext& context);
+
+ ui64 Reset();
+
+private:
+ TTableSchemaPtr TableSchema_ = New<TTableSchema>();
+ ui64 Revision_ = 0;
+};
+
+struct TTableUploadOptions
+{
+ NChunkClient::EUpdateMode UpdateMode = NChunkClient::EUpdateMode::Overwrite;
+ NCypressClient::ELockMode LockMode = NCypressClient::ELockMode::Exclusive;
+ TEpochSchema TableSchema;
+ TMasterTableSchemaId SchemaId;
+ ETableSchemaModification SchemaModification = ETableSchemaModification::None;
+ TVersionedWriteOptions VersionedWriteOptions;
+ ETableSchemaMode SchemaMode = ETableSchemaMode::Strong;
+ EOptimizeFor OptimizeFor = EOptimizeFor::Lookup;
+ std::optional<NChunkClient::EChunkFormat> ChunkFormat;
+ NCompression::ECodec CompressionCodec = NCompression::ECodec::None;
+ NErasure::ECodec ErasureCodec = NErasure::ECodec::None;
+ bool EnableStripedErasure = false;
+ std::optional<std::vector<NSecurityClient::TSecurityTag>> SecurityTags;
+ bool PartiallySorted = false;
+
+ TTableSchemaPtr GetUploadSchema() const;
+
+ void Persist(const NPhoenix::TPersistenceContext& context);
+};
+
+const std::vector<TString>& GetTableUploadOptionsAttributeKeys();
+
+TTableUploadOptions GetTableUploadOptions(
+ const NYPath::TRichYPath& path,
+ const NYTree::IAttributeDictionary& cypressTableAttributes,
+ const TTableSchemaPtr& schema,
+ i64 rowCount);
+
+TTableUploadOptions GetFileUploadOptions(
+ const NYPath::TRichYPath& path,
+ const NYTree::IAttributeDictionary& cypressTableAttributes);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/tablet_client/table_mount_cache.h b/yt/yt/client/tablet_client/table_mount_cache.h
index a450fdf833..086b466487 100644
--- a/yt/yt/client/tablet_client/table_mount_cache.h
+++ b/yt/yt/client/tablet_client/table_mount_cache.h
@@ -65,6 +65,7 @@ struct TIndexInfo
NObjectClient::TObjectId TableId;
ESecondaryIndexKind Kind;
std::optional<TString> Predicate;
+ std::optional<TString> UnfoldedColumn;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
index f856c462d3..e2584e3106 100644
--- a/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
+++ b/yt/yt/client/transaction_client/remote_timestamp_provider.cpp
@@ -135,7 +135,7 @@ ITimestampProviderPtr CreateRemoteTimestampProvider(
return CreateRemoteTimestampProvider(
std::move(config),
std::move(channel),
- false);
+ /*allowOldClocks*/ false);
}
ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
@@ -159,7 +159,7 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
return CreateBatchingRemoteTimestampProvider(
std::move(config),
std::move(channel),
- false);
+ /*allowOldClocks*/ false);
}
ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
@@ -180,7 +180,7 @@ ITimestampProviderPtr CreateBatchingRemoteTimestampProvider(
return CreateBatchingRemoteTimestampProvider(
config,
channelFactory,
- false);
+ /*allowOldClocks*/ false);
}
////////////////////////////////////////////////////////////////////////////////
@@ -215,7 +215,7 @@ TAlienRemoteTimestampProvidersMap CreateAlienTimestampProvidersMap(
CreateBatchingRemoteTimestampProvider(
foreignProviderConfig->TimestampProvider,
channelFactory,
- true));
+ /*allowOldClocks*/ true));
}
return alienProvidersMap;
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index b8217c8657..43e8056b1e 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -2,7 +2,7 @@
#include <yt/yt/client/api/connection.h>
#include <yt/yt/client/api/client.h>
-#include <yt/yt/client/api/distributed_table_sessions.h>
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/file_writer.h>
#include <yt/yt/client/api/journal_reader.h>
#include <yt/yt/client/api/journal_writer.h>
@@ -626,6 +626,11 @@ public:
const TGetJobStderrOptions& options),
(override));
+ MOCK_METHOD(TFuture<std::vector<TJobTraceEvent>>, GetJobTrace, (
+ const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
+ const TGetJobTraceOptions& options),
+ (override));
+
MOCK_METHOD(TFuture<TSharedRef>, GetJobFailContext, (
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
NJobTrackerClient::TJobId jobId,
@@ -836,9 +841,9 @@ public:
const TDistributedWriteSessionFinishOptions& options),
(override));
- MOCK_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (
- const TDistributedWriteCookiePtr& cookie,
- const TParticipantTableWriterOptions& options),
+ MOCK_METHOD(TFuture<ITableWriterPtr>, CreateFragmentTableWriter, (
+ const TFragmentWriteCookiePtr& cookie,
+ const TFragmentTableWriterOptions& options),
(override));
MOCK_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (
diff --git a/yt/yt/client/unittests/mock/transaction.h b/yt/yt/client/unittests/mock/transaction.h
index 9bb296dbe8..943e4fa201 100644
--- a/yt/yt/client/unittests/mock/transaction.h
+++ b/yt/yt/client/unittests/mock/transaction.h
@@ -1,6 +1,6 @@
#pragma once
-#include <yt/yt/client/api/distributed_table_sessions.h>
+#include <yt/yt/client/api/distributed_table_session.h>
#include <yt/yt/client/api/file_writer.h>
#include <yt/yt/client/api/journal_reader.h>
#include <yt/yt/client/api/journal_writer.h>
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index a0cc4caf88..a09ba33382 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -13,7 +13,7 @@ SRCS(
api/client_cache.cpp
api/delegating_client.cpp
api/delegating_transaction.cpp
- api/distributed_table_sessions.cpp
+ api/distributed_table_session.cpp
api/etc_client.cpp
api/journal_client.cpp
api/operation_client.cpp
@@ -127,6 +127,7 @@ SRCS(
table_client/schemaless_buffered_dynamic_table_writer.cpp
table_client/schemaless_dynamic_table_writer.cpp
table_client/serialize.cpp
+ table_client/table_upload_options.cpp
table_client/logical_type.cpp
table_client/merge_table_schemas.cpp
table_client/name_table.cpp
diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h
index 2948d20b67..421216ed81 100644
--- a/yt/yt/core/bus/tcp/connection.h
+++ b/yt/yt/core/bus/tcp/connection.h
@@ -15,7 +15,7 @@
#include <yt/yt/core/net/address.h>
-#include <yt/yt/core/misc/atomic_object.h>
+#include <library/cpp/yt/threading/atomic_object.h>
#include <yt/yt/core/misc/blob.h>
#include <yt/yt/core/misc/mpsc_stack.h>
#include <yt/yt/core/misc/ring_queue.h>
diff --git a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp
index 7db0cf83c7..4ddc546cdb 100644
--- a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp
@@ -152,15 +152,18 @@ TEST_F(TQuantizedExecutorTest, Simple)
TEST_F(TQuantizedExecutorTest, Timeout)
{
- InitSimple(/*workerCount*/ 4, /*iterationCount*/ std::numeric_limits<i64>::max());
+ InitSimple(/*workerCount*/ 2, /*iterationCount*/ std::numeric_limits<i64>::max());
- for (int index = 1; index <= 10; ++index) {
- WaitFor(Executor_->Run(TDuration::MilliSeconds(100)))
- .ThrowOnError();
+ WaitFor(Executor_->Run(TDuration::MilliSeconds(100)))
+ .ThrowOnError();
- auto counter = SimpleCallbackProvider_->GetCounter();
- EXPECT_LE(counter, /*workerCount*/ 4 * /*milliseconds*/ 100.0 / /*period*/ 5 * index * 1.25);
- }
+ auto oldCounter = SimpleCallbackProvider_->GetCounter();
+
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100));
+
+ auto newCounter = SimpleCallbackProvider_->GetCounter();
+
+ EXPECT_EQ(oldCounter, newCounter);
}
TEST_F(TQuantizedExecutorTest, LongCallback1)
diff --git a/yt/yt/core/crypto/tls.cpp b/yt/yt/core/crypto/tls.cpp
index 9acfb26180..0eb910fbab 100644
--- a/yt/yt/core/crypto/tls.cpp
+++ b/yt/yt/core/crypto/tls.cpp
@@ -340,10 +340,12 @@ public:
bool IsIdle() const override
{
- return
- Underlying_->IsIdle() &&
- ActiveIOCount_ == 0 &&
- !Failed_;
+ return ActiveIOCount_ == 0 && !Failed_;
+ }
+
+ bool IsReusable() const override
+ {
+ return IsIdle() && Underlying_->IsReusable();
}
TFuture<void> Abort() override
diff --git a/yt/yt/core/http/config.cpp b/yt/yt/core/http/config.cpp
index 2fb5b909aa..7be4dca2d4 100644
--- a/yt/yt/core/http/config.cpp
+++ b/yt/yt/core/http/config.cpp
@@ -38,10 +38,10 @@ void TServerConfig::Register(TRegistrar registrar)
.Default(80);
registrar.Parameter("max_simultaneous_connections", &TThis::MaxSimultaneousConnections)
- .Default(50000);
+ .Default(50'000);
registrar.Parameter("max_backlog_size", &TThis::MaxBacklogSize)
- .Default(8192);
+ .Default(8'192);
registrar.Parameter("bind_retry_count", &TThis::BindRetryCount)
.Default(5);
diff --git a/yt/yt/core/http/connection_reuse_helpers.cpp b/yt/yt/core/http/connection_reuse_helpers.cpp
index 55ba238a5e..a0e1de77be 100644
--- a/yt/yt/core/http/connection_reuse_helpers.cpp
+++ b/yt/yt/core/http/connection_reuse_helpers.cpp
@@ -17,7 +17,7 @@ TReusableConnectionState::TReusableConnectionState(
TReusableConnectionState::~TReusableConnectionState()
{
- if (Reusable && OwningPool && Connection->IsIdle()) {
+ if (Reusable && OwningPool && Connection->IsReusable()) {
OwningPool->Release(std::move(Connection));
}
}
diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp
index ae7df4b620..d2fda153b5 100644
--- a/yt/yt/core/http/server.cpp
+++ b/yt/yt/core/http/server.cpp
@@ -12,7 +12,6 @@
#include <yt/yt/core/concurrency/thread_pool_poller.h>
#include <yt/yt/core/misc/finally.h>
-#include <yt/yt/core/misc/memory_usage_tracker.h>
#include <yt/yt/core/misc/public.h>
#include <yt/yt/core/ytree/convert.h>
@@ -63,7 +62,6 @@ public:
IPollerPtr poller,
IPollerPtr acceptor,
IInvokerPtr invoker,
- IMemoryUsageTrackerPtr memoryUsageTracker,
IRequestPathMatcherPtr requestPathMatcher,
bool ownPoller = false)
: Config_(std::move(config))
@@ -71,7 +69,6 @@ public:
, Poller_(std::move(poller))
, Acceptor_(std::move(acceptor))
, Invoker_(std::move(invoker))
- , MemoryUsageTracker_(std::move(memoryUsageTracker))
, OwnPoller_(ownPoller)
, RequestPathMatcher_(std::move(requestPathMatcher))
{ }
@@ -126,7 +123,6 @@ private:
const IPollerPtr Poller_;
const IPollerPtr Acceptor_;
const IInvokerPtr Invoker_;
- const IMemoryUsageTrackerPtr MemoryUsageTracker_;
const bool OwnPoller_ = false;
IRequestPathMatcherPtr RequestPathMatcher_;
@@ -224,14 +220,6 @@ private:
SetRequestId(response, request->GetRequestId());
- if (MemoryUsageTracker_ && MemoryUsageTracker_->IsExceeded()) {
- THROW_ERROR_EXCEPTION(
- EStatusCode::TooManyRequests,
- "Request is dropped due to high memory pressure")
- << TErrorAttribute("total_memory_limit", MemoryUsageTracker_->GetLimit())
- << TErrorAttribute("memory_usage", MemoryUsageTracker_->GetUsed());
- }
-
handler->HandleRequest(request, response);
NTracing::FlushCurrentTraceContextElapsedTime();
@@ -393,7 +381,6 @@ IServerPtr CreateServer(
IPollerPtr poller,
IPollerPtr acceptor,
IInvokerPtr invoker,
- IMemoryUsageTrackerPtr memoryUsageTracker,
bool ownPoller)
{
auto handlers = New<TRequestPathMatcher>();
@@ -403,7 +390,6 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
- std::move(memoryUsageTracker),
std::move(handlers),
ownPoller);
}
@@ -413,7 +399,6 @@ IServerPtr CreateServer(
IPollerPtr poller,
IPollerPtr acceptor,
IInvokerPtr invoker,
- IMemoryUsageTrackerPtr memoryUsageTracker,
bool ownPoller)
{
auto address = TNetworkAddress::CreateIPv6Any(config->Port);
@@ -426,7 +411,6 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
- std::move(memoryUsageTracker),
ownPoller);
} catch (const std::exception& ex) {
if (i + 1 == config->BindRetryCount) {
@@ -456,7 +440,6 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
- /*memoryUsageTracker*/ GetNullMemoryUsageTracker(),
/*ownPoller*/ false);
}
@@ -464,8 +447,7 @@ IServerPtr CreateServer(
TServerConfigPtr config,
IListenerPtr listener,
IPollerPtr poller,
- IPollerPtr acceptor,
- IMemoryUsageTrackerPtr memoryUsageTracker)
+ IPollerPtr acceptor)
{
auto invoker = poller->GetInvoker();
return CreateServer(
@@ -474,15 +456,13 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
- std::move(memoryUsageTracker),
/*ownPoller*/ false);
}
IServerPtr CreateServer(
TServerConfigPtr config,
IPollerPtr poller,
- IPollerPtr acceptor,
- IMemoryUsageTrackerPtr memoryUsageTracker)
+ IPollerPtr acceptor)
{
auto invoker = poller->GetInvoker();
return CreateServer(
@@ -490,7 +470,6 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
- std::move(memoryUsageTracker),
/*ownPoller*/ false);
}
@@ -520,7 +499,6 @@ IServerPtr CreateServer(TServerConfigPtr config, int pollerThreadCount)
std::move(poller),
std::move(acceptor),
std::move(invoker),
- /*memoryUsageTracker*/ GetNullMemoryUsageTracker(),
/*ownPoller*/ true);
}
@@ -535,7 +513,6 @@ IServerPtr CreateServer(
std::move(poller),
std::move(acceptor),
std::move(invoker),
- /*memoryUsageTracker*/ GetNullMemoryUsageTracker(),
/*ownPoller*/ false);
}
diff --git a/yt/yt/core/http/server.h b/yt/yt/core/http/server.h
index 171fb70399..e96720981b 100644
--- a/yt/yt/core/http/server.h
+++ b/yt/yt/core/http/server.h
@@ -89,16 +89,14 @@ IServerPtr CreateServer(
TServerConfigPtr config,
NNet::IListenerPtr listener,
NConcurrency::IPollerPtr poller,
- NConcurrency::IPollerPtr acceptor,
- IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker());
+ NConcurrency::IPollerPtr acceptor);
IServerPtr CreateServer(
TServerConfigPtr config,
NConcurrency::IPollerPtr poller);
IServerPtr CreateServer(
TServerConfigPtr config,
NConcurrency::IPollerPtr poller,
- NConcurrency::IPollerPtr acceptor,
- IMemoryUsageTrackerPtr memoryTracker = GetNullMemoryUsageTracker());
+ NConcurrency::IPollerPtr acceptor);
IServerPtr CreateServer(
int port,
NConcurrency::IPollerPtr poller);
diff --git a/yt/yt/core/http/unittests/http_ut.cpp b/yt/yt/core/http/unittests/http_ut.cpp
index cda7dea65f..7030a2350f 100644
--- a/yt/yt/core/http/unittests/http_ut.cpp
+++ b/yt/yt/core/http/unittests/http_ut.cpp
@@ -206,6 +206,11 @@ struct TFakeConnection
return true;
}
+ bool IsReusable() const override
+ {
+ return true;
+ }
+
TFuture<void> Abort() override
{
THROW_ERROR_EXCEPTION("Not implemented");
diff --git a/yt/yt/core/https/server.cpp b/yt/yt/core/https/server.cpp
index b97cb801e8..92bdee379f 100644
--- a/yt/yt/core/https/server.cpp
+++ b/yt/yt/core/https/server.cpp
@@ -112,8 +112,7 @@ IServerPtr CreateServer(
const TServerConfigPtr& config,
const IPollerPtr& poller,
const IPollerPtr& acceptor,
- const IInvokerPtr& controlInvoker,
- const IMemoryUsageTrackerPtr& memoryTracker)
+ const IInvokerPtr& controlInvoker)
{
auto sslContext = New<TSslContext>();
ApplySslConfig(sslContext, config->Credentials);
@@ -165,8 +164,7 @@ IServerPtr CreateServer(
configCopy,
tlsListener,
poller,
- acceptor,
- memoryTracker);
+ acceptor);
return New<TServer>(std::move(httpServer), std::move(certificateUpdater));
}
diff --git a/yt/yt/core/https/server.h b/yt/yt/core/https/server.h
index 46994e8ea4..c6c40eeec3 100644
--- a/yt/yt/core/https/server.h
+++ b/yt/yt/core/https/server.h
@@ -8,8 +8,6 @@
#include <yt/yt/core/http/public.h>
-#include <yt/yt/core/misc/memory_usage_tracker.h>
-
namespace NYT::NHttps {
////////////////////////////////////////////////////////////////////////////////
@@ -28,8 +26,7 @@ NHttp::IServerPtr CreateServer(
const TServerConfigPtr& config,
const NConcurrency::IPollerPtr& poller,
const NConcurrency::IPollerPtr& acceptor,
- const IInvokerPtr& controlInvoker,
- const IMemoryUsageTrackerPtr& memoryTracker = GetNullMemoryUsageTracker());
+ const IInvokerPtr& controlInvoker);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp
index 5209a25373..fa96cbc28a 100644
--- a/yt/yt/core/logging/log_manager.cpp
+++ b/yt/yt/core/logging/log_manager.cpp
@@ -348,26 +348,52 @@ public:
RegisterWriterFactory(TString(TStderrLogWriterConfig::WriterType), GetStderrLogWriterFactory());
}
+ bool IsInitialized() const
+ {
+ return InitializationFinished_.Test();
+ }
+
void Initialize()
{
- std::call_once(Initialized_, [&] {
- // NB: Cannot place this logic inside ctor since it may boot up Compression threads unexpected
- // and these will try to access TLogManager instance causing a deadlock.
- try {
- if (auto config = TLogManagerConfig::TryCreateFromEnv()) {
- DoUpdateConfig(config, /*fromEnv*/ true);
- }
- } catch (const std::exception& ex) {
- fprintf(stderr, "Error configuring logging from environment variables\n%s\n",
- ex.what());
+ [[likely]] if (InitializationFinished_.Test()) {
+ // Don't bother doing syscalls on a hot path.
+ return;
+ }
+
+ // Sync is done via event so there is no need for stronger memory orders.
+ // Case of recursive call is alright, because there sync is done via sequenced-before ordering.
+ [[likely]] if (InitializationStarted_.exchange(true, std::memory_order::relaxed)) {
+ NThreading::TThreadId initializerThreadId = NThreading::InvalidThreadId;
+ while (initializerThreadId == NThreading::InvalidThreadId) {
+ initializerThreadId = InitializerThreadId_.load(std::memory_order::relaxed);
+ }
+ if (GetCurrentThreadId() == initializerThreadId) {
+ // Recursive call -- bail out.
+ return;
}
+ // Another thread -- now wait for real.
+ InitializationFinished_.Wait();
+ return;
+ }
+ InitializerThreadId_.store(GetCurrentThreadId(), std::memory_order::relaxed);
- if (!IsConfiguredFromEnv()) {
- DoUpdateConfig(TLogManagerConfig::CreateDefault(), /*fromEnv*/ false);
+ // NB: Cannot place this logic inside ctor since it may boot up Compression threads unexpected
+ // and these will try to access TLogManager instance causing a deadlock.
+ try {
+ if (auto config = TLogManagerConfig::TryCreateFromEnv()) {
+ DoUpdateConfig(config, /*fromEnv*/ true);
}
+ } catch (const std::exception& ex) {
+ fprintf(stderr, "Error configuring logging from environment variables\n%s\n",
+ ex.what());
+ }
- SystemCategory_ = GetCategory(SystemLoggingCategoryName);
- });
+ if (!IsConfiguredFromEnv()) {
+ DoUpdateConfig(TLogManagerConfig::CreateDefault(), /*fromEnv*/ false);
+ }
+
+ SystemCategory_ = GetCategory(SystemLoggingCategoryName);
+ InitializationFinished_.NotifyAll();
}
void Configure(INodePtr node)
@@ -422,18 +448,20 @@ public:
{
ShutdownRequested_.store(true);
+ auto config = Config_.Acquire();
+
if (LoggingThread_->GetThreadId() == GetCurrentThreadId()) {
FlushWriters();
} else {
// Wait for all previously enqueued messages to be flushed
// but no more than ShutdownGraceTimeout to prevent hanging.
- Synchronize(TInstant::Now() + Config_->ShutdownGraceTimeout);
+ Synchronize(TInstant::Now() + config->ShutdownGraceTimeout);
}
// For now this is the only way to wait for log writers that perform asynchronous flushes.
// TODO(achulkov2): Refactor log manager to support asynchronous operations.
- if (Config_->ShutdownBusyTimeout) {
- Sleep(Config_->ShutdownBusyTimeout);
+ if (config->ShutdownBusyTimeout != TDuration::Zero()) {
+ Sleep(config->ShutdownBusyTimeout);
}
EventQueue_->Shutdown();
@@ -460,13 +488,14 @@ public:
}
auto guard = Guard(SpinLock_);
+ auto config = Config_.Acquire();
auto it = NameToCategory_.find(categoryName);
if (it == NameToCategory_.end()) {
auto category = std::make_unique<TLoggingCategory>();
category->Name = categoryName;
category->ActualVersion = &Version_;
it = NameToCategory_.emplace(categoryName, std::move(category)).first;
- DoUpdateCategory(it->second.get());
+ DoUpdateCategory(config, it->second.get());
}
return it->second.get();
}
@@ -474,14 +503,17 @@ public:
void UpdateCategory(TLoggingCategory* category)
{
auto guard = Guard(SpinLock_);
- DoUpdateCategory(category);
+ auto config = Config_.Acquire();
+ DoUpdateCategory(config, category);
}
void UpdateAnchor(TLoggingAnchor* anchor)
{
auto guard = Guard(SpinLock_);
+ auto config = Config_.Acquire();
+
bool enabled = true;
- for (const auto& prefix : Config_->SuppressedMessages) {
+ for (const auto& prefix : config->SuppressedMessages) {
if (anchor->AnchorMessage.StartsWith(prefix)) {
enabled = false;
break;
@@ -619,11 +651,9 @@ public:
void SuppressRequest(TRequestId requestId)
{
- if (!RequestSuppressionEnabled_) {
- return;
+ if (RequestSuppressionEnabled_.load(std::memory_order_relaxed)) {
+ SuppressedRequestIdQueue_.Enqueue(requestId);
}
-
- SuppressedRequestIdQueue_.Enqueue(requestId);
}
void Synchronize(TInstant deadline = TInstant::Max())
@@ -678,6 +708,8 @@ private:
void EnsureStarted()
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
std::call_once(Started_, [&] {
if (LoggingThread_->IsStopping()) {
return;
@@ -695,7 +727,9 @@ private:
});
}
- const std::vector<ILogWriterPtr>& GetWriters(const TLogEvent& event)
+ const std::vector<ILogWriterPtr>& GetWriters(
+ const TLogManagerConfigPtr& config,
+ const TLogEvent& event)
{
VERIFY_THREAD_AFFINITY(LoggingThread);
@@ -710,7 +744,7 @@ private:
}
THashSet<TString> writerNames;
- for (const auto& rule : Config_->Rules) {
+ for (const auto& rule : config->Rules) {
if (rule->IsApplicable(event.Category->Name, event.Level, event.Family)) {
writerNames.insert(rule->Writers.begin(), rule->Writers.end());
}
@@ -757,10 +791,7 @@ private:
return;
}
- AbortOnAlert_.store(event.Config->AbortOnAlert);
-
EnsureStarted();
-
FlushWriters();
try {
@@ -771,8 +802,10 @@ private:
}
}
- std::unique_ptr<ILogFormatter> CreateFormatter(const TLogWriterConfigPtr& writerConfig)
+ static std::unique_ptr<ILogFormatter> CreateFormatter(const TLogWriterConfigPtr& writerConfig)
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
switch (writerConfig->Format) {
case ELogFormat::PlainText:
return std::make_unique<TPlainTextLogFormatter>(
@@ -795,7 +828,10 @@ private:
void DoUpdateConfig(const TLogManagerConfigPtr& config, bool fromEnv)
{
- if (AreNodesEqual(ConvertToNode(Config_), ConvertToNode(config))) {
+ // This could be called both from ctor and from LoggingThread.
+
+ auto oldConfig = Config_.Acquire();
+ if (AreNodesEqual(ConvertToNode(oldConfig), ConvertToNode(config))) {
return;
}
@@ -850,31 +886,36 @@ private:
category->StructuredValidationSamplingRate.store(config->StructuredValidationSamplingRate, std::memory_order::relaxed);
}
- Config_ = config;
ConfiguredFromEnv_.store(fromEnv);
- HighBacklogWatermark_.store(Config_->HighBacklogWatermark);
- LowBacklogWatermark_.store(Config_->LowBacklogWatermark);
- RequestSuppressionEnabled_.store(Config_->RequestSuppressionTimeout != TDuration::Zero());
+ HighBacklogWatermark_.store(config->HighBacklogWatermark);
+ LowBacklogWatermark_.store(config->LowBacklogWatermark);
+ RequestSuppressionEnabled_.store(config->RequestSuppressionTimeout != TDuration::Zero());
+ AbortOnAlert_.store(config->AbortOnAlert);
- CompressionThreadPool_->Configure(Config_->CompressionThreadCount);
+ CompressionThreadPool_->Configure(config->CompressionThreadCount);
if (RequestSuppressionEnabled_) {
- SuppressedRequestIdSet_.SetTtl((Config_->RequestSuppressionTimeout + DequeuePeriod) * 2);
+ SuppressedRequestIdSet_.SetTtl((config->RequestSuppressionTimeout + DequeuePeriod) * 2);
} else {
SuppressedRequestIdSet_.Clear();
SuppressedRequestIdQueue_.DequeueAll();
}
- FlushExecutor_->SetPeriod(Config_->FlushPeriod);
- WatchExecutor_->SetPeriod(Config_->WatchPeriod);
- CheckSpaceExecutor_->SetPeriod(Config_->CheckSpacePeriod);
- FileRotationExecutor_->SetPeriod(Config_->RotationCheckPeriod);
+ FlushExecutor_->SetPeriod(config->FlushPeriod);
+ WatchExecutor_->SetPeriod(config->WatchPeriod);
+ CheckSpaceExecutor_->SetPeriod(config->CheckSpacePeriod);
+ FileRotationExecutor_->SetPeriod(config->RotationCheckPeriod);
+ Config_.Store(std::move(config));
Version_++;
}
- void WriteEvent(const TLogEvent& event)
+ void WriteEvent(
+ const TLogManagerConfigPtr& config,
+ const TLogEvent& event)
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
if (ReopenRequested_.exchange(false)) {
ReloadWriters();
}
@@ -886,21 +927,26 @@ private:
event.Anchor->ByteCounter.Current += std::ssize(event.MessageRef);
}
- for (const auto& writer : GetWriters(event)) {
+ for (const auto& writer : GetWriters(config, event)) {
writer->Write(event);
}
}
void FlushWriters()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
for (const auto& [name, writer] : NameToWriter_) {
writer->Flush();
}
+
FlushedEvents_ = WrittenEvents_.load();
}
void RotateFiles()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
for (const auto& [name, writer] : NameToWriter_) {
if (auto fileWriter = DynamicPointerCast<IFileLogWriter>(writer)) {
fileWriter->MaybeRotate();
@@ -910,6 +956,8 @@ private:
void ReloadWriters()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
Version_++;
for (const auto& [name, writer] : NameToWriter_) {
writer->Reload();
@@ -918,15 +966,20 @@ private:
void CheckSpace()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
+ auto config = Config_.Acquire();
for (const auto& [name, writer] : NameToWriter_) {
if (auto fileWriter = DynamicPointerCast<IFileLogWriter>(writer)) {
- fileWriter->CheckSpace(Config_->MinDiskSpace);
+ fileWriter->CheckSpace(config->MinDiskSpace);
}
}
}
void RegisterNotificatonWatch(TNotificationWatch* watch)
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
if (watch->IsValid()) {
// Watch can fail to initialize if the writer is disabled
// e.g. due to the lack of space.
@@ -978,6 +1031,8 @@ private:
void PushEvent(TLoggerQueueItem&& event)
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
auto& perThreadQueue = PerThreadQueue();
if (!perThreadQueue) {
perThreadQueue = new TThreadLocalQueue();
@@ -994,9 +1049,10 @@ private:
const TCounter& GetWrittenEventsCounter(const TLogEvent& event)
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
auto key = std::pair(event.Category->Name, event.Level);
auto it = WrittenEventsCounters_.find(key);
-
if (it == WrittenEventsCounters_.end()) {
// TODO(prime@): optimize sensor count
auto counter = Profiler
@@ -1012,6 +1068,8 @@ private:
void CollectSensors(ISensorWriter* writer) override
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
auto writtenEvents = WrittenEvents_.load();
auto enqueuedEvents = EnqueuedEvents_.load();
auto suppressedEvents = SuppressedEvents_.load();
@@ -1027,6 +1085,8 @@ private:
void OnDiskProfiling()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
try {
auto minLogStorageAvailableSpace = std::numeric_limits<i64>::max();
auto minLogStorageFreeSpace = std::numeric_limits<i64>::max();
@@ -1059,6 +1119,8 @@ private:
std::vector<TLoggingAnchorStat> CaptureAnchorStats()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
auto now = TInstant::Now();
auto deltaSeconds = (now - LastAnchorStatsCaptureTime_).SecondsFloat();
LastAnchorStatsCaptureTime_ = now;
@@ -1086,14 +1148,17 @@ private:
void OnAnchorProfiling()
{
- if (Config_->EnableAnchorProfiling && !AnchorBufferedProducer_) {
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
+ auto config = Config_.Acquire();
+ if (config->EnableAnchorProfiling && !AnchorBufferedProducer_) {
AnchorBufferedProducer_ = New<TBufferedProducer>();
Profiler
.WithSparse()
.WithDefaultDisabled()
.WithProducerRemoveSupport()
.AddProducer("/anchors", AnchorBufferedProducer_);
- } else if (!Config_->EnableAnchorProfiling && AnchorBufferedProducer_) {
+ } else if (!config->EnableAnchorProfiling && AnchorBufferedProducer_) {
AnchorBufferedProducer_.Reset();
}
@@ -1105,7 +1170,7 @@ private:
TSensorBuffer sensorBuffer;
for (const auto& stat : stats) {
- if (stat.MessageRate < Config_->MinLoggedMessageRateToProfile) {
+ if (stat.MessageRate < config->MinLoggedMessageRateToProfile) {
continue;
}
TWithTagGuard tagGuard(&sensorBuffer, "message", stat.Anchor->AnchorMessage);
@@ -1242,20 +1307,24 @@ private:
WrittenEvents_ += eventsWritten;
- if (!Config_->FlushPeriod || ShutdownRequested_) {
+ auto config = Config_.Acquire();
+ if (!config->FlushPeriod || ShutdownRequested_) {
FlushWriters();
}
}
int ProcessTimeOrderedBuffer()
{
+ VERIFY_THREAD_AFFINITY(LoggingThread);
+
int eventsWritten = 0;
int eventsSuppressed = 0;
SuppressedRequestIdSet_.InsertMany(Now(), SuppressedRequestIdQueue_.DequeueAll());
auto requestSuppressionEnabled = RequestSuppressionEnabled_.load(std::memory_order::relaxed);
- auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(Config_->RequestSuppressionTimeout);
+ auto config = Config_.Acquire();
+ auto suppressionDeadline = GetCpuInstant() - DurationToCpuDuration(config->RequestSuppressionTimeout);
while (!TimeOrderedBuffer_.empty()) {
const auto& event = TimeOrderedBuffer_.front();
@@ -1274,7 +1343,7 @@ private:
if (requestSuppressionEnabled && event.RequestId && SuppressedRequestIdSet_.Contains(event.RequestId)) {
++eventsSuppressed;
} else {
- WriteEvent(event);
+ WriteEvent(config, event);
}
});
@@ -1286,10 +1355,14 @@ private:
return eventsWritten;
}
- void DoUpdateCategory(TLoggingCategory* category)
+ void DoUpdateCategory(
+ const TLogManagerConfigPtr& config,
+ TLoggingCategory* category)
{
+ VERIFY_THREAD_AFFINITY_ANY();
+
auto minPlainTextLevel = ELogLevel::Maximum;
- for (const auto& rule : Config_->Rules) {
+ for (const auto& rule : config->Rules) {
if (rule->IsApplicable(category->Name, ELogFamily::PlainText)) {
minPlainTextLevel = std::min(minPlainTextLevel, rule->MinLevel);
}
@@ -1297,11 +1370,13 @@ private:
category->MinPlainTextLevel.store(minPlainTextLevel, std::memory_order::relaxed);
category->CurrentVersion.store(GetVersion(), std::memory_order::relaxed);
- category->StructuredValidationSamplingRate.store(Config_->StructuredValidationSamplingRate, std::memory_order::relaxed);
+ category->StructuredValidationSamplingRate.store(config->StructuredValidationSamplingRate, std::memory_order::relaxed);
}
void DoRegisterAnchor(TLoggingAnchor* anchor)
{
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
// NB: Duplicates are not desirable but possible.
AnchorMap_.emplace(anchor->AnchorMessage, anchor);
anchor->NextAnchor = FirstAnchor_;
@@ -1331,23 +1406,28 @@ private:
DECLARE_THREAD_AFFINITY_SLOT(LoggingThread);
- // Configuration.
+ TAtomicIntrusivePtr<TLogManagerConfig> Config_;
+
+ // Protects the section of members below.
NThreading::TForkAwareSpinLock SpinLock_;
- // Version forces this very module's Logger object to update to our own
- // default configuration (default level etc.).
- std::atomic<int> Version_ = 0;
- std::atomic<bool> AbortOnAlert_ = false;
- TLogManagerConfigPtr Config_;
- std::atomic<bool> ConfiguredFromEnv_ = false;
THashMap<TString, std::unique_ptr<TLoggingCategory>> NameToCategory_;
THashMap<TString, ILogWriterFactoryPtr> TypeNameToWriterFactory_;
- const TLoggingCategory* SystemCategory_;
- // These are just copies from Config_.
+
+ // Incrementing version forces loggers to update their own default configuration (default level etc.).
+ std::atomic<int> Version_ = 0;
+
+ std::atomic<bool> ConfiguredFromEnv_ = false;
+
+ // These are just cached (for performance reason) copies from Config_.
// The values are being read from arbitrary threads but stale values are fine.
std::atomic<ui64> HighBacklogWatermark_ = Max<ui64>();
std::atomic<ui64> LowBacklogWatermark_ = Max<ui64>();
+ std::atomic<bool> AbortOnAlert_ = false;
+
+ std::atomic<bool> InitializationStarted_ = false;
+ std::atomic<NThreading::TThreadId> InitializerThreadId_ = NThreading::InvalidThreadId;
+ NThreading::TEvent InitializationFinished_;
- std::once_flag Initialized_;
std::once_flag Started_;
std::atomic<bool> Suspended_ = false;
std::atomic<bool> ScheduledOutOfBand_ = false;
@@ -1380,7 +1460,9 @@ private:
THashMap<TString, ILogWriterPtr> NameToWriter_;
THashMap<TLogWriterCacheKey, std::vector<ILogWriterPtr>> KeyToCachedWriter_;
+
const std::vector<ILogWriterPtr> SystemWriters_;
+ const TLoggingCategory* SystemCategory_;
std::atomic<bool> ReopenRequested_ = false;
std::atomic<bool> ShutdownRequested_ = false;
@@ -1431,105 +1513,155 @@ TLogManager::~TLogManager() = default;
TLogManager* TLogManager::Get()
{
auto* logManager = LeakySingleton<TLogManager>();
- logManager->Initialize();
+ logManager->Impl_->Initialize();
return logManager;
}
void TLogManager::Configure(TLogManagerConfigPtr config, bool sync)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->Configure(std::move(config), /*fromEnv*/ false, sync);
}
void TLogManager::ConfigureFromEnv()
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->ConfigureFromEnv();
}
bool TLogManager::IsConfiguredFromEnv()
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return false;
+ }
return Impl_->IsConfiguredFromEnv();
}
void TLogManager::Shutdown()
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->Shutdown();
}
int TLogManager::GetVersion() const
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return 0;
+ }
return Impl_->GetVersion();
}
bool TLogManager::GetAbortOnAlert() const
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return false;
+ }
return Impl_->GetAbortOnAlert();
}
const TLoggingCategory* TLogManager::GetCategory(TStringBuf categoryName)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return nullptr;
+ }
return Impl_->GetCategory(categoryName);
}
void TLogManager::UpdateCategory(TLoggingCategory* category)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->UpdateCategory(category);
}
void TLogManager::UpdateAnchor(TLoggingAnchor* anchor)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->UpdateAnchor(anchor);
}
void TLogManager::RegisterStaticAnchor(TLoggingAnchor* anchor, ::TSourceLocation sourceLocation, TStringBuf anchorMessage)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->RegisterStaticAnchor(anchor, sourceLocation, anchorMessage);
}
TLoggingAnchor* TLogManager::RegisterDynamicAnchor(TString anchorMessage)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return nullptr;
+ }
return Impl_->RegisterDynamicAnchor(std::move(anchorMessage));
}
void TLogManager::RegisterWriterFactory(const TString& typeName, const ILogWriterFactoryPtr& factory)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->RegisterWriterFactory(typeName, factory);
}
void TLogManager::UnregisterWriterFactory(const TString& typeName)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->UnregisterWriterFactory(typeName);
}
void TLogManager::Enqueue(TLogEvent&& event)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ Cerr << NYT::Format("Trying to log event during logger initialization -- skipping") << Endl;
+ return;
+ }
Impl_->Enqueue(std::move(event));
}
void TLogManager::Reopen()
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->Reopen();
}
void TLogManager::EnableReopenOnSighup()
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->EnableReopenOnSighup();
}
void TLogManager::SuppressRequest(TRequestId requestId)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->SuppressRequest(requestId);
}
void TLogManager::Synchronize(TInstant deadline)
{
+ [[unlikely]] if (!Impl_->IsInitialized()) {
+ return;
+ }
Impl_->Synchronize(deadline);
}
-void TLogManager::Initialize()
-{
- Impl_->Initialize();
-}
-
////////////////////////////////////////////////////////////////////////////////
TFiberMinLogLevelGuard::TFiberMinLogLevelGuard(ELogLevel minLogLevel)
diff --git a/yt/yt/core/logging/log_manager.h b/yt/yt/core/logging/log_manager.h
index 08598d3080..e4309e3264 100644
--- a/yt/yt/core/logging/log_manager.h
+++ b/yt/yt/core/logging/log_manager.h
@@ -70,8 +70,6 @@ private:
DECLARE_LEAKY_SINGLETON_FRIEND()
- void Initialize();
-
class TImpl;
const TIntrusivePtr<TImpl> Impl_;
};
diff --git a/yt/yt/core/misc/async_expiring_cache-inl.h b/yt/yt/core/misc/async_expiring_cache-inl.h
index bad6d0a399..eb520f4dff 100644
--- a/yt/yt/core/misc/async_expiring_cache-inl.h
+++ b/yt/yt/core/misc/async_expiring_cache-inl.h
@@ -343,6 +343,27 @@ void TAsyncExpiringCache<TKey, TValue>::ForceRefresh(const TKey& key, const T& v
}
template <class TKey, class TValue>
+void TAsyncExpiringCache<TKey, TValue>::PingEntry(const TKey& key)
+{
+ auto now = NProfiling::GetCpuInstant();
+ auto guard = ReaderGuard(SpinLock_);
+
+ if (auto it = Map_.find(key); it != Map_.end() && it->second->Promise.IsSet()) {
+ const auto& entry = it->second;
+ if (!entry->Promise.Get().IsOK()) {
+ return;
+ }
+
+ entry->AccessDeadline = now + NProfiling::DurationToCpuDuration(Config()->ExpireAfterAccessTime);
+ entry->UpdateDeadline = now + NProfiling::DurationToCpuDuration(Config()->ExpireAfterSuccessfulUpdateTime);
+ if (!Config()->BatchUpdate) {
+ ScheduleEntryRefresh(entry, key, Config_->RefreshTime);
+ }
+
+ }
+}
+
+template <class TKey, class TValue>
void TAsyncExpiringCache<TKey, TValue>::Set(const TKey& key, TErrorOr<TValue> valueOrError)
{
auto isValueOK = valueOrError.IsOK();
diff --git a/yt/yt/core/misc/async_expiring_cache.h b/yt/yt/core/misc/async_expiring_cache.h
index 137df01f32..913849ecd1 100644
--- a/yt/yt/core/misc/async_expiring_cache.h
+++ b/yt/yt/core/misc/async_expiring_cache.h
@@ -92,6 +92,9 @@ protected:
virtual bool CanCacheError(const TError& error) noexcept;
+ //! PingEntry resets refresh timer period and behaves like successful entry update.
+ void PingEntry(const TKey& key);
+
private:
const NLogging::TLogger Logger_;
diff --git a/yt/yt/core/misc/atomic_object-inl.h b/yt/yt/core/misc/atomic_object-inl.h
deleted file mode 100644
index 1e76cbf5ab..0000000000
--- a/yt/yt/core/misc/atomic_object-inl.h
+++ /dev/null
@@ -1,97 +0,0 @@
-#ifndef ATOMIC_OBJECT_INL_H_
-#error "Direct inclusion of this file is not allowed, include atomic_object.h"
-// For the sake of sane code completion.
-#include "atomic_object.h"
-#endif
-
-namespace NYT {
-
-////////////////////////////////////////////////////////////////////////////////
-
-template <class T>
-template <class U>
-TAtomicObject<T>::TAtomicObject(U&& u)
- : Object_(std::forward<U>(u))
-{ }
-
-template <class T>
-template <class U>
-void TAtomicObject<T>::Store(U&& u)
-{
- // NB: Using exchange to avoid destructing the old object while holding the lock.
- std::ignore = Exchange(std::forward<U>(u));
-}
-
-template <class T>
-template <class U>
-T TAtomicObject<T>::Exchange(U&& u)
-{
- T tmpObject = std::forward<U>(u);
- {
- auto guard = WriterGuard(Spinlock_);
- std::swap(Object_, tmpObject);
- }
- return tmpObject;
-}
-
-template <class T>
-bool TAtomicObject<T>::CompareExchange(T& expected, const T& desired)
-{
- auto guard = WriterGuard(Spinlock_);
- if (Object_ == expected) {
- auto oldObject = std::move(Object_);
- Y_UNUSED(oldObject);
- Object_ = desired;
- guard.Release();
- return true;
- } else {
- auto oldExpected = std::move(expected);
- Y_UNUSED(oldExpected);
- expected = Object_;
- guard.Release();
- return false;
- }
-}
-
-template <class T>
-template <CInvocable<void(T&)> F>
-void TAtomicObject<T>::Transform(const F& func)
-{
- auto guard = WriterGuard(Spinlock_);
- func(Object_);
-}
-
-template <class T>
-template <class R, CInvocable<R(const T&)> F>
-R TAtomicObject<T>::Read(const F& func) const
-{
- auto guard = ReaderGuard(Spinlock_);
- return func(Object_);
-}
-
-template <class T>
-T TAtomicObject<T>::Load() const
-{
- auto guard = ReaderGuard(Spinlock_);
- return Object_;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-template <class TOriginal, class TSerialized>
-void ToProto(TSerialized* serialized, const TAtomicObject<TOriginal>& original)
-{
- ToProto(serialized, original.Load());
-}
-
-template <class TOriginal, class TSerialized>
-void FromProto(TAtomicObject<TOriginal>* original, const TSerialized& serialized)
-{
- TOriginal data;
- FromProto(&data, serialized);
- original->Store(std::move(data));
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT
diff --git a/yt/yt/core/misc/atomic_object.h b/yt/yt/core/misc/atomic_object.h
deleted file mode 100644
index 91ef9278b0..0000000000
--- a/yt/yt/core/misc/atomic_object.h
+++ /dev/null
@@ -1,63 +0,0 @@
-#pragma once
-
-#include <library/cpp/yt/threading/rw_spin_lock.h>
-
-#include <library/cpp/yt/misc/concepts.h>
-
-namespace NYT {
-
-////////////////////////////////////////////////////////////////////////////////
-
-//! A synchronization object to load and store nontrivial object.
-//! It looks like atomics but for objects.
-template <class T>
-class TAtomicObject
-{
-public:
- TAtomicObject() = default;
-
- template <class U>
- TAtomicObject(U&& u);
-
- template <class U>
- void Store(U&& u);
-
- //! Atomically replaces the old value with the new one and returns the old value.
- template <class U>
- T Exchange(U&& u);
-
- //! Atomically checks if then current value equals #expected.
- //! If so, replaces it with #desired and returns |true|.
- //! Otherwise, copies it into #expected and returns |false|.
- bool CompareExchange(T& expected, const T& desired);
-
- //! Atomically transforms the value with function #func.
- template <CInvocable<void(T&)> F>
- void Transform(const F& func);
-
- //! Atomicaly reads the value with function #func.
- template <class R = void, CInvocable<R(const T&)> F>
- R Read(const F& func) const;
-
- T Load() const;
-
-private:
- T Object_;
- YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, Spinlock_);
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-template <class TOriginal, class TSerialized>
-void ToProto(TSerialized* serialized, const TAtomicObject<TOriginal>& original);
-
-template <class TOriginal, class TSerialized>
-void FromProto(TAtomicObject<TOriginal>* original, const TSerialized& serialized);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT
-
-#define ATOMIC_OBJECT_INL_H_
-#include "atomic_object-inl.h"
-#undef ATOMIC_OBJECT_INL_H_
diff --git a/yt/yt/core/misc/hazard_ptr.cpp b/yt/yt/core/misc/hazard_ptr.cpp
index ce78cbccf2..ec288eddd9 100644
--- a/yt/yt/core/misc/hazard_ptr.cpp
+++ b/yt/yt/core/misc/hazard_ptr.cpp
@@ -1,7 +1,5 @@
#include "hazard_ptr.h"
-#include "private.h"
-
#include <yt/yt/core/misc/singleton.h>
#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/misc/ring_queue.h>
@@ -25,10 +23,6 @@ using namespace NConcurrency;
////////////////////////////////////////////////////////////////////////////////
-static constexpr auto& Logger = LockFreeLogger;
-
-////////////////////////////////////////////////////////////////////////////////
-
namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
@@ -325,16 +319,6 @@ bool THazardPointerManager::DoReclaimHazardPointers(THazardThreadState* threadSt
retireList.push(item);
});
- YT_LOG_TRACE_IF(
- !protectedPointers.empty(),
- "Scanning hazard pointers (Candidates: %v, Protected: %v)",
- MakeFormattableView(TRingQueueIterableWrapper(retireList), [&] (auto* builder, auto item) {
- builder->AppendFormat("%v", TTaggedPtr<void>::Unpack(item.PackedPtr).Ptr);
- }),
- MakeFormattableView(protectedPointers, [&] (auto* builder, auto ptr) {
- builder->AppendFormat("%v", ptr);
- }));
-
size_t pushedCount = 0;
auto popCount = retireList.size();
while (popCount-- > 0) {
diff --git a/yt/yt/core/misc/hazard_ptr.h b/yt/yt/core/misc/hazard_ptr.h
index f6a3bf58ca..40202bbf44 100644
--- a/yt/yt/core/misc/hazard_ptr.h
+++ b/yt/yt/core/misc/hazard_ptr.h
@@ -4,8 +4,6 @@
#include <yt/yt/core/concurrency/scheduler_api.h>
-#include <library/cpp/yt/logging/logger.h>
-
#include <atomic>
namespace NYT {
diff --git a/yt/yt/core/misc/ring_queue.h b/yt/yt/core/misc/ring_queue.h
index 471871603e..5cabcc6802 100644
--- a/yt/yt/core/misc/ring_queue.h
+++ b/yt/yt/core/misc/ring_queue.h
@@ -3,6 +3,7 @@
#include "common.h"
#include <library/cpp/yt/assert/assert.h>
+#include <library/cpp/yt/string/format.h>
namespace NYT {
@@ -365,10 +366,19 @@ public:
: Container_(container)
{ }
+ using value_type = TContainer::value_type;
+
private:
TContainer& Container_;
};
+namespace NDetail {
+
+template <class T, class Allocator>
+constexpr bool CKnownRange<TRingQueueIterableWrapper<T, Allocator>> = true;
+
+} // namespace NDetail
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/yt/core/misc/stripped_error.cpp b/yt/yt/core/misc/stripped_error.cpp
index 3406f61dac..fbcfb90ff0 100644
--- a/yt/yt/core/misc/stripped_error.cpp
+++ b/yt/yt/core/misc/stripped_error.cpp
@@ -801,7 +801,7 @@ void AppendError(TStringBuilderBase* builder, const TError& error, int indent)
[[unlikely]] if (!tokenizer.ParseNext()) {
Cerr <<
NYT::Format(
- "%v *** Empty toke encountered while formatting TError attribute (Key: %v, Value: %v"
+ "%v *** Empty token encountered while formatting TError attribute (Key: %v, Value: %v)"
"(BuilderAccumulatedData: %v)",
TInstant::Now(),
key,
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp
index af4e5f0a7e..ea52305bda 100644
--- a/yt/yt/core/net/connection.cpp
+++ b/yt/yt/core/net/connection.cpp
@@ -645,6 +645,11 @@ public:
!PeerDisconnectedList_.IsFired();
}
+ bool IsReusable()
+ {
+ return IsIdle();
+ }
+
TFuture<void> Abort(const TError& error)
{
YT_LOG_DEBUG(error, "Aborting connection");
@@ -1213,6 +1218,11 @@ public:
return Impl_->IsIdle();
}
+ bool IsReusable() const override
+ {
+ return Impl_->IsReusable();
+ }
+
TFuture<void> Abort() override
{
return Impl_->Abort(TError(EErrorCode::Aborted, "Connection aborted"));
diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h
index 4078e11460..3c9998e1d0 100644
--- a/yt/yt/core/net/connection.h
+++ b/yt/yt/core/net/connection.h
@@ -73,16 +73,19 @@ struct IConnection
virtual const TNetworkAddress& GetLocalAddress() const = 0;
virtual const TNetworkAddress& GetRemoteAddress() const = 0;
- // Returns true if connection is not is failed state and has no
- // active IO operations.
+ //! Returns true if connection is not is failed state and has no
+ //! active IO operations.
virtual bool IsIdle() const = 0;
+ //! Returns true if connection can be reused by a pool.
+ virtual bool IsReusable() const = 0;
+
virtual bool SetNoDelay() = 0;
virtual bool SetKeepAlive() = 0;
TFuture<void> Abort() override = 0;
- // SubscribePeerDisconnect is best effort and is not guaranteed to fire.
+ //! This callback is best effort and is not guaranteed to fire.
virtual void SubscribePeerDisconnect(TCallback<void()> callback) = 0;
};
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp
index f3959aed7b..294d654528 100644
--- a/yt/yt/core/rpc/bus/channel.cpp
+++ b/yt/yt/core/rpc/bus/channel.cpp
@@ -19,7 +19,7 @@
#include <yt/yt/core/concurrency/thread_affinity.h>
#include <yt/yt/core/misc/finally.h>
-#include <yt/yt/core/misc/atomic_object.h>
+#include <library/cpp/yt/threading/atomic_object.h>
#include <yt/yt/core/tracing/public.h>
diff --git a/yt/yt/core/rpc/per_user_request_queue_provider.h b/yt/yt/core/rpc/per_user_request_queue_provider.h
index 84a3b0389c..7a68315a01 100644
--- a/yt/yt/core/rpc/per_user_request_queue_provider.h
+++ b/yt/yt/core/rpc/per_user_request_queue_provider.h
@@ -7,7 +7,7 @@
#include <yt/yt/library/profiling/sensor.h>
#include <yt/yt/library/syncmap/map.h>
-#include <yt/yt/core/misc/atomic_object.h>
+#include <library/cpp/yt/threading/atomic_object.h>
namespace NYT::NRpc {
diff --git a/yt/yt/core/rpc/stream-inl.h b/yt/yt/core/rpc/stream-inl.h
index 4474158d4e..8f7ddd274b 100644
--- a/yt/yt/core/rpc/stream-inl.h
+++ b/yt/yt/core/rpc/stream-inl.h
@@ -43,11 +43,28 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream
{
auto invokeResult = request->Invoke().template As<void>();
auto metaHandlerResult = request->GetResponseAttachmentsStream()->Read()
- .Apply(metaHandler);
- return metaHandlerResult.Apply(BIND ([=] () {
+ .Apply(std::move(metaHandler));
+ return metaHandlerResult.Apply(BIND ([req = std::move(request), res = std::move(invokeResult)] () mutable {
return NDetail::CreateRpcClientOutputStreamFromInvokedRequest(
- std::move(request),
- std::move(invokeResult));
+ std::move(req),
+ std::move(res));
+ }));
+}
+
+template <class TRequestMessage, class TResponse>
+TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream(
+ TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request,
+ TCallback<void(TSharedRef)> metaHandler,
+ TCallback<void(TIntrusivePtr<TResponse>&&)> rspHandler)
+{
+ auto invokeResult = request->Invoke()
+ .ApplyUnique(std::move(rspHandler));
+ auto metaHandlerResult = request->GetResponseAttachmentsStream()->Read()
+ .Apply(std::move(metaHandler));
+ return metaHandlerResult.Apply(BIND ([req = std::move(request), res = std::move(invokeResult)] () mutable {
+ return NDetail::CreateRpcClientOutputStreamFromInvokedRequest(
+ std::move(req),
+ std::move(res));
}));
}
diff --git a/yt/yt/core/rpc/stream.h b/yt/yt/core/rpc/stream.h
index 93e54155a8..f2220e7425 100644
--- a/yt/yt/core/rpc/stream.h
+++ b/yt/yt/core/rpc/stream.h
@@ -265,6 +265,13 @@ TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream
TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request,
TCallback<void(TSharedRef)> metaHandler);
+//! This variant additionally allows non-trivial response of streaming request to be handled.
+template <class TRequestMessage, class TResponse>
+TFuture<NConcurrency::IAsyncZeroCopyOutputStreamPtr> CreateRpcClientOutputStream(
+ TIntrusivePtr<TTypedClientRequest<TRequestMessage, TResponse>> request,
+ TCallback<void(TSharedRef)> metaHandler,
+ TCallback<void(TIntrusivePtr<TResponse>&&)> rspHandler);
+
////////////////////////////////////////////////////////////////////////////////
//! Handles an incoming streaming request that uses the #CreateRpcClientInputStream
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp
index afd60131da..8ac8d4720d 100644
--- a/yt/yt/core/rpc/unittests/lib/test_service.cpp
+++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp
@@ -21,6 +21,7 @@ using namespace NConcurrency;
////////////////////////////////////////////////////////////////////////////////
YT_DEFINE_GLOBAL(std::unique_ptr<NThreading::TEvent>, Latch_);
+YT_DEFINE_GLOBAL(std::atomic<int>, ConcurrentCalls_, 0);
////////////////////////////////////////////////////////////////////////////////
@@ -187,7 +188,7 @@ public:
{
try {
context->SetRequestInfo();
- TDelayedExecutor::WaitForDuration(TDuration::Seconds(2));
+ TDelayedExecutor::WaitForDuration(TDuration::Max());
context->Reply();
} catch (const TFiberCanceledException&) {
SlowCallCanceled_.Set();
@@ -202,6 +203,10 @@ public:
DECLARE_RPC_SERVICE_METHOD(NTestRpc, RequestBytesThrottledCall)
{
+ THROW_ERROR_EXCEPTION_UNLESS(ConcurrentCalls_().fetch_add(1) == 0, "Too many concurrent calls on entry!");
+ Sleep(TDuration::MilliSeconds(100));
+ THROW_ERROR_EXCEPTION_UNLESS(ConcurrentCalls_().fetch_sub(1) == 1, "Too many concurrent calls on exit!");
+
context->Reply();
}
diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp
index f367c57db4..157ee8faaa 100644
--- a/yt/yt/core/rpc/unittests/rpc_ut.cpp
+++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp
@@ -622,6 +622,8 @@ TYPED_TEST(TNotGrpcTest, Compression)
TYPED_TEST(TRpcTest, ResponseMemoryTag)
{
+ // FIXME: YT-23048
+ return;
static TMemoryTag testMemoryTag = 12345;
testMemoryTag++;
auto initialMemoryUsage = GetMemoryUsageForTag(testMemoryTag);
@@ -659,7 +661,7 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling)
methods = {
RequestBytesThrottledCall = {
request_bytes_throttler = {
- limit = 1000000;
+ limit = 100000;
}
}
}
@@ -673,18 +675,16 @@ TYPED_TEST(TNotGrpcTest, RequestBytesThrottling)
auto makeCall = [&] {
auto req = proxy.RequestBytesThrottledCall();
- req->Attachments().push_back(TSharedMutableRef::Allocate(100'000));
+ req->Attachments().push_back(TSharedMutableRef::Allocate(60'000));
return req->Invoke().AsVoid();
};
std::vector<TFuture<void>> futures;
- for (int i = 0; i < 30; ++i) {
+ for (int i = 0; i < 5; ++i) {
futures.push_back(makeCall());
}
- NProfiling::TWallTimer timer;
EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK());
- EXPECT_LE(std::abs(static_cast<i64>(timer.GetElapsedTime().MilliSeconds()) - 3000), 200);
}
// Now test different types of errors.
diff --git a/yt/yt/core/test_framework/test_proxy_service.h b/yt/yt/core/test_framework/test_proxy_service.h
index 3f2c99415e..26c0bde173 100644
--- a/yt/yt/core/test_framework/test_proxy_service.h
+++ b/yt/yt/core/test_framework/test_proxy_service.h
@@ -10,7 +10,7 @@
#include <yt/yt/core/logging/log.h>
-#include <yt/yt/core/misc/atomic_object.h>
+#include <library/cpp/yt/threading/atomic_object.h>
#include <yt/yt/core/ytree/attributes.h>
diff --git a/yt/yt/core/yson/pull_parser_deserialize-inl.h b/yt/yt/core/yson/pull_parser_deserialize-inl.h
index b0f6b9d8ca..78e343d917 100644
--- a/yt/yt/core/yson/pull_parser_deserialize-inl.h
+++ b/yt/yt/core/yson/pull_parser_deserialize-inl.h
@@ -8,6 +8,8 @@
#include <yt/yt/core/yson/token_writer.h>
+#include <library/cpp/yt/misc/cast.h>
+
#include <vector>
namespace NYT::NYson {
@@ -239,6 +241,18 @@ void Deserialize(T& value, TYsonPullParserCursor* cursor)
}
}
+template <class T>
+requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void Deserialize(T& value, TYsonPullParserCursor* cursor)
+{
+ static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>());
+
+ MaybeSkipAttributes(cursor);
+ EnsureYsonToken("enum", *cursor, EYsonItemType::Int64Value);
+ value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>((*cursor)->UncheckedAsInt64()));
+ cursor->Next();
+}
+
// TCompactVector
template <class T, size_t N>
void Deserialize(TCompactVector<T, N>& value, TYsonPullParserCursor* cursor, std::enable_if_t<ArePullParserDeserializable<T>(), void*>)
diff --git a/yt/yt/core/yson/pull_parser_deserialize.h b/yt/yt/core/yson/pull_parser_deserialize.h
index 5f6215180b..669b976394 100644
--- a/yt/yt/core/yson/pull_parser_deserialize.h
+++ b/yt/yt/core/yson/pull_parser_deserialize.h
@@ -113,6 +113,9 @@ void Deserialize(
template <class T>
requires TEnumTraits<T>::IsEnum
void Deserialize(T& value, TYsonPullParserCursor* cursor);
+template <class T>
+ requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void Deserialize(T& value, TYsonPullParserCursor* cursor);
// TCompactVector.
template <class T, size_t N>
diff --git a/yt/yt/core/ytree/fluent.h b/yt/yt/core/ytree/fluent.h
index ec0c51fd7e..366ca9794d 100644
--- a/yt/yt/core/ytree/fluent.h
+++ b/yt/yt/core/ytree/fluent.h
@@ -88,7 +88,8 @@ namespace NYT::NYTree {
// and close map;
// * DoList(TFuncList func) -> TAny, same as DoMap();
// * DoListFor(TCollection collection, TFuncList func) -> TAny; same as DoMapFor().
-//
+// * DoAttributes(TFuncAttributes func) -> TAny, open attributes, delegate invocation
+// to a separate procedure and close attributes;
//
// TFluentMap:
// * Item(TStringBuf key) -> TAny, open an element keyed with `key`;
@@ -407,6 +408,14 @@ public:
this->Consumer,
TAnyWithoutAttributes<TParent>(this->Consumer, std::move(this->Parent)));
}
+
+ TAnyWithoutAttributes<TParent> DoAttributes(auto funcMap)
+ {
+ this->Consumer->OnBeginAttributes();
+ InvokeFluentFunc<TFluentAttributes<TFluentYsonVoid>>(funcMap, this->Consumer);
+ this->Consumer->OnEndAttributes();
+ return TAnyWithoutAttributes<TParent>(this->Consumer, std::move(this->Parent));
+ }
};
template <class TParent = TFluentYsonVoid>
diff --git a/yt/yt/core/ytree/serialize-inl.h b/yt/yt/core/ytree/serialize-inl.h
index fe1ce789e6..01fc90c73a 100644
--- a/yt/yt/core/ytree/serialize-inl.h
+++ b/yt/yt/core/ytree/serialize-inl.h
@@ -344,6 +344,14 @@ void Serialize(T value, NYson::IYsonConsumer* consumer)
}
}
+template <class T>
+ requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void Serialize(T value, NYson::IYsonConsumer* consumer)
+{
+ static_assert(CanFitSubtype<i64, std::underlying_type_t<T>>());
+ consumer->OnInt64Scalar(static_cast<i64>(value));
+}
+
// std::optional
template <class T>
void Serialize(const std::optional<T>& value, NYson::IYsonConsumer* consumer)
@@ -528,6 +536,23 @@ void Deserialize(T& value, INodePtr node)
}
}
+template <class T>
+ requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void Deserialize(T& value, INodePtr node)
+{
+ switch (node->GetType()) {
+ case ENodeType::Int64: {
+ // TODO: CheckedEnumCast via __PRETTY_FUNCTION__?
+ i64 serialized = node->AsInt64()->GetValue();
+ value = static_cast<T>(CheckedIntegralCast<std::underlying_type_t<T>>(serialized));
+ break;
+ }
+ default:
+ THROW_ERROR_EXCEPTION("Cannot deserialize enum from %Qlv node",
+ node->GetType());
+ }
+}
+
// std::optional
template <class T>
void Deserialize(std::optional<T>& value, INodePtr node)
diff --git a/yt/yt/core/ytree/serialize.h b/yt/yt/core/ytree/serialize.h
index 3aa784ac8d..2d5185f18d 100644
--- a/yt/yt/core/ytree/serialize.h
+++ b/yt/yt/core/ytree/serialize.h
@@ -109,6 +109,9 @@ void Serialize(IInputStream& input, NYson::IYsonConsumer* consumer);
template <class T>
requires TEnumTraits<T>::IsEnum
void Serialize(T value, NYson::IYsonConsumer* consumer);
+template <class T>
+ requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void Serialize(T value, NYson::IYsonConsumer* consumer);
// std::optional
template <class T>
@@ -218,6 +221,9 @@ void Deserialize(TGuid& value, INodePtr node);
template <class T>
requires TEnumTraits<T>::IsEnum
void Deserialize(T& value, INodePtr node);
+template <class T>
+ requires (!TEnumTraits<T>::IsEnum) && std::is_enum_v<T>
+void Deserialize(T& value, INodePtr node);
// std::optional
template <class T>
diff --git a/yt/yt/core/ytree/tree_builder.h b/yt/yt/core/ytree/tree_builder.h
index 2af4d56a8d..fd959280d7 100644
--- a/yt/yt/core/ytree/tree_builder.h
+++ b/yt/yt/core/ytree/tree_builder.h
@@ -52,4 +52,3 @@ std::unique_ptr<ITreeBuilder> CreateBuilderFromFactory(INodeFactory* factory);
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NYTree
-
diff --git a/yt/yt/core/ytree/unittests/serialize_ut.cpp b/yt/yt/core/ytree/unittests/serialize_ut.cpp
index ee36997f70..63bf71b0d2 100644
--- a/yt/yt/core/ytree/unittests/serialize_ut.cpp
+++ b/yt/yt/core/ytree/unittests/serialize_ut.cpp
@@ -51,6 +51,12 @@ DEFINE_BIT_ENUM(ETestBitEnum,
((Green) (0x0004))
);
+enum class EPlainTestEnum
+{
+ First,
+ Second,
+};
+
template <typename T>
T PullParserConvert(TYsonStringBuf s)
{
@@ -427,6 +433,13 @@ TEST(TSerializationTest, SerializableArcadiaEnum)
}
}
+TEST(TSerializationTest, PlainEnum)
+{
+ TestSerializationDeserialization(EPlainTestEnum::First);
+
+ TestSerializationDeserialization(static_cast<EPlainTestEnum>(42));
+}
+
TEST(TYTreeSerializationTest, Protobuf)
{
NProto::TTestMessage message;
diff --git a/yt/yt/core/ytree/unittests/ytree_fluent_ut.cpp b/yt/yt/core/ytree/unittests/ytree_fluent_ut.cpp
index 51f43f0e07..8b2e9b8970 100644
--- a/yt/yt/core/ytree/unittests/ytree_fluent_ut.cpp
+++ b/yt/yt/core/ytree/unittests/ytree_fluent_ut.cpp
@@ -15,8 +15,6 @@ using ::testing::StrictMock;
////////////////////////////////////////////////////////////////////////////////
-// TODO(sandello): Fix this test under clang.
-#ifndef __clang__
// String-like Scalars {{{
////////////////////////////////////////////////////////////////////////////////
@@ -187,7 +185,7 @@ TEST(TYTreeFluentMapTest, Items)
StrictMock<TMockYsonConsumer> mock;
InSequence dummy;
- auto node = ConvertToNode(TYsonString("{bar = 10}"));
+ auto node = ConvertToNode(TYsonString(TString("{bar = 10}")));
EXPECT_CALL(mock, OnBeginMap());
EXPECT_CALL(mock, OnKeyedItem("bar"));
@@ -275,7 +273,7 @@ TEST(TYTreeFluentListTest, Items)
StrictMock<TMockYsonConsumer> mock;
InSequence dummy;
- auto node = ConvertToNode(TYsonString("[10; 20; 30]"));
+ auto node = ConvertToNode(TYsonString(TString("[10; 20; 30]")));
EXPECT_CALL(mock, OnBeginList());
EXPECT_CALL(mock, OnListItem());
@@ -420,8 +418,37 @@ TEST(TYTreeFluentTest, Complex)
.EndList();
}
-////////////////////////////////////////////////////////////////////////////////
-#endif
+TEST(TYTreeFluentTest, DoMap)
+{
+ StrictMock<TMockYsonConsumer> mock;
+
+ EXPECT_CALL(mock, OnBeginMap());
+ EXPECT_CALL(mock, OnKeyedItem("key"));
+ EXPECT_CALL(mock, OnEntity);
+ EXPECT_CALL(mock, OnEndMap());
+
+ BuildYsonFluently(&mock)
+ .DoMap([] (TFluentMap map) {
+ map.Item("key").Entity();
+ });
+}
+
+TEST(TYTreeFluentTest, DoAttributes)
+{
+ StrictMock<TMockYsonConsumer> mock;
+
+ EXPECT_CALL(mock, OnBeginAttributes());
+ EXPECT_CALL(mock, OnKeyedItem("key"));
+ EXPECT_CALL(mock, OnStringScalar("value"));
+ EXPECT_CALL(mock, OnEndAttributes());
+ EXPECT_CALL(mock, OnEntity());
+
+ BuildYsonFluently(&mock)
+ .DoAttributes([] (TFluentAttributes attributes) {
+ attributes.Item("key").Value("value");
+ })
+ .Entity();
+}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/ytree/ypath_service.cpp b/yt/yt/core/ytree/ypath_service.cpp
index b078379c81..212fb9f76f 100644
--- a/yt/yt/core/ytree/ypath_service.cpp
+++ b/yt/yt/core/ytree/ypath_service.cpp
@@ -22,7 +22,7 @@
#include <yt/yt/core/concurrency/periodic_executor.h>
#include <yt/yt/core/misc/checksum.h>
-#include <yt/yt/core/misc/atomic_object.h>
+#include <library/cpp/yt/threading/atomic_object.h>
#include <library/cpp/yt/memory/atomic_intrusive_ptr.h>
diff --git a/yt/yt/library/decimal/decimal.cpp b/yt/yt/library/decimal/decimal.cpp
index e1674f84e2..3df4b44028 100644
--- a/yt/yt/library/decimal/decimal.cpp
+++ b/yt/yt/library/decimal/decimal.cpp
@@ -12,24 +12,263 @@ namespace NYT::NDecimal {
////////////////////////////////////////////////////////////////////////////////
+// We use the same type used for binary representation of 256 bit decimals for
+// implementing the necessary arithmetic operations.
+using i256 = TDecimal::TValue256;
+
+// We chose to only implement a small subset of operations and functions necessary
+// for converting 256 bit decimals to and from text.
+// Rationale: there do not seem to be any good implementations of 256-bit arithmetic
+// that we are comfortable depending on (for now). There are some big number
+// implementations around openssl, but these types are intended for arbitrarily
+// large integers and use dynamic allocations. It also does not make sense to commit
+// to implementing full-fledged 256-bit arithmetics when we actually use a minority
+// of operations.
+// When we have a good enough int256 either in library/util or in contrib, we can
+// switch to it easily.
+
+////////////////////////////////////////////////////////////////////////////////
+
+constexpr i256 operator-(i256 value) noexcept
+{
+ // Invert.
+ for (int partIndex = 0; partIndex < std::ssize(value.Parts); ++partIndex) {
+ value.Parts[partIndex] = ~value.Parts[partIndex];
+ }
+
+ // Add 1.
+ for (int partIndex = 0; partIndex < std::ssize(value.Parts) && ++value.Parts[partIndex] == 0; ++partIndex) { }
+
+ return value;
+}
+
+constexpr std::strong_ordering operator<=>(const i256& lhs, const i256& rhs)
+{
+ bool lhsIsNegative = lhs.Parts.back() & (1u << 31);
+ bool rhsIsNegative = rhs.Parts.back() & (1u << 31);
+
+ if (lhsIsNegative && !rhsIsNegative) {
+ return std::strong_ordering::less;
+ }
+
+ if (!lhsIsNegative && rhsIsNegative) {
+ return std::strong_ordering::greater;
+ }
+
+ for (int partIndex = std::ssize(lhs.Parts) - 1; partIndex >= 0; --partIndex) {
+ if (lhs.Parts[partIndex] != rhs.Parts[partIndex]) {
+ return lhs.Parts[partIndex] <=> rhs.Parts[partIndex];
+ }
+ }
+
+ return std::strong_ordering::equal;
+}
+
+// Not synthesized by default :(
+constexpr bool operator==(const i256& lhs, const i256& rhs)
+{
+ return lhs.Parts == rhs.Parts;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Some operations require working with an explicitly unsigned type, since they
+// might cause integer overflow, which is not very acceptable for signed types.
+// It is also easier to implement some arithmetic operations (like *= 10) by
+// shifting bits.
+struct TUnsignedValue256
+{
+ std::array<ui32, 8> Parts;
+};
+
+using ui256 = TUnsignedValue256;
+
+static_assert(sizeof(ui256) == sizeof(i256));
+
+////////////////////////////////////////////////////////////////////////////////
+
+constexpr bool operator==(const ui256& lhs, const ui256& rhs)
+{
+ return lhs.Parts == rhs.Parts;
+}
+
+constexpr ui256 operator+(ui256 lhs, const ui256& rhs)
+{
+ ui64 carry = 0;
+ for (int partIndex = 0; partIndex < std::ssize(lhs.Parts); ++partIndex) {
+ carry += lhs.Parts[partIndex];
+ carry += rhs.Parts[partIndex];
+ lhs.Parts[partIndex] = carry;
+ carry >>= 32;
+ }
+
+ return lhs;
+}
+
+template <int Shift>
+Y_FORCE_INLINE constexpr ui256 ShiftUp(ui256 value)
+{
+ static_assert(Shift >= 0 && Shift <= 32);
+
+ value.Parts.back() <<= Shift;
+ for (int partIndex = std::ssize(value.Parts) - 2; partIndex >= 0; --partIndex) {
+ value.Parts[partIndex + 1] |= value.Parts[partIndex] >> (32 - Shift);
+ value.Parts[partIndex] <<= Shift;
+ }
+
+ return value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
template <typename T>
constexpr bool ValidDecimalUnderlyingInteger =
std::is_same_v<T, i32> ||
std::is_same_v<T, i64> ||
- std::is_same_v<T, i128>;
+ std::is_same_v<T, i128> ||
+ std::is_same_v<T, i256>;
+
+template <typename T>
+constexpr bool ValidDecimalUnderlyingUnsignedInteger =
+ std::is_same_v<T, ui32> ||
+ std::is_same_v<T, ui64> ||
+ std::is_same_v<T, ui128> ||
+ std::is_same_v<T, ui256>;
+
+template <typename T>
+Y_FORCE_INLINE constexpr T GetNan()
+{
+ if constexpr (std::is_same_v<T, i256>) {
+ constexpr i32 i32Max = std::numeric_limits<i32>::max();
+ constexpr ui32 ui32Max = std::numeric_limits<ui32>::max();
+
+ return {ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, i32Max};
+ } else {
+ return std::numeric_limits<T>::max();
+ }
+}
+
+template <typename T>
+Y_FORCE_INLINE constexpr T GetPlusInf()
+{
+ if constexpr (std::is_same_v<T, i256>) {
+ constexpr i32 i32Max = std::numeric_limits<i32>::max();
+ constexpr ui32 ui32Max = std::numeric_limits<ui32>::max();
+
+ return {ui32Max - 1, ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, ui32Max, i32Max};
+ } else {
+ return std::numeric_limits<T>::max() - 1;
+ }
+}
template <typename T>
struct TDecimalTraits
{
static_assert(ValidDecimalUnderlyingInteger<T>);
- static constexpr T Nan = std::numeric_limits<T>::max();
- static constexpr T PlusInf = std::numeric_limits<T>::max() - 1;
+ static constexpr T Nan = GetNan<T>();
+ static constexpr T PlusInf = GetPlusInf<T>();
static constexpr T MinusInf = -PlusInf;
-
- static constexpr T MinSpecialValue = PlusInf;
};
+template <typename T>
+Y_FORCE_INLINE constexpr bool IsNegativeInteger(T value)
+{
+ static_assert(ValidDecimalUnderlyingInteger<T>);
+
+ if constexpr (std::is_same_v<T, i256>) {
+ return value.Parts.back() & (1u << 31);
+ } else {
+ return value < 0;
+ }
+}
+
+template <typename T>
+Y_FORCE_INLINE constexpr auto DecimalIntegerToUnsigned(T value)
+{
+ static_assert(ValidDecimalUnderlyingInteger<T>);
+
+ if constexpr (std::is_same_v<T, i256>) {
+ return ui256{value.Parts};
+ } else if constexpr (std::is_same_v<T, i128>) {
+ return ui128(value);
+ } else {
+ using TU = std::make_unsigned_t<T>;
+ return static_cast<TU>(value);
+ }
+}
+
+template <typename T>
+Y_FORCE_INLINE constexpr auto DecimalIntegerToSigned(T value)
+{
+ static_assert(ValidDecimalUnderlyingUnsignedInteger<T>);
+
+ if constexpr (std::is_same_v<T, ui256>) {
+ return i256{value.Parts};
+ } else if constexpr (std::is_same_v<T, ui128>) {
+ return i128(value);
+ } else {
+ using TS = std::make_signed_t<T>;
+ return static_cast<TS>(value);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! The functions below are used for implementing conversion to/from text to binary decimal.
+//! We actually do not need any arithmetic operations beside v *= 10, v /= 10, addition and negation.
+//! so they are the only ones actually implemented for our custom i256/ui256.
+
+template <typename T>
+Y_FORCE_INLINE constexpr auto FlipMSB(T value)
+{
+ static_assert(ValidDecimalUnderlyingInteger<T>);
+
+ if constexpr (std::is_same_v<T, i256>) {
+ value.Parts.back() ^= (1u << 31);
+ return value;
+ } else {
+ constexpr auto One = DecimalIntegerToUnsigned(T{1});
+ // Bit operations are only valid with unsigned types.
+ return T(DecimalIntegerToUnsigned(value) ^ (One << (sizeof(T) * 8 - 1)));
+ }
+}
+
+template <typename T>
+Y_FORCE_INLINE constexpr ui32 GetNextDigit(T value, T* nextValue)
+{
+ static_assert(ValidDecimalUnderlyingUnsignedInteger<T>);
+
+ if constexpr (std::is_same_v<T, ui256>) {
+ ui64 remainder = 0;
+ for (int partIndex = std::ssize(value.Parts) - 1; partIndex >= 0; --partIndex) {
+ // Everything should fit into long long since we are dividing by 10.
+ auto step = std::lldiv(value.Parts[partIndex] + (remainder << 32), 10);
+ value.Parts[partIndex] = step.quot;
+ remainder = step.rem;
+ }
+ *nextValue = value;
+ return remainder;
+ } else {
+ constexpr auto Ten = T{10};
+ *nextValue = value / Ten;
+ return static_cast<ui32>(value % Ten);
+ }
+}
+
+template <typename T>
+Y_FORCE_INLINE constexpr T MultiplyByTen(T value)
+{
+ static_assert(ValidDecimalUnderlyingUnsignedInteger<T>);
+
+ if constexpr (std::is_same_v<T, ui256>) {
+ // 2 * (4 * v + v) = 10v.
+ return ShiftUp<1>(ShiftUp<2>(value) + value);
+ } else {
+ return value * DecimalIntegerToUnsigned(10);
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
constexpr int GetDecimalBinaryValueSize(int precision)
@@ -39,34 +278,41 @@ constexpr int GetDecimalBinaryValueSize(int precision)
return 4;
} else if (precision <= 18) {
return 8;
- } else if (precision <= 35) {
+ } else if (precision <= 38) {
return 16;
+ } else if (precision <= 76) {
+ return 32;
}
}
return 0;
}
-static constexpr i128 DecimalIntegerMaxValueTable[] = {
- i128{0}, // 0
- i128{9}, // 1
- i128{99}, // 2
- i128{999}, // 3
- i128{9999}, // 4
- i128{99999}, // 5
- i128{999999}, // 6
- i128{9999999}, // 7
- i128{99999999}, // 8
- i128{999999999}, // 9
- i128{9999999999ul}, // 10
- i128{99999999999ul}, // 11
- i128{999999999999ul}, // 12
- i128{9999999999999ul}, // 13
- i128{99999999999999ul}, // 14
- i128{999999999999999ul}, // 15
- i128{9999999999999999ul}, // 16
- i128{99999999999999999ul}, // 17
- i128{999999999999999999ul}, // 18
+static constexpr i32 Decimal32IntegerMaxValueTable[] = {
+ 0, // 0
+ 9, // 1
+ 99, // 2
+ 999, // 3
+ 9999, // 4
+ 99999, // 5
+ 999999, // 6
+ 9999999, // 7
+ 99999999, // 8
+ 999999999, // 9
+};
+static constexpr i64 Decimal64IntegerMaxValueTable[] = {
+ 9999999999ul, // 10
+ 99999999999ul, // 11
+ 999999999999ul, // 12
+ 9999999999999ul, // 13
+ 99999999999999ul, // 14
+ 999999999999999ul, // 15
+ 9999999999999999ul, // 16
+ 99999999999999999ul, // 17
+ 999999999999999999ul, // 18
+};
+
+static constexpr i128 Decimal128IntegerMaxValueTable[] = {
// 128 bits
//
// Generated by fair Python script:
@@ -79,7 +325,7 @@ static constexpr i128 DecimalIntegerMaxValueTable[] = {
// hex_value[-16:],
// hex_value[:-16] or "0",
// precision))
- // for i in range(19, 36):
+ // for i in range(19, 39):
// print_max_decimal(i)
//
i128{static_cast<ui64>(0x8ac7230489e7fffful)} | (i128{static_cast<ui64>(0x0ul)} << 64), // 19
@@ -99,37 +345,106 @@ static constexpr i128 DecimalIntegerMaxValueTable[] = {
i128{static_cast<ui64>(0x38c15b09fffffffful)} | (i128{static_cast<ui64>(0x314dc6448d93ul)} << 64), // 33
i128{static_cast<ui64>(0x378d8e63fffffffful)} | (i128{static_cast<ui64>(0x1ed09bead87c0ul)} << 64), // 34
i128{static_cast<ui64>(0x2b878fe7fffffffful)} | (i128{static_cast<ui64>(0x13426172c74d82ul)} << 64), // 35
+ i128{static_cast<ui64>(0xb34b9f0ffffffffful)} | (i128{static_cast<ui64>(0xc097ce7bc90715ul)} << 64), // 36
+ i128{static_cast<ui64>(0x00f4369ffffffffful)} | (i128{static_cast<ui64>(0x785ee10d5da46d9ul)} << 64), // 37
+ i128{static_cast<ui64>(0x098a223ffffffffful)} | (i128{static_cast<ui64>(0x4b3b4ca85a86c47aul)} << 64), // 38
+};
+
+static constexpr i256 Decimal256IntegerMaxValueTable[] = {
+ // 256 bits
+ //
+ // Generated by fair Python script:
+ //
+ // def print_max_decimal(precision):
+ // max_value = int("9" * precision)
+ // hex_value = hex(max_value)[2:] # strip 0x
+ // hex_value = hex_value.strip("L")
+ // parts = [hex_value[-8 * i:-8 * (i - 1) if i > 1 else len(hex_value)] or "0" for i in range(1, 9)]
+ // assert sum(int(v, 16) * 2 ** (32 * i) for i, v in enumerate(parts)) == max_value
+ // assert int(parts[-1], 16) < 2 ** 31 - 1
+ // joined_parts = ", ".join(f"static_cast<ui32>(0x{part}u)" for part in parts)
+ // print(f"{{{joined_parts}}}, // {precision}")
+ //
+ // for i in range(39, 77):
+ // print_max_decimal(i)
+ //
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x5f65567fu), static_cast<ui32>(0x8943acc4u), static_cast<ui32>(0xf050fe93u), static_cast<ui32>(0x2u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 39
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xb9f560ffu), static_cast<ui32>(0x5ca4bfabu), static_cast<ui32>(0x6329f1c3u), static_cast<ui32>(0x1du), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 40
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x4395c9ffu), static_cast<ui32>(0x9e6f7cb5u), static_cast<ui32>(0xdfa371a1u), static_cast<ui32>(0x125u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 41
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xa3d9e3ffu), static_cast<ui32>(0x305adf14u), static_cast<ui32>(0xbc627050u), static_cast<ui32>(0xb7au), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 42
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x6682e7ffu), static_cast<ui32>(0xe38cb6ceu), static_cast<ui32>(0x5bd86321u), static_cast<ui32>(0x72cbu), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 43
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x011d0fffu), static_cast<ui32>(0xe37f2410u), static_cast<ui32>(0x9673df52u), static_cast<ui32>(0x47bf1u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 44
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x0b229fffu), static_cast<ui32>(0xe2f768a0u), static_cast<ui32>(0xe086b93cu), static_cast<ui32>(0x2cd76fu), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 45
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x6f5a3fffu), static_cast<ui32>(0xddaa1640u), static_cast<ui32>(0xc5433c60u), static_cast<ui32>(0x1c06a5eu), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 46
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x59867fffu), static_cast<ui32>(0xa8a4de84u), static_cast<ui32>(0xb4a05bc8u), static_cast<ui32>(0x118427b3u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 47
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x7f40ffffu), static_cast<ui32>(0x9670b12bu), static_cast<ui32>(0x0e4395d6u), static_cast<ui32>(0xaf298d05u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 48
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xf889ffffu), static_cast<ui32>(0xe066ebb2u), static_cast<ui32>(0x8ea3da61u), static_cast<ui32>(0xd79f8232u), static_cast<ui32>(0x6u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 49
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xb563ffffu), static_cast<ui32>(0xc40534fdu), static_cast<ui32>(0x926687d2u), static_cast<ui32>(0x6c3b15f9u), static_cast<ui32>(0x44u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 50
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x15e7ffffu), static_cast<ui32>(0xa83411e9u), static_cast<ui32>(0xb8014e3bu), static_cast<ui32>(0x3a4edbbfu), static_cast<ui32>(0x2acu), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 51
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xdb0fffffu), static_cast<ui32>(0x9208b31au), static_cast<ui32>(0x300d0e54u), static_cast<ui32>(0x4714957du), static_cast<ui32>(0x1abau), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 52
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x8e9fffffu), static_cast<ui32>(0xb456ff0cu), static_cast<ui32>(0xe0828f4du), static_cast<ui32>(0xc6cdd6e3u), static_cast<ui32>(0x10b46u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 53
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x923fffffu), static_cast<ui32>(0x0b65f67du), static_cast<ui32>(0xc5199909u), static_cast<ui32>(0xc40a64e6u), static_cast<ui32>(0xa70c3u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 54
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xb67fffffu), static_cast<ui32>(0x71fba0e7u), static_cast<ui32>(0xb2fffa5au), static_cast<ui32>(0xa867f103u), static_cast<ui32>(0x6867a5u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 55
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x20ffffffu), static_cast<ui32>(0x73d4490du), static_cast<ui32>(0xfdffc788u), static_cast<ui32>(0x940f6a24u), static_cast<ui32>(0x4140c78u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 56
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x49ffffffu), static_cast<ui32>(0x864ada83u), static_cast<ui32>(0xebfdcb54u), static_cast<ui32>(0xc89a2571u), static_cast<ui32>(0x28c87cb5u), static_cast<ui32>(0x0u), static_cast<ui32>(0x0u)}, // 57
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xe3ffffffu), static_cast<ui32>(0x3eec8920u), static_cast<ui32>(0x37e9f14du), static_cast<ui32>(0xd6057673u), static_cast<ui32>(0x97d4df19u), static_cast<ui32>(0x1u), static_cast<ui32>(0x0u)}, // 58
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xe7ffffffu), static_cast<ui32>(0x753d5b48u), static_cast<ui32>(0x2f236d04u), static_cast<ui32>(0x5c36a080u), static_cast<ui32>(0xee50b702u), static_cast<ui32>(0xfu), static_cast<ui32>(0x0u)}, // 59
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x0fffffffu), static_cast<ui32>(0x946590d9u), static_cast<ui32>(0xd762422cu), static_cast<ui32>(0x9a224501u), static_cast<ui32>(0x4f272617u), static_cast<ui32>(0x9fu), static_cast<ui32>(0x0u)}, // 60
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x9fffffffu), static_cast<ui32>(0xcbf7a87au), static_cast<ui32>(0x69d695bdu), static_cast<ui32>(0x0556b212u), static_cast<ui32>(0x17877cecu), static_cast<ui32>(0x639u), static_cast<ui32>(0x0u)}, // 61
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x3fffffffu), static_cast<ui32>(0xf7ac94cau), static_cast<ui32>(0x2261d969u), static_cast<ui32>(0x3562f4b8u), static_cast<ui32>(0xeb4ae138u), static_cast<ui32>(0x3e3au), static_cast<ui32>(0x0u)}, // 62
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x7fffffffu), static_cast<ui32>(0xacbdcfe6u), static_cast<ui32>(0x57d27e23u), static_cast<ui32>(0x15dd8f31u), static_cast<ui32>(0x30eccc32u), static_cast<ui32>(0x26e4du), static_cast<ui32>(0x0u)}, // 63
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xbf6a1f00u), static_cast<ui32>(0x6e38ed64u), static_cast<ui32>(0xdaa797edu), static_cast<ui32>(0xe93ff9f4u), static_cast<ui32>(0x184f03u), static_cast<ui32>(0x0u)}, // 64
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x7a253609u), static_cast<ui32>(0x4e3945efu), static_cast<ui32>(0x8a8bef46u), static_cast<ui32>(0x1c7fc390u), static_cast<ui32>(0xf31627u), static_cast<ui32>(0x0u)}, // 65
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xc5741c63u), static_cast<ui32>(0x0e3cbb5au), static_cast<ui32>(0x697758bfu), static_cast<ui32>(0x1cfda3a5u), static_cast<ui32>(0x97edd87u), static_cast<ui32>(0x0u)}, // 66
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xb6891be7u), static_cast<ui32>(0x8e5f518bu), static_cast<ui32>(0x1ea97776u), static_cast<ui32>(0x21e86476u), static_cast<ui32>(0x5ef4a747u), static_cast<ui32>(0x0u)}, // 67
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x215b170fu), static_cast<ui32>(0x8fb92f75u), static_cast<ui32>(0x329eaaa1u), static_cast<ui32>(0x5313ec9du), static_cast<ui32>(0xb58e88c7u), static_cast<ui32>(0x3u)}, // 68
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x4d8ee69fu), static_cast<ui32>(0x9d3bda93u), static_cast<ui32>(0xfa32aa4fu), static_cast<ui32>(0x3ec73e23u), static_cast<ui32>(0x179157c9u), static_cast<ui32>(0x25u)}, // 69
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x0795023fu), static_cast<ui32>(0x245689c1u), static_cast<ui32>(0xc5faa71cu), static_cast<ui32>(0x73c86d67u), static_cast<ui32>(0xebad6ddcu), static_cast<ui32>(0x172u)}, // 70
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x4bd2167fu), static_cast<ui32>(0x6b61618au), static_cast<ui32>(0xbbca8719u), static_cast<ui32>(0x85d4460du), static_cast<ui32>(0x34c64a9cu), static_cast<ui32>(0xe7du)}, // 71
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xf634e0ffu), static_cast<ui32>(0x31cdcf66u), static_cast<ui32>(0x55e946feu), static_cast<ui32>(0x3a4abc89u), static_cast<ui32>(0x0fbeea1du), static_cast<ui32>(0x90e4u)}, // 72
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x9e10c9ffu), static_cast<ui32>(0xf20a1a05u), static_cast<ui32>(0x5b1cc5edu), static_cast<ui32>(0x46eb5d5du), static_cast<ui32>(0x9d752524u), static_cast<ui32>(0x5a8e8u)}, // 73
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x2ca7e3ffu), static_cast<ui32>(0x74650438u), static_cast<ui32>(0x8f1fbb4bu), static_cast<ui32>(0xc531a5a5u), static_cast<ui32>(0x2693736au), static_cast<ui32>(0x389916u)}, // 74
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xbe8ee7ffu), static_cast<ui32>(0x8bf22a31u), static_cast<ui32>(0x973d50f2u), static_cast<ui32>(0xb3f07877u), static_cast<ui32>(0x81c2822bu), static_cast<ui32>(0x235faddu)}, // 75
+ {static_cast<ui32>(0xffffffffu), static_cast<ui32>(0xffffffffu), static_cast<ui32>(0x71950fffu), static_cast<ui32>(0x7775a5f1u), static_cast<ui32>(0xe8652979u), static_cast<ui32>(0x0764b4abu), static_cast<ui32>(0x119915b5u), static_cast<ui32>(0x161bcca7u)}, // 76
};
template<typename T>
-Y_FORCE_INLINE constexpr T GetDecimalMaxIntegerValue(int precision)
+Y_FORCE_INLINE constexpr auto GetDecimalMaxIntegerValue(int precision)
{
static_assert(ValidDecimalUnderlyingInteger<T>);
- if (TDecimal::GetValueBinarySize(precision) <= sizeof(T)) {
- return DecimalIntegerMaxValueTable[precision];
- } else {
+ if (TDecimal::GetValueBinarySize(precision) > static_cast<int>(sizeof(T))) {
YT_ABORT();
}
-}
-template <typename T>
-Y_FORCE_INLINE constexpr auto DecimalIntegerToUnsigned(T value)
-{
- static_assert(ValidDecimalUnderlyingInteger<T>);
-
- if constexpr (std::is_same_v<T, i128>) {
- return ui128(value);
+ if constexpr (std::is_same_v<T, i32>) {
+ YT_VERIFY(precision <= 9);
+ return Decimal32IntegerMaxValueTable[precision];
+ } else if constexpr (std::is_same_v<T, i64>) {
+ YT_VERIFY(precision >= 10 && precision <= 18);
+ return Decimal64IntegerMaxValueTable[precision - 10];
+ } else if constexpr (std::is_same_v<T, i128>) {
+ YT_VERIFY(precision >= 19 && precision <= 38);
+ return Decimal128IntegerMaxValueTable[precision - 19];
+ } else if constexpr (std::is_same_v<T, i256>) {
+ YT_VERIFY(precision >= 39 && precision <= 76);
+ return Decimal256IntegerMaxValueTable[precision - 39];
} else {
- using TU = std::make_unsigned_t<T>;
- return static_cast<TU>(value);
+ YT_ABORT();
}
}
template <typename T>
static Y_FORCE_INLINE T DecimalHostToInet(T value)
{
- if constexpr (std::is_same_v<T, i128> || std::is_same_v<T, ui128>) {
+ if constexpr (std::is_same_v<T, i256> || std::is_same_v<T, ui256>) {
+ for (int partIndex = 0; partIndex < std::ssize(value.Parts) / 2; ++partIndex) {
+ value.Parts[partIndex] = ::HostToInet(value.Parts[partIndex]);
+ value.Parts[std::size(value.Parts) - 1 - partIndex] = ::HostToInet(value.Parts[std::size(value.Parts) - 1 - partIndex]);
+ std::swap(value.Parts[partIndex], value.Parts[std::size(value.Parts) - 1 - partIndex]);
+ }
+ return value;
+ } else if constexpr (std::is_same_v<T, i128> || std::is_same_v<T, ui128>) {
return T(::HostToInet(GetLow(value)), ::HostToInet(GetHigh(value)));
} else {
return ::HostToInet(value);
@@ -139,7 +454,14 @@ static Y_FORCE_INLINE T DecimalHostToInet(T value)
template <typename T>
static Y_FORCE_INLINE T DecimalInetToHost(T value)
{
- if constexpr (std::is_same_v<T, i128> || std::is_same_v<T, ui128>) {
+ if constexpr (std::is_same_v<T, i256> || std::is_same_v<T, ui256>) {
+ for (int partIndex = 0; partIndex < std::ssize(value.Parts) / 2; ++partIndex) {
+ value.Parts[partIndex] = ::InetToHost(value.Parts[partIndex]);
+ value.Parts[std::size(value.Parts) - 1 - partIndex] = ::InetToHost(value.Parts[std::size(value.Parts) - 1 - partIndex]);
+ std::swap(value.Parts[partIndex], value.Parts[std::size(value.Parts) - 1 - partIndex]);
+ }
+ return value;
+ } else if constexpr (std::is_same_v<T, i128> || std::is_same_v<T, ui128>) {
return T(::InetToHost(GetLow(value)), ::InetToHost(GetHigh(value)));
} else {
return ::InetToHost(value);
@@ -151,22 +473,14 @@ static T DecimalBinaryToIntegerUnchecked(TStringBuf binaryValue)
{
T result;
memcpy(&result, binaryValue.data(), sizeof(result));
- result = DecimalInetToHost(result);
-
- constexpr auto one = DecimalIntegerToUnsigned(T{1});
- result = static_cast<T>(DecimalIntegerToUnsigned(result) ^ (one << (sizeof(T) * 8 - 1)));
-
- return result;
+ return FlipMSB(DecimalInetToHost(result));
}
template<typename T>
static void DecimalIntegerToBinaryUnchecked(T decodedValue, void* buf)
{
- auto unsignedValue = DecimalIntegerToUnsigned(decodedValue);
- constexpr auto one = DecimalIntegerToUnsigned(T{1});
- unsignedValue ^= (one << (sizeof(T) * 8 - 1));
- unsignedValue = DecimalHostToInet(unsignedValue);
- memcpy(buf, &unsignedValue, sizeof(unsignedValue));
+ auto preparedValue = DecimalHostToInet(FlipMSB(decodedValue));
+ memcpy(buf, &preparedValue, sizeof(preparedValue));
}
static void CheckDecimalValueSize(TStringBuf value, int precision, int scale)
@@ -192,12 +506,6 @@ static Y_FORCE_INLINE TStringBuf PlaceOnBuffer(TStringBuf value, char* buffer)
template<typename T>
static TStringBuf WriteTextDecimalUnchecked(T decodedValue, int scale, char* buffer)
{
- i8 digits[std::numeric_limits<T>::digits + 1] = {0,};
- static constexpr auto ten = DecimalIntegerToUnsigned(T{10});
-
- const bool negative = decodedValue < 0;
- auto absValue = DecimalIntegerToUnsigned(negative ? -decodedValue : decodedValue);
-
if (decodedValue == TDecimalTraits<T>::MinusInf) {
static constexpr TStringBuf minusInf = "-inf";
return PlaceOnBuffer(minusInf, buffer);
@@ -209,10 +517,14 @@ static TStringBuf WriteTextDecimalUnchecked(T decodedValue, int scale, char* buf
return PlaceOnBuffer(nan, buffer);
}
+ i8 digits[TDecimal::MaxTextSize] = {0,};
+
+ bool negative = IsNegativeInteger(decodedValue);
+ auto absValue = DecimalIntegerToUnsigned(negative ? -decodedValue : decodedValue);
+
auto* curDigit = digits;
- while (absValue > 0) {
- *curDigit = static_cast<int>(absValue % ten);
- absValue = absValue / ten;
+ while (absValue != DecimalIntegerToUnsigned(T{0})) {
+ *curDigit = GetNextDigit(absValue, &absValue);
curDigit++;
}
YT_VERIFY(curDigit <= digits + std::size(digits));
@@ -313,35 +625,40 @@ T DecimalTextToInteger(TStringBuf textValue, int precision, int scale)
break;
}
- T result = 0;
+ // This value can overflow in the process of parsing the text value.
+ // We do throw an exception later, but UB is not good for your health.
+ auto result = DecimalIntegerToUnsigned(T{0});
int beforePoint = 0;
int afterPoint = 0;
+
+ auto addDigit = [&] (auto digit) {
+ // We use this type to avoid warnings about casting signed types to unsigned/narrowing ints.
+ // Ugly, but this way we don't need to define cumbersome constructors for TValue256.
+ ui16 currentDigit = *digit - '0';
+ if (currentDigit < 0 || currentDigit > 9) {
+ ThrowInvalidDecimal(textValue, precision, scale);
+ }
+
+ result = MultiplyByTen(result);
+ result = result + DecimalIntegerToUnsigned(T{currentDigit});
+ };
+
for (; cur != end; ++cur) {
if (*cur == '.') {
++cur;
for (; cur != end; ++cur) {
- int currentDigit = *cur - '0';
- result *= 10;
- result += currentDigit;
+ addDigit(cur);
++afterPoint;
- if (currentDigit < 0 || currentDigit > 9) {
- ThrowInvalidDecimal(textValue, precision, scale);
- }
}
break;
}
- int currentDigit = *cur - '0';
- result *= 10;
- result += currentDigit;
+ addDigit(cur);
++beforePoint;
- if (currentDigit < 0 || currentDigit > 9) {
- ThrowInvalidDecimal(textValue, precision, scale);
- }
}
for (; afterPoint < scale; ++afterPoint) {
- result *= 10;
+ result = MultiplyByTen(result);
}
if (afterPoint > scale) {
@@ -352,7 +669,10 @@ T DecimalTextToInteger(TStringBuf textValue, int precision, int scale)
ThrowInvalidDecimal(textValue, precision, scale, "too many digits before decimal point");
}
- return negative ? -result : result;
+ // This cast is guaranteed by the checks above to be correct.
+ auto signedResult = DecimalIntegerToSigned(result);
+ // This is safe: the range of representable values fits into [-signed_max, signed_max] for each underlying type.
+ return negative ? -signedResult : signedResult;
}
template<typename T>
@@ -374,6 +694,8 @@ TStringBuf TDecimal::BinaryToText(TStringBuf binaryDecimal, int precision, int s
return DecimalBinaryToTextUncheckedImpl<i64>(binaryDecimal, scale, buffer);
case 16:
return DecimalBinaryToTextUncheckedImpl<i128>(binaryDecimal, scale, buffer);
+ case 32:
+ return DecimalBinaryToTextUncheckedImpl<i256>(binaryDecimal, scale, buffer);
}
CheckDecimalValueSize(binaryDecimal, precision, scale);
YT_ABORT();
@@ -410,8 +732,10 @@ TStringBuf TDecimal::TextToBinary(TStringBuf textValue, int precision, int scale
return TextToBinaryImpl<i64>(textValue, precision, scale, buffer);
case 16:
return TextToBinaryImpl<i128>(textValue, precision, scale, buffer);
+ case 32:
+ return TextToBinaryImpl<i256>(textValue, precision, scale, buffer);
default:
- static_assert(GetDecimalBinaryValueSize(TDecimal::MaxPrecision) == 16);
+ static_assert(GetDecimalBinaryValueSize(TDecimal::MaxPrecision) == 32);
YT_ABORT();
}
}
@@ -444,7 +768,7 @@ static void ValidateDecimalBinaryValueImpl(TStringBuf binaryDecimal, int precisi
{
T decoded = DecimalBinaryToIntegerUnchecked<T>(binaryDecimal);
- const T maxValue = static_cast<T>(DecimalIntegerMaxValueTable[precision]);
+ auto maxValue = GetDecimalMaxIntegerValue<T>(precision);
if (-maxValue <= decoded && decoded <= maxValue) {
return;
@@ -478,11 +802,14 @@ void TDecimal::ValidateBinaryValue(TStringBuf binaryDecimal, int precision, int
return ValidateDecimalBinaryValueImpl<i64>(binaryDecimal, precision, scale);
case 16:
return ValidateDecimalBinaryValueImpl<i128>(binaryDecimal, precision, scale);
+ case 32:
+ return ValidateDecimalBinaryValueImpl<i256>(binaryDecimal, precision, scale);
default:
- static_assert(GetDecimalBinaryValueSize(TDecimal::MaxPrecision) == 16);
+ static_assert(GetDecimalBinaryValueSize(TDecimal::MaxPrecision) == 32);
YT_ABORT();
}
}
+
template <typename T>
Y_FORCE_INLINE void CheckDecimalIntBits(int precision)
{
@@ -495,6 +822,14 @@ Y_FORCE_INLINE void CheckDecimalIntBits(int precision)
}
}
+Y_FORCE_INLINE void CheckDecimalFitsInto128Bits(int precision)
+{
+ if (precision > 38) {
+ THROW_ERROR_EXCEPTION("Decimal<%v, ?> does not fit into int128",
+ precision);
+ }
+}
+
int TDecimal::GetValueBinarySize(int precision)
{
const auto result = GetDecimalBinaryValueSize(precision);
@@ -535,7 +870,7 @@ TStringBuf TDecimal::WriteBinary128(int precision, TValue128 value, char* buffer
return TStringBuf{buffer, sizeof(TValue128)};
}
-TStringBuf TDecimal::WriteBinaryVariadic(int precision, TValue128 value, char* buffer, size_t bufferLength)
+TStringBuf TDecimal::WriteBinary128Variadic(int precision, TValue128 value, char* buffer, size_t bufferLength)
{
const size_t resultLength = GetValueBinarySize(precision);
switch (resultLength) {
@@ -550,6 +885,16 @@ TStringBuf TDecimal::WriteBinaryVariadic(int precision, TValue128 value, char* b
}
}
+TStringBuf TDecimal::WriteBinary256(int precision, TValue256 value, char* buffer, size_t bufferLength)
+{
+ const size_t resultLength = GetValueBinarySize(precision);
+ CheckDecimalIntBits<TValue256>(precision);
+ YT_VERIFY(bufferLength >= resultLength);
+
+ DecimalIntegerToBinaryUnchecked(std::move(value), buffer);
+ return TStringBuf{buffer, sizeof(TValue256)};
+}
+
template <typename T>
Y_FORCE_INLINE void CheckBufferLength(int precision, size_t bufferLength)
{
@@ -581,6 +926,12 @@ TDecimal::TValue128 TDecimal::ParseBinary128(int precision, TStringBuf buffer)
return {GetLow(result), static_cast<i64>(GetHigh(result))};
}
+TDecimal::TValue256 TDecimal::ParseBinary256(int precision, TStringBuf buffer)
+{
+ CheckBufferLength<i256>(precision, buffer.Size());
+ return DecimalBinaryToIntegerUnchecked<i256>(buffer);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDecimal
diff --git a/yt/yt/library/decimal/decimal.h b/yt/yt/library/decimal/decimal.h
index 8a6cf34a8f..27375d3904 100644
--- a/yt/yt/library/decimal/decimal.h
+++ b/yt/yt/library/decimal/decimal.h
@@ -5,6 +5,8 @@
#include <util/system/defaults.h>
#include <util/generic/string.h>
+#include <array>
+
namespace NYT::NDecimal {
////////////////////////////////////////////////////////////////////////////////
@@ -12,6 +14,9 @@ namespace NYT::NDecimal {
class TDecimal
{
public:
+ //! Both types defined below represent signed values. They are only intended
+ //! to be used for manipulating binary data, since they don't actually expose
+ //! any arithmetic operations. You should avoid dealing with individual parts.
struct TValue128
{
ui64 Low;
@@ -19,15 +24,21 @@ public:
};
static_assert(sizeof(TValue128) == 2 * sizeof(ui64));
+ struct TValue256
+ {
+ std::array<ui32, 8> Parts;
+ };
+ static_assert(sizeof(TValue256) == 4 * sizeof(ui64));
+
public:
- // Maximum precision supported by YT
- static constexpr int MaxPrecision = 35;
- static constexpr int MaxBinarySize = 16;
+ //! Maximum precision supported by YT.
+ static constexpr int MaxPrecision = 76;
+ static constexpr int MaxBinarySize = 32;
// NB. Sometimes we print values that exceed MaxPrecision (e.g. in error messages)
- // MaxTextSize is chosen so we can print ANY i128 number as decimal.
+ // MaxTextSize is chosen so we can print ANY i256 number as decimal.
static constexpr int MaxTextSize =
- std::numeric_limits<ui128>::digits + 1 // max number of digits in ui128 number
+ 77 // length of 2^63 in decimal form
+ 1 // possible decimal point
+ 1; // possible minus sign
@@ -49,13 +60,15 @@ public:
static TStringBuf WriteBinary32(int precision, i32 value, char* buffer, size_t bufferLength);
static TStringBuf WriteBinary64(int precision, i64 value, char* buffer, size_t bufferLength);
static TStringBuf WriteBinary128(int precision, TValue128 value, char* buffer, size_t bufferLength);
+ static TStringBuf WriteBinary256(int precision, TValue256 value, char* buffer, size_t bufferLength);
// Writes either 32-bit, 64-bit or 128-bit binary value depending on precision, provided a TValue128.
- static TStringBuf WriteBinaryVariadic(int precision, TValue128 value, char* buffer, size_t bufferLength);
+ static TStringBuf WriteBinary128Variadic(int precision, TValue128 value, char* buffer, size_t bufferLength);
static i32 ParseBinary32(int precision, TStringBuf buffer);
static i64 ParseBinary64(int precision, TStringBuf buffer);
static TValue128 ParseBinary128(int precision, TStringBuf buffer);
+ static TValue256 ParseBinary256(int precision, TStringBuf buffer);
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/decimal/unittests/decimal_ut.cpp b/yt/yt/library/decimal/unittests/decimal_ut.cpp
index ba8be4ab15..50c550707c 100644
--- a/yt/yt/library/decimal/unittests/decimal_ut.cpp
+++ b/yt/yt/library/decimal/unittests/decimal_ut.cpp
@@ -98,6 +98,150 @@ TEST(TDecimal, TestTextBinaryConversion)
"61449825198266175750309883089040771",
"800BD5B5D5F0C73E0C9CD4943298B583");
+ // A few more test cases with big numbers for various precisions generated by python snippet:
+ // import random
+ // def print_test_case(plus, binary_size, precision):
+ // textval = "".join(random.choice("0123456789") for _ in range(precision))
+ // if not plus:
+ // textval = "-" + textval
+ // binval = hex(2 ** (binary_size * 8 - 1) + int(textval))
+ // binval = binval[2:].strip('L') # strip 0x and final 'L'
+ // binval = binval.upper()
+ // print(
+ // "TEST_TEXT_BINARY_CONVERSION(\n"
+ // " {precision}, 0,\n"
+ // " \"{text}\",\n"
+ // " \"{binary}\");\n"
+ // .format(precision=precision, text=textval.lstrip("0"), binary=binval)
+ // )
+ // random.seed(42)
+ //
+ // for binary_size, precision in (4, 9), (8, 18), (16, 35), (16, 38), (32, 39), (32, 76):
+ // print_test_case(False, binary_size, precision)
+ // print_test_case(False, binary_size, precision)
+ // print_test_case(True, binary_size, precision)
+ // print_test_case(True, binary_size, precision)
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 9, 0,
+ "-104332181",
+ "79C8046B");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 9, 0,
+ "-960013389",
+ "46C75BB3");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 9, 0,
+ "83863794",
+ "84FFA8F2");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 9, 0,
+ "26542351",
+ "8195010F");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 18, 0,
+ "-161559407816184959",
+ "7DC2069316FE6B81");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 18, 0,
+ "-310341316475255341",
+ "7BB17237885D85D3");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 18, 0,
+ "928327648350305641",
+ "8CE21513E317FD69");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 18, 0,
+ "395376724238849696",
+ "857CA90930B6D6A0");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 35, 0,
+ "-53287101226916697848018451462704828",
+ "7FF5BCBE39B5F1A05ED5F0135FD2B144");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 35, 0,
+ "-14893252880957015430391171822782489",
+ "7FFD21B4B88705D84E5A0ECF560DF7E7");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 35, 0,
+ "63834657871331509839301031051834738",
+ "800C4B4AA7E81EF6726D934AA12AB572");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 35, 0,
+ "29973763116566701065133387262473178",
+ "8005C5D2141747EF1198888FB96033DA");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 38, 0,
+ "-10801326773602606474687234309805009788",
+ "77DFBD795731FA57AD40A913BEB49484");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 38, 0,
+ "-20812191361939909169985435346247510799",
+ "7057B7BE1F13B6F1D6DB2F569BA994F1");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 38, 0,
+ "11838425135427849808412411824493534874",
+ "88E7FF6C4CB25E5053056568535B9E9A");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 38, 0,
+ "1640052427868011280598262045053315869",
+ "813BDCD3E2BEF11A4624071A88406B1D");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 39, 0,
+ "-232260256342160733754330365414586850142",
+ "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF51444D3EF60040D9A177B4EBDCD1E4A2");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 39, 0,
+ "-940196556981693406088356159514846564823",
+ "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFD3CACBACF9473427D637FA1697111EE29");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 39, 0,
+ "662994680443699577738721489513433200379",
+ "80000000000000000000000000000001F2C8217C5832413CEDC2D8713542F6FB");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 39, 0,
+ "176936763201632870831727889579868727743",
+ "80000000000000000000000000000000851CC7F2FA54AF2AC8A1868AB524A9BF");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 76, 0,
+ "-4873471434558122362316658760366909670546688937346706562729806990162720465375",
+ "7539B681CC20CEEDAB96C358A49B105EB5C51C958B66D69F946E52B2C4CBDE21");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 76, 0,
+ "-5646417080531003309232719374529912419049663193149190586518506716572628498776",
+ "73843DBE5B00AA3065C94D18C3DE5B54ABBCB7AD4AB9775BD7308BA12A14C6A8");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 76, 0,
+ "9453147379965075273545494808313678377701436349578856855744431351823374989413",
+ "94E64AB40D1C30A9A2BFEA9C96B7584F559D5572641F0C90E86764F3187BB865");
+
+ TEST_TEXT_BINARY_CONVERSION(
+ 76, 0,
+ "4352408240084271094777520471167190229413186999386774964990913341232812067974",
+ "899F603224EC624E2351628FF4637023C84FDBB0E2BBCBBB9395D8DD19B30C86");
+
#undef TEST_TEXT_BINARY_CONVERSION
}
@@ -127,20 +271,27 @@ TEST(TDecimal, TestPrecisionScaleLimits)
EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("314.15", 5, 3), "too many digits before decimal point");
EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("-314.15", 5, 3), "too many digits before decimal point");
+ // This group of tests checks that text values which cause signed overflow throw valid exceptions.
+ // The values for each precision are equal to 2^(binary_size - 1) + 0.11.
+ EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("2147483647.11", 9, 2), "too many digits before decimal point");
+ EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("9223372036854775807.11", 18, 2), "too many digits before decimal point");
+ EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("170141183460469231731687303715884105727.11", 35, 2), "too many digits before decimal point");
+ EXPECT_THROW_WITH_SUBSTRING(TDecimal::TextToBinary("57896044618658097711785492504343953926634992332820282019728792003956564819967.11", TDecimal::MaxPrecision, 2), "too many digits before decimal point");
+
// Sometimes we want to print values that are not representable with given precision
// (e.g. in error messages we sometimes want to print text value of invalid decimal to explain that it has
// more digits than allowed by precision).
//
// Here we test that extreme values are printed ok.
- auto maxBinaryDecimal = HexDecode("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFD");
- auto minBinaryDecimal1 = HexDecode("00000000000000000000000000000000");
- auto minBinaryDecimal2 = HexDecode("00000000000000000000000000000003");
+ auto maxBinaryDecimal = HexDecode("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFD");
+ auto minBinaryDecimal1 = HexDecode("0000000000000000000000000000000000000000000000000000000000000000");
+ auto minBinaryDecimal2 = HexDecode("0000000000000000000000000000000000000000000000000000000000000003");
EXPECT_EQ(TDecimal::MaxBinarySize, std::ssize(maxBinaryDecimal)); // If max TDecimal::MaxBinarySize ever increases
EXPECT_EQ(TDecimal::MaxBinarySize, std::ssize(minBinaryDecimal1)); // please update this test
EXPECT_EQ(TDecimal::MaxBinarySize, std::ssize(minBinaryDecimal2)); // with better values.
- EXPECT_EQ("1701411834604692317316873037158841057.25", TDecimal::BinaryToText(maxBinaryDecimal, TDecimal::MaxPrecision, 2));
- EXPECT_EQ("-1701411834604692317316873037158841057.28", TDecimal::BinaryToText(minBinaryDecimal1, TDecimal::MaxPrecision, 2));
- EXPECT_EQ("-1701411834604692317316873037158841057.25", TDecimal::BinaryToText(minBinaryDecimal2, TDecimal::MaxPrecision, 2));
+ EXPECT_EQ("578960446186580977117854925043439539266349923328202820197287920039565648199.65", TDecimal::BinaryToText(maxBinaryDecimal, TDecimal::MaxPrecision, 2));
+ EXPECT_EQ("-578960446186580977117854925043439539266349923328202820197287920039565648199.68", TDecimal::BinaryToText(minBinaryDecimal1, TDecimal::MaxPrecision, 2));
+ EXPECT_EQ("-578960446186580977117854925043439539266349923328202820197287920039565648199.65", TDecimal::BinaryToText(minBinaryDecimal2, TDecimal::MaxPrecision, 2));
}
TEST(TDecimal, TestValidation)
@@ -148,6 +299,7 @@ TEST(TDecimal, TestValidation)
EXPECT_NO_THROW(TDecimal::ValidateBinaryValue(HexDecode("8000013A"), 3, 2));
EXPECT_NO_THROW(TDecimal::ValidateBinaryValue(HexDecode("80000000" "0000013A"), 10, 2));
EXPECT_NO_THROW(TDecimal::ValidateBinaryValue(HexDecode("80000000" "00000000" "00000000" "0000013A"), 35, 2));
+ EXPECT_NO_THROW(TDecimal::ValidateBinaryValue(HexDecode("80000000" "00000000" "00000000" "00000000" "00000000" "00000000" "00000000" "0000013A"), 76, 2));
}
class TDecimalWithPrecisionTest
diff --git a/yt/yt/library/formats/arrow_parser.cpp b/yt/yt/library/formats/arrow_parser.cpp
index cc31e25335..f34127990a 100644
--- a/yt/yt/library/formats/arrow_parser.cpp
+++ b/yt/yt/library/formats/arrow_parser.cpp
@@ -163,28 +163,17 @@ public:
return ParseNull();
}
- // Decimal types. For now, YT natively supports only Decimal128 with scale up to 35.
- // Thus, we represent short enough decimals as native YT decimals, and wider decimals as
- // their decimal string representation; but the latter is subject to change whenever we
- // get the native support for Decimal128 with scale up to 38 or Decimal256 with scale up to 76.
arrow::Status Visit(const arrow::Decimal128Type& type) override
{
- constexpr int MaximumYTDecimalPrecision = 35;
- if (type.precision() <= MaximumYTDecimalPrecision) {
- return ParseStringLikeArray<arrow::Decimal128Array>([&] (const TStringBuf& value, i64 columnId) {
- return MakeDecimalBinaryValue(value, columnId, type.precision());
- });
- } else {
- return ParseStringLikeArray<arrow::Decimal128Array>([&] (const TStringBuf& value, i64 columnId) {
- return MakeDecimalTextValue<arrow::Decimal128>(value, columnId, type.scale());
- });
- }
+ return ParseStringLikeArray<arrow::Decimal128Array>([&] (const TStringBuf& value, i64 columnId) {
+ return MakeDecimalBinaryValue<TDecimal::TValue128>(value, columnId, type.precision());
+ });
}
arrow::Status Visit(const arrow::Decimal256Type& type) override
{
return ParseStringLikeArray<arrow::Decimal256Array>([&] (const TStringBuf& value, i64 columnId) {
- return MakeDecimalTextValue<arrow::Decimal256>(value, columnId, type.scale());
+ return MakeDecimalBinaryValue<TDecimal::TValue256>(value, columnId, type.precision());
});
}
@@ -294,33 +283,31 @@ private:
return arrow::Status::OK();
}
+ template <class TUnderlyingValueType>
TUnversionedValue MakeDecimalBinaryValue(const TStringBuf& value, i64 columnId, int precision)
{
- // NB: arrow wire representation of Decimal128 is little-endian and (obviously) 128 bit,
+ // NB: Arrow wire representation of Decimal128 is little-endian and (obviously) 128 bit,
// while YT in-memory representation of Decimal is big-endian, variadic-length of either 32 bit, 64 bit or 128 bit,
// and MSB-flipped to ensure lexical sorting order.
- TDecimal::TValue128 value128;
- YT_VERIFY(value.size() == sizeof(value128));
- std::memcpy(&value128, value.data(), value.size());
+ // Representation of Decimal256 is similar, but only 256 bits.
+ TUnderlyingValueType decimalValue;
+ YT_VERIFY(value.size() == sizeof(decimalValue));
+ std::memcpy(&decimalValue, value.data(), value.size());
- const auto maxByteCount = sizeof(value128);
+ const auto maxByteCount = sizeof(decimalValue);
char* buffer = BufferForStringLikeValues_->Preallocate(maxByteCount);
- auto decimalBinary = TDecimal::WriteBinaryVariadic(precision, value128, buffer, maxByteCount);
+ TStringBuf decimalBinary;
+ if constexpr (std::is_same_v<TUnderlyingValueType, TDecimal::TValue128>) {
+ decimalBinary = TDecimal::WriteBinary128Variadic(precision, decimalValue, buffer, maxByteCount);
+ } else if constexpr (std::is_same_v<TUnderlyingValueType, TDecimal::TValue256>) {
+ decimalBinary = TDecimal::WriteBinary256(precision, decimalValue, buffer, maxByteCount);
+ } else {
+ static_assert(std::is_same_v<TUnderlyingValueType, TDecimal::TValue256>, "Unexpected decimal type");
+ }
BufferForStringLikeValues_->Advance(decimalBinary.size());
return MakeUnversionedStringValue(decimalBinary, columnId);
}
-
- template <class TArrowDecimalType>
- TUnversionedValue MakeDecimalTextValue(const TStringBuf& value, i64 columnId, int scale)
- {
- TArrowDecimalType decimal(reinterpret_cast<const uint8_t*>(value.data()));
- auto string = decimal.ToString(scale);
- char* buffer = BufferForStringLikeValues_->Preallocate(string.size());
- std::memcpy(buffer, string.data(), string.size());
- BufferForStringLikeValues_->Advance(string.size());
- return MakeUnversionedStringValue(TStringBuf(buffer, string.size()), columnId);
- }
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/formats/skiff_parser.cpp b/yt/yt/library/formats/skiff_parser.cpp
index edff9ffa0b..febde44f42 100644
--- a/yt/yt/library/formats/skiff_parser.cpp
+++ b/yt/yt/library/formats/skiff_parser.cpp
@@ -300,6 +300,7 @@ const auto precision = denullifiedType.GetPrecision();
CASE(EWireType::Int32);
CASE(EWireType::Int64);
CASE(EWireType::Int128);
+ CASE(EWireType::Int256);
#undef CASE
case EWireType::Yson32:
return CreatePrimitiveTypeConverter(wireType, fieldDescription.IsRequired(), columnId, ysonConverter);
diff --git a/yt/yt/library/formats/skiff_writer.cpp b/yt/yt/library/formats/skiff_writer.cpp
index 3bc6965219..0984e72c8f 100644
--- a/yt/yt/library/formats/skiff_writer.cpp
+++ b/yt/yt/library/formats/skiff_writer.cpp
@@ -506,6 +506,10 @@ TUnversionedValueToSkiffConverter CreateDecimalValueConverter(
return CreatePrimitiveValueConverter<EValueType::String>(
isRequired,
TDecimalSkiffWriter<EWireType::Int128>(precision));
+ case EWireType::Int256:
+ return CreatePrimitiveValueConverter<EValueType::String>(
+ isRequired,
+ TDecimalSkiffWriter<EWireType::Int256>(precision));
case EWireType::Yson32:
return CreatePrimitiveValueConverter(wireType, isRequired);
default:
diff --git a/yt/yt/library/formats/skiff_yson_converter-inl.h b/yt/yt/library/formats/skiff_yson_converter-inl.h
index 43dabc63a5..8d5a2f8efd 100644
--- a/yt/yt/library/formats/skiff_yson_converter-inl.h
+++ b/yt/yt/library/formats/skiff_yson_converter-inl.h
@@ -70,8 +70,18 @@ Y_FORCE_INLINE TStringBuf TDecimalSkiffParser<SkiffWireType>::operator() (NSkiff
TDecimal::TValue128{skiffValue.Low, skiffValue.High},
Buffer_,
sizeof(Buffer_));
+ } else if constexpr (SkiffWireType == EWireType::Int256) {
+ const auto skiffValue = parser->ParseInt256();
+ TDecimal::TValue256 decimalValue;
+ static_assert(sizeof(decimalValue) == sizeof(skiffValue));
+ std::memcpy(&decimalValue, &skiffValue, sizeof(decimalValue));
+ return TDecimal::WriteBinary256(
+ Precision_,
+ std::move(decimalValue),
+ Buffer_,
+ sizeof(Buffer_));
} else {
- static_assert(SkiffWireType == EWireType::Int128);
+ static_assert(SkiffWireType == EWireType::Int256);
}
}
@@ -99,9 +109,15 @@ void TDecimalSkiffWriter<SkiffWireType>::operator()(TStringBuf value, NSkiff::TC
} else if constexpr (SkiffWireType == EWireType::Int128) {
auto intValue = TDecimal::ParseBinary128(Precision_, value);
writer->WriteInt128(TInt128{intValue.Low, intValue.High});
+ } else if constexpr (SkiffWireType == EWireType::Int256) {
+ auto intValue = TDecimal::ParseBinary256(Precision_, value);
+ TInt256 skiffValue;
+ static_assert(sizeof(skiffValue) == sizeof(intValue));
+ std::memcpy(&skiffValue, &intValue, sizeof(skiffValue));
+ writer->WriteInt256(std::move(skiffValue));
} else {
// poor man's static_assert(false)
- static_assert(SkiffWireType == EWireType::Int128);
+ static_assert(SkiffWireType == EWireType::Int256);
}
}
diff --git a/yt/yt/library/formats/skiff_yson_converter.cpp b/yt/yt/library/formats/skiff_yson_converter.cpp
index 82fac3ef1b..ebde4dcb3d 100644
--- a/yt/yt/library/formats/skiff_yson_converter.cpp
+++ b/yt/yt/library/formats/skiff_yson_converter.cpp
@@ -748,6 +748,10 @@ TYsonToSkiffConverter CreateDecimalYsonToSkiffConverter(
return CreatePrimitiveTypeYsonToSkiffConverter<EYsonItemType::StringValue>(
std::move(descriptor),
TDecimalSkiffWriter<EWireType::Int128>(precision));
+ case EWireType::Int256:
+ return CreatePrimitiveTypeYsonToSkiffConverter<EYsonItemType::StringValue>(
+ std::move(descriptor),
+ TDecimalSkiffWriter<EWireType::Int256>(precision));
case EWireType::Yson32:
return CreatePrimitiveTypeYsonToSkiffConverter(std::move(descriptor), wireType);
default:
@@ -1815,6 +1819,8 @@ TSkiffToYsonConverter CreateDecimalSkiffToYsonConverter(
return TPrimitiveTypeSkiffToYsonConverter(TDecimalSkiffParser<EWireType::Int64>(precision));
case EWireType::Int128:
return TPrimitiveTypeSkiffToYsonConverter(TDecimalSkiffParser<EWireType::Int128>(precision));
+ case EWireType::Int256:
+ return TPrimitiveTypeSkiffToYsonConverter(TDecimalSkiffParser<EWireType::Int256>(precision));
case EWireType::Yson32:
return CreatePrimitiveTypeSkiffToYsonConverter(wireType);
default:
@@ -1899,6 +1905,8 @@ void CheckSkiffWireTypeForDecimal(int precision, NSkiff::EWireType wireType)
skiffBinarySize = sizeof(i64);
} else if (wireType == NSkiff::EWireType::Int128) {
skiffBinarySize = 2 * sizeof(i64);
+ } else if (wireType == NSkiff::EWireType::Int256) {
+ skiffBinarySize = 4 * sizeof(i64);
}
if (decimalBinarySize != skiffBinarySize) {
diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp
index 52aa9e5d8f..74b9d61fdd 100644
--- a/yt/yt/library/program/config.cpp
+++ b/yt/yt/library/program/config.cpp
@@ -187,6 +187,8 @@ void TSingletonsDynamicConfig::Register(TRegistrar registrar)
.Optional();
registrar.Parameter("tcmalloc", &TThis::TCMalloc)
.Optional();
+ registrar.Parameter("stockpile", &TThis::Stockpile)
+ .Optional();
registrar.Parameter("protobuf_interop", &TThis::ProtobufInterop)
.DefaultNew();
}
diff --git a/yt/yt/library/tracing/jaeger/tracer.h b/yt/yt/library/tracing/jaeger/tracer.h
index ed4850b1e9..387b3a8360 100644
--- a/yt/yt/library/tracing/jaeger/tracer.h
+++ b/yt/yt/library/tracing/jaeger/tracer.h
@@ -10,7 +10,7 @@
#include <yt/yt/library/tvm/service/public.h>
#include <yt/yt/core/misc/mpsc_stack.h>
-#include <yt/yt/core/misc/atomic_object.h>
+#include <library/cpp/yt/threading/atomic_object.h>
#include <yt/yt/core/rpc/grpc/config.h>
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 54321bcc96..9c9e018be9 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -1445,6 +1445,7 @@ message TIndexInfo
required NYT.NProto.TGuid index_table_id = 1;
required int32 index_kind = 2; // NTableClient::ESecondaryIndexKind
optional string predicate = 3;
+ optional string unfolded_column = 4;
}
message TReqGetTableMountInfo
@@ -2484,6 +2485,27 @@ message TRspGetJobStderr
////////////////////////////////////////////////////////////////////////////////
+message TReqGetJobTrace
+{
+ oneof operation_id_or_alias {
+ NYT.NProto.TGuid operation_id = 1;
+ string operation_alias = 2;
+ }
+ optional NYT.NProto.TGuid job_id = 3;
+ optional NYT.NProto.TGuid trace_id = 4;
+ optional int64 from_event_index = 5;
+ optional int64 to_event_index = 6;
+ optional int64 from_time = 7;
+ optional int64 to_time = 8;
+}
+
+message TRspGetJobTrace
+{
+ repeated TJobTraceEvent events = 1;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
message TReqGetJobFailContext
{
oneof operation_id_or_alias {
@@ -3156,6 +3178,16 @@ message TListJobsResult
repeated NYT.NProto.TError errors = 6;
}
+message TJobTraceEvent
+{
+ required NYT.NProto.TGuid operation_id = 1;
+ required NYT.NProto.TGuid job_id = 2;
+ required NYT.NProto.TGuid trace_id = 3;
+ required int64 event_index = 4;
+ required string event = 5;
+ required int64 event_time = 6; // TInstant
+}
+
////////////////////////////////////////////////////////////////////////////////
message TGetFileFromCacheResult
@@ -3377,15 +3409,14 @@ message TRspGetQueryTrackerInfo
}
////////////////////////////////////////////////////////////////////////////////
-// NB(arkady-e1ppa): Under construction.
// Distributed table client
////////////////////////////////////////////////////////////////////////////////
message TReqStartDistributedWriteSession
{
- optional string path = 1;
-
- // TDistributedWriteSessionStartOptions contents...
+ required string path = 1;
+
+ optional TTransactionalOptions transactional_options = 100;
}
message TRspStartDistributedWriteSession
@@ -3395,29 +3426,27 @@ message TRspStartDistributedWriteSession
message TReqFinishDistributedWriteSession
{
- required bytes session = 2; // YSON-serialized TDistributedWriteSession
+ required bytes session = 1; // YSON-serialized TDistributedWriteSession
- // TDistributedWriteSessionFinishOptions contents...
+ required int32 max_children_per_attach_request = 2;
}
message TRspFinishDistributedWriteSession
{
}
-message TReqParticipantWriteTable
+message TReqWriteTableFragment
{
optional bytes config = 1; // YSON-serialized TTableWriterConfig
optional bytes format = 2; // YSON-serialized TFormat
- required bytes cookie = 3; // YSON-serialized TDistributedWriteCookie
-
- // TParticipantTableWriterOptions contents...
+ required bytes cookie = 3; // YSON-serialized TFragmentWriteCookie
}
-message TRspParticipantWriteTable
+message TRspWriteTableFragment
{
- required bytes cookie = 1; // YSON-serialized TDistributedWriteCookie
+ required bytes cookie = 1; // YSON-serialized TFragmentWriteCookie
}
///////////////////////////////////////////////////////////////////////////////