diff options
author | don-dron <don-dron@yandex-team.com> | 2024-04-05 19:05:06 +0300 |
---|---|---|
committer | don-dron <don-dron@yandex-team.com> | 2024-04-05 19:14:03 +0300 |
commit | 7d065ca2a9d88a3651dbe690e479c77761de371b (patch) | |
tree | 45509f5dca24e0924957ed6a1bc8539c1e7e5f88 | |
parent | beb1725856691d2bd11a0e2ec8ce7b45cebb9804 (diff) | |
download | ydb-7d065ca2a9d88a3651dbe690e479c77761de371b.tar.gz |
YT-21472: Disable slot manager if layer cache disabled
9cfa8a698bc7b54a4c21e77846419a417ca66eea
-rw-r--r-- | yt/yt/client/table_client/adapters.cpp | 50 | ||||
-rw-r--r-- | yt/yt/client/table_client/adapters.h | 1 | ||||
-rw-r--r-- | yt/yt/core/concurrency/periodic_executor_base-inl.h | 7 | ||||
-rw-r--r-- | yt/yt/core/concurrency/periodic_executor_base.h | 2 | ||||
-rw-r--r-- | yt/yt/core/misc/error.cpp | 22 | ||||
-rw-r--r-- | yt/yt/core/misc/error.h | 2 |
6 files changed, 62 insertions, 22 deletions
diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp index 5001c2b36f..c64915f9d6 100644 --- a/yt/yt/client/table_client/adapters.cpp +++ b/yt/yt/client/table_client/adapters.cpp @@ -118,33 +118,43 @@ void PipeReaderToWriter( while (auto batch = reader->Read(readOptions)) { yielder.TryYield(); - if (batch->IsEmpty()) { - WaitFor(reader->GetReadyEvent()) - .ThrowOnError(); - continue; - } + TSharedRange<TUnversionedRow> rows; + + try { + if (batch->IsEmpty()) { + WaitFor(reader->GetReadyEvent()) + .ThrowOnError(); + continue; + } - auto rows = batch->MaterializeRows(); + rows = batch->MaterializeRows(); - if (options.ValidateValues) { - for (auto row : rows) { - for (const auto& value : row) { - ValidateStaticValue(value); + if (options.ValidateValues) { + for (auto row : rows) { + for (const auto& value : row) { + ValidateStaticValue(value); + } } } - } - if (options.Throttler) { - i64 dataWeight = 0; - for (auto row : rows) { - dataWeight += GetDataWeight(row); + if (options.Throttler) { + i64 dataWeight = 0; + for (auto row : rows) { + dataWeight += GetDataWeight(row); + } + WaitFor(options.Throttler->Throttle(dataWeight)) + .ThrowOnError(); } - WaitFor(options.Throttler->Throttle(dataWeight)) - .ThrowOnError(); - } - if (!rows.empty() && options.PipeDelay) { - TDelayedExecutor::WaitForDuration(options.PipeDelay); + if (!rows.empty() && options.PipeDelay) { + TDelayedExecutor::WaitForDuration(options.PipeDelay); + } + } catch (const std::exception& ex) { + if (options.ReaderErrorWrapper) { + THROW_ERROR(options.ReaderErrorWrapper(ex)); + } else { + throw; + } } if (!writer->Write(rows)) { diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h index 4625398b41..bbb568582b 100644 --- a/yt/yt/client/table_client/adapters.h +++ b/yt/yt/client/table_client/adapters.h @@ -27,6 +27,7 @@ struct TPipeReaderToWriterOptions i64 BufferDataWeight = 16_MB; bool ValidateValues = false; NConcurrency::IThroughputThrottlerPtr Throttler; + std::function<TError(const TError& readerError)> ReaderErrorWrapper; // Used only for testing. TDuration PipeDelay; }; diff --git a/yt/yt/core/concurrency/periodic_executor_base-inl.h b/yt/yt/core/concurrency/periodic_executor_base-inl.h index a2985b7251..6852a398e9 100644 --- a/yt/yt/core/concurrency/periodic_executor_base-inl.h +++ b/yt/yt/core/concurrency/periodic_executor_base-inl.h @@ -40,6 +40,13 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::Start() } template <CInvocationTimePolicy TInvocationTimePolicy> +bool TPeriodicExecutorBase<TInvocationTimePolicy>::IsStarted() const +{ + auto guard = Guard(SpinLock_); + return Started_; +} + +template <CInvocationTimePolicy TInvocationTimePolicy> void TPeriodicExecutorBase<TInvocationTimePolicy>::DoStop(TGuard<NThreading::TSpinLock>& guard) { if (!Started_) { diff --git a/yt/yt/core/concurrency/periodic_executor_base.h b/yt/yt/core/concurrency/periodic_executor_base.h index 1b171d8669..ddf6a853e8 100644 --- a/yt/yt/core/concurrency/periodic_executor_base.h +++ b/yt/yt/core/concurrency/periodic_executor_base.h @@ -88,6 +88,8 @@ public: //! Starts the instance. void Start(); + bool IsStarted() const; + //! Stops the instance, cancels all subsequent invocations. //! Returns a future that becomes set when all outstanding callback //! invocations are finished and no more invocations are expected to happen. diff --git a/yt/yt/core/misc/error.cpp b/yt/yt/core/misc/error.cpp index cc6ae70bf9..7784292c70 100644 --- a/yt/yt/core/misc/error.cpp +++ b/yt/yt/core/misc/error.cpp @@ -886,8 +886,26 @@ std::optional<TError> TError::FindMatching(TErrorCode code) const } for (const auto& innerError : InnerErrors()) { - auto innerResult = innerError.FindMatching(code); - if (innerResult) { + if (auto innerResult = innerError.FindMatching(code)) { + return innerResult; + } + } + + return {}; +} + +std::optional<TError> TError::FindMatching(const THashSet<TErrorCode>& codes) const +{ + if (!Impl_) { + return {}; + } + + if (codes.contains(GetCode())) { + return *this; + } + + for (const auto& innerError : InnerErrors()) { + if (auto innerResult = innerError.FindMatching(codes)) { return innerResult; } } diff --git a/yt/yt/core/misc/error.h b/yt/yt/core/misc/error.h index 76a5dac28c..f3783aab7d 100644 --- a/yt/yt/core/misc/error.h +++ b/yt/yt/core/misc/error.h @@ -190,6 +190,8 @@ public: std::optional<TError> FindMatching(TErrorCode code) const; + std::optional<TError> FindMatching(const THashSet<TErrorCode>& codes) const; + template <class... TArgs> requires std::constructible_from<TError, TArgs...> TError Wrap(TArgs&&... args) const &; |