diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-10-04 11:11:38 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-10-04 11:11:38 +0000 |
commit | 6547f93cad0868da00a4e9e4d4dc63b26726653b (patch) | |
tree | f6a1ecd48d58a29cbb921167cb15a80aad1a1d21 /yt | |
parent | e55564f7a0a0ff7b638ce199c1b5005b696d349d (diff) | |
parent | 5d32f79de3c53b2eaebca9be84a7399d479f8549 (diff) | |
download | ydb-6547f93cad0868da00a4e9e4d4dc63b26726653b.tar.gz |
Merge branch 'rightlib' into mergelibs-241004-1110
Diffstat (limited to 'yt')
-rw-r--r-- | yt/cpp/mapreduce/client/transaction_pinger.cpp | 26 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/config.cpp | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/config.h | 1 | ||||
-rw-r--r-- | yt/yt/core/logging/config.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/logging/config.h | 1 | ||||
-rw-r--r-- | yt/yt/core/logging/file_log_writer.cpp | 19 | ||||
-rw-r--r-- | yt/yt/flow/lib/client/public.h | 3 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/exporter.cpp | 1 |
8 files changed, 34 insertions, 20 deletions
diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp index 3b6ff2efc5..0a193352f7 100644 --- a/yt/cpp/mapreduce/client/transaction_pinger.cpp +++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp @@ -292,21 +292,17 @@ private: ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config) { - if (config->UseAsyncTxPinger) { - YT_LOG_DEBUG("Using async transaction pinger"); - auto httpClientConfig = NYT::New<NHttp::TClientConfig>(); - httpClientConfig->MaxIdleConnections = 16; - auto httpPoller = NConcurrency::CreateThreadPoolPoller( - config->AsyncHttpClientThreads, - "tx_http_client_poller"); - auto httpClient = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller)); - - return MakeIntrusive<TSharedTransactionPinger>( - std::move(httpClient), - config->AsyncTxPingerPoolThreads); - } else { - return MakeIntrusive<TThreadPerTransactionPinger>(); - } + YT_LOG_DEBUG("Using async transaction pinger"); + auto httpClientConfig = NYT::New<NHttp::TClientConfig>(); + httpClientConfig->MaxIdleConnections = 16; + auto httpPoller = NConcurrency::CreateThreadPoolPoller( + config->AsyncHttpClientThreads, + "tx_http_client_poller"); + auto httpClient = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller)); + + return MakeIntrusive<TSharedTransactionPinger>( + std::move(httpClient), + config->AsyncTxPingerPoolThreads); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/interface/config.cpp b/yt/cpp/mapreduce/interface/config.cpp index 879bf79561..418868a7eb 100644 --- a/yt/cpp/mapreduce/interface/config.cpp +++ b/yt/cpp/mapreduce/interface/config.cpp @@ -200,7 +200,6 @@ void TConfig::Reset() GlobalTxId = GetEnv("YT_TRANSACTION", ""); - UseAsyncTxPinger = true; AsyncHttpClientThreads = 1; AsyncTxPingerPoolThreads = 1; diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h index a8baac8d4f..71f0c4fcdc 100644 --- a/yt/cpp/mapreduce/interface/config.h +++ b/yt/cpp/mapreduce/interface/config.h @@ -104,7 +104,6 @@ struct TConfig TDuration PingTimeout; TDuration PingInterval; - bool UseAsyncTxPinger; int AsyncHttpClientThreads; int AsyncTxPingerPoolThreads; diff --git a/yt/yt/core/logging/config.cpp b/yt/yt/core/logging/config.cpp index e94e4b51fd..4acde166a9 100644 --- a/yt/yt/core/logging/config.cpp +++ b/yt/yt/core/logging/config.cpp @@ -104,6 +104,8 @@ void TFileLogWriterConfig::Register(TRegistrar registrar) .Default(false); registrar.Parameter("enable_compression", &TThis::EnableCompression) .Default(false); + registrar.Parameter("enable_no_reuse", &TThis::EnableNoReuse) + .Default(false); registrar.Parameter("compression_method", &TThis::CompressionMethod) .Default(ECompressionMethod::Gzip); registrar.Parameter("compression_level", &TThis::CompressionLevel) diff --git a/yt/yt/core/logging/config.h b/yt/yt/core/logging/config.h index a959e5968a..6e5f2b0f9f 100644 --- a/yt/yt/core/logging/config.h +++ b/yt/yt/core/logging/config.h @@ -82,6 +82,7 @@ public: TString FileName; bool UseTimestampSuffix; bool EnableCompression; + bool EnableNoReuse; ECompressionMethod CompressionMethod; int CompressionLevel; diff --git a/yt/yt/core/logging/file_log_writer.cpp b/yt/yt/core/logging/file_log_writer.cpp index 1bad6030c5..d72c2f4e07 100644 --- a/yt/yt/core/logging/file_log_writer.cpp +++ b/yt/yt/core/logging/file_log_writer.cpp @@ -152,9 +152,26 @@ private: TFlags<EOpenModeFlag> openMode; if (Config_->EnableCompression) { - openMode = OpenAlways|RdWr|CloseOnExec; + switch (Config_->CompressionMethod) { + case ECompressionMethod::Zstd: + openMode = OpenAlways|RdWr|CloseOnExec; + if (Config_->EnableNoReuse) { + openMode = openMode|NoReuse; + } + break; + + case ECompressionMethod::Gzip: + openMode = OpenAlways|RdWr|CloseOnExec; + break; + + default: + YT_ABORT(); + } } else { openMode = OpenAlways|ForAppend|WrOnly|Seq|CloseOnExec; + if (Config_->EnableNoReuse) { + openMode = openMode|NoReuse; + } } // Generate filename. diff --git a/yt/yt/flow/lib/client/public.h b/yt/yt/flow/lib/client/public.h index b194200cda..b2e76974a7 100644 --- a/yt/yt/flow/lib/client/public.h +++ b/yt/yt/flow/lib/client/public.h @@ -28,7 +28,8 @@ DEFINE_ENUM(EPipelineState, ); YT_DEFINE_ERROR_ENUM( - ((SpecVersionMismatch) (3300)) + ((SpecVersionMismatch) (3300)) + ((PipelineStateVersionMismatch) (3301)) ); YT_DEFINE_STRONG_TYPEDEF(TVersion, i64); diff --git a/yt/yt/library/profiling/solomon/exporter.cpp b/yt/yt/library/profiling/solomon/exporter.cpp index c543a7d26f..7f4469053b 100644 --- a/yt/yt/library/profiling/solomon/exporter.cpp +++ b/yt/yt/library/profiling/solomon/exporter.cpp @@ -573,7 +573,6 @@ bool TSolomonExporter::ReadSensors( // Read last value. auto readOptions = options; readOptions.Times.emplace_back(std::vector<int>{Registry_->IndexOf(Window_.back().first)}, TInstant::Zero()); - readOptions.ConvertCountersToRateGauge = false; readOptions.EnableHistogramCompat = true; readOptions.SummaryPolicy |= Config_->GetSummaryPolicy(); |