aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordon-dron <don-dron@yandex-team.com>2024-04-05 19:05:06 +0300
committerdon-dron <don-dron@yandex-team.com>2024-04-05 19:14:03 +0300
commit7d065ca2a9d88a3651dbe690e479c77761de371b (patch)
tree45509f5dca24e0924957ed6a1bc8539c1e7e5f88
parentbeb1725856691d2bd11a0e2ec8ce7b45cebb9804 (diff)
downloadydb-7d065ca2a9d88a3651dbe690e479c77761de371b.tar.gz
YT-21472: Disable slot manager if layer cache disabled
9cfa8a698bc7b54a4c21e77846419a417ca66eea
-rw-r--r--yt/yt/client/table_client/adapters.cpp50
-rw-r--r--yt/yt/client/table_client/adapters.h1
-rw-r--r--yt/yt/core/concurrency/periodic_executor_base-inl.h7
-rw-r--r--yt/yt/core/concurrency/periodic_executor_base.h2
-rw-r--r--yt/yt/core/misc/error.cpp22
-rw-r--r--yt/yt/core/misc/error.h2
6 files changed, 62 insertions, 22 deletions
diff --git a/yt/yt/client/table_client/adapters.cpp b/yt/yt/client/table_client/adapters.cpp
index 5001c2b36f..c64915f9d6 100644
--- a/yt/yt/client/table_client/adapters.cpp
+++ b/yt/yt/client/table_client/adapters.cpp
@@ -118,33 +118,43 @@ void PipeReaderToWriter(
while (auto batch = reader->Read(readOptions)) {
yielder.TryYield();
- if (batch->IsEmpty()) {
- WaitFor(reader->GetReadyEvent())
- .ThrowOnError();
- continue;
- }
+ TSharedRange<TUnversionedRow> rows;
+
+ try {
+ if (batch->IsEmpty()) {
+ WaitFor(reader->GetReadyEvent())
+ .ThrowOnError();
+ continue;
+ }
- auto rows = batch->MaterializeRows();
+ rows = batch->MaterializeRows();
- if (options.ValidateValues) {
- for (auto row : rows) {
- for (const auto& value : row) {
- ValidateStaticValue(value);
+ if (options.ValidateValues) {
+ for (auto row : rows) {
+ for (const auto& value : row) {
+ ValidateStaticValue(value);
+ }
}
}
- }
- if (options.Throttler) {
- i64 dataWeight = 0;
- for (auto row : rows) {
- dataWeight += GetDataWeight(row);
+ if (options.Throttler) {
+ i64 dataWeight = 0;
+ for (auto row : rows) {
+ dataWeight += GetDataWeight(row);
+ }
+ WaitFor(options.Throttler->Throttle(dataWeight))
+ .ThrowOnError();
}
- WaitFor(options.Throttler->Throttle(dataWeight))
- .ThrowOnError();
- }
- if (!rows.empty() && options.PipeDelay) {
- TDelayedExecutor::WaitForDuration(options.PipeDelay);
+ if (!rows.empty() && options.PipeDelay) {
+ TDelayedExecutor::WaitForDuration(options.PipeDelay);
+ }
+ } catch (const std::exception& ex) {
+ if (options.ReaderErrorWrapper) {
+ THROW_ERROR(options.ReaderErrorWrapper(ex));
+ } else {
+ throw;
+ }
}
if (!writer->Write(rows)) {
diff --git a/yt/yt/client/table_client/adapters.h b/yt/yt/client/table_client/adapters.h
index 4625398b41..bbb568582b 100644
--- a/yt/yt/client/table_client/adapters.h
+++ b/yt/yt/client/table_client/adapters.h
@@ -27,6 +27,7 @@ struct TPipeReaderToWriterOptions
i64 BufferDataWeight = 16_MB;
bool ValidateValues = false;
NConcurrency::IThroughputThrottlerPtr Throttler;
+ std::function<TError(const TError& readerError)> ReaderErrorWrapper;
// Used only for testing.
TDuration PipeDelay;
};
diff --git a/yt/yt/core/concurrency/periodic_executor_base-inl.h b/yt/yt/core/concurrency/periodic_executor_base-inl.h
index a2985b7251..6852a398e9 100644
--- a/yt/yt/core/concurrency/periodic_executor_base-inl.h
+++ b/yt/yt/core/concurrency/periodic_executor_base-inl.h
@@ -40,6 +40,13 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::Start()
}
template <CInvocationTimePolicy TInvocationTimePolicy>
+bool TPeriodicExecutorBase<TInvocationTimePolicy>::IsStarted() const
+{
+ auto guard = Guard(SpinLock_);
+ return Started_;
+}
+
+template <CInvocationTimePolicy TInvocationTimePolicy>
void TPeriodicExecutorBase<TInvocationTimePolicy>::DoStop(TGuard<NThreading::TSpinLock>& guard)
{
if (!Started_) {
diff --git a/yt/yt/core/concurrency/periodic_executor_base.h b/yt/yt/core/concurrency/periodic_executor_base.h
index 1b171d8669..ddf6a853e8 100644
--- a/yt/yt/core/concurrency/periodic_executor_base.h
+++ b/yt/yt/core/concurrency/periodic_executor_base.h
@@ -88,6 +88,8 @@ public:
//! Starts the instance.
void Start();
+ bool IsStarted() const;
+
//! Stops the instance, cancels all subsequent invocations.
//! Returns a future that becomes set when all outstanding callback
//! invocations are finished and no more invocations are expected to happen.
diff --git a/yt/yt/core/misc/error.cpp b/yt/yt/core/misc/error.cpp
index cc6ae70bf9..7784292c70 100644
--- a/yt/yt/core/misc/error.cpp
+++ b/yt/yt/core/misc/error.cpp
@@ -886,8 +886,26 @@ std::optional<TError> TError::FindMatching(TErrorCode code) const
}
for (const auto& innerError : InnerErrors()) {
- auto innerResult = innerError.FindMatching(code);
- if (innerResult) {
+ if (auto innerResult = innerError.FindMatching(code)) {
+ return innerResult;
+ }
+ }
+
+ return {};
+}
+
+std::optional<TError> TError::FindMatching(const THashSet<TErrorCode>& codes) const
+{
+ if (!Impl_) {
+ return {};
+ }
+
+ if (codes.contains(GetCode())) {
+ return *this;
+ }
+
+ for (const auto& innerError : InnerErrors()) {
+ if (auto innerResult = innerError.FindMatching(codes)) {
return innerResult;
}
}
diff --git a/yt/yt/core/misc/error.h b/yt/yt/core/misc/error.h
index 76a5dac28c..f3783aab7d 100644
--- a/yt/yt/core/misc/error.h
+++ b/yt/yt/core/misc/error.h
@@ -190,6 +190,8 @@ public:
std::optional<TError> FindMatching(TErrorCode code) const;
+ std::optional<TError> FindMatching(const THashSet<TErrorCode>& codes) const;
+
template <class... TArgs>
requires std::constructible_from<TError, TArgs...>
TError Wrap(TArgs&&... args) const &;