diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2024-12-25 07:34:56 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-25 07:34:56 +0300 |
commit | 08abadd8bb7645cb03f829a6f9c3e127b19fd0b8 (patch) | |
tree | 8b24d2e8be514fda7789c5a6197eb8b9eb7291b3 | |
parent | bd0e2de0b1035962a4d5b9e847eaa6508fad7fcf (diff) | |
download | ydb-08abadd8bb7645cb03f829a6f9c3e127b19fd0b8.tar.gz |
fix accessors fetching queue processing (#12936)
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/data_accessor/manager.cpp | 37 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/data_accessor/request.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/hooks/abstract/abstract.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/hooks/testing/controller.h | 7 |
5 files changed, 46 insertions, 13 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 921edfcd742..795d5d99880 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1764,6 +1764,7 @@ message TColumnShardConfig { optional bool AllowNullableColumnsInPK = 29 [default = false]; optional uint32 RestoreDataOnWriteTimeoutSeconds = 30; optional bool UseSlicesFilter = 31 [default = true]; + optional uint32 LimitForPortionsMetadataAsk = 32 [default = 1000]; } message TSchemeShardConfig { diff --git a/ydb/core/tx/columnshard/data_accessor/manager.cpp b/ydb/core/tx/columnshard/data_accessor/manager.cpp index 182f8d81fbe..7f0b7bdc151 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/manager.cpp @@ -1,18 +1,17 @@ #include "manager.h" +#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> + namespace NKikimr::NOlap::NDataAccessorControl { void TLocalManager::DrainQueue() { - THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk; std::optional<ui64> lastPathId; IGranuleDataAccessor* lastDataAccessor = nullptr; ui32 countToFlight = 0; - while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { + while (PortionsAskInFlight + countToFlight < NYDBTest::TControllers::GetColumnShardController()->GetLimitForPortionsMetadataAsk() && + PortionsAsk.size()) { + THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk; while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { - if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) { - PortionsAsk.pop_front(); - continue; - } auto p = PortionsAsk.front().ExtractPortion(); PortionsAsk.pop_front(); if (!lastPathId || *lastPathId != p->GetPathId()) { @@ -24,18 +23,30 @@ void TLocalManager::DrainQueue() { lastDataAccessor = it->second.get(); } } + auto it = RequestsByPortion.find(p->GetPortionId()); + if (it == RequestsByPortion.end()) { + continue; + } if (!lastDataAccessor) { - auto it = RequestsByPortion.find(p->GetPortionId()); - AFL_VERIFY(it != RequestsByPortion.end()); for (auto&& i : it->second) { - if (!i->IsFetched()) { + if (!i->IsFetched() && !i->IsAborted()) { i->AddError(p->GetPathId(), "path id absent"); } } RequestsByPortion.erase(it); } else { - portionsToAsk[p->GetPathId()].emplace_back(p); - ++countToFlight; + bool toAsk = false; + for (auto&& i : it->second) { + if (!i->IsFetched() && !i->IsAborted()) { + toAsk = true; + } + } + if (!toAsk) { + RequestsByPortion.erase(it); + } else { + portionsToAsk[p->GetPathId()].emplace_back(p); + ++countToFlight; + } } } for (auto&& i : portionsToAsk) { @@ -46,7 +57,7 @@ void TLocalManager::DrainQueue() { auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId()); AFL_VERIFY(it != RequestsByPortion.end()); for (auto&& i : it->second) { - if (!i->IsFetched()) { + if (!i->IsFetched() && !i->IsAborted()) { i->AddAccessor(accessor); } } @@ -110,4 +121,4 @@ void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) { DrainQueue(); } -} // namespace NKikimr::NOlap +} // namespace NKikimr::NOlap::NDataAccessorControl diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h index 2d5f29ad204..6be2665e8be 100644 --- a/ydb/core/tx/columnshard/data_accessor/request.h +++ b/ydb/core/tx/columnshard/data_accessor/request.h @@ -232,6 +232,12 @@ public: return result; } + bool IsAborted() const { + AFL_VERIFY(HasSubscriber()); + auto flag = Subscriber->GetAbortionFlag(); + return flag && flag->Val(); + } + const std::shared_ptr<const TAtomicCounter>& GetAbortionFlag() const { AFL_VERIFY(HasSubscriber()); return Subscriber->GetAbortionFlag(); diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 46ee0ba1c33..94b4eca7e4d 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -92,6 +92,9 @@ protected: virtual TDuration DoGetUsedSnapshotLivetime(const TDuration defaultValue) const { return defaultValue; } + virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const { + return defaultValue; + } virtual TDuration DoGetOverridenGCPeriod(const TDuration defaultValue) const { return defaultValue; } @@ -189,6 +192,11 @@ public: virtual void OnSelectShardingFilter() { } + ui64 GetLimitForPortionsMetadataAsk() const { + const ui64 defaultValue = GetConfig().GetLimitForPortionsMetadataAsk(); + return DoGetLimitForPortionsMetadataAsk(defaultValue); + } + TDuration GetCompactionActualizationLag() const { const TDuration defaultValue = TDuration::MilliSeconds(GetConfig().GetCompactionActualizationLagMs()); return DoGetCompactionActualizationLag(defaultValue); diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 9076e8183cd..0b1d8870cab 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -24,6 +24,8 @@ private: YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideTasksActualizationLag); YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideMaxReadStaleness); YDB_ACCESSOR(std::optional<ui64>, OverrideMemoryLimitForPortionReading, 100); + YDB_ACCESSOR(std::optional<ui64>, OverrideLimitForPortionsMetadataAsk, 1); + YDB_ACCESSOR_DEF(std::optional<NKikimrProto::EReplyStatus>, OverrideBlobPutResultOnWriteValue); EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force; @@ -135,6 +137,11 @@ private: protected: virtual ::NKikimr::NColumnShard::TBlobPutResult::TPtr OverrideBlobPutResultOnCompaction(const ::NKikimr::NColumnShard::TBlobPutResult::TPtr original, const NOlap::TWriteActionsCollection& actions) const override; + virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const override { + return OverrideLimitForPortionsMetadataAsk.value_or(defaultValue); + } + + virtual ui64 DoGetMemoryLimitScanPortion(const ui64 defaultValue) const override { return OverrideMemoryLimitForPortionReading.value_or(defaultValue); } |