aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-12-25 07:34:56 +0300
committerGitHub <noreply@github.com>2024-12-25 07:34:56 +0300
commit08abadd8bb7645cb03f829a6f9c3e127b19fd0b8 (patch)
tree8b24d2e8be514fda7789c5a6197eb8b9eb7291b3
parentbd0e2de0b1035962a4d5b9e847eaa6508fad7fcf (diff)
downloadydb-08abadd8bb7645cb03f829a6f9c3e127b19fd0b8.tar.gz
fix accessors fetching queue processing (#12936)
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/tx/columnshard/data_accessor/manager.cpp37
-rw-r--r--ydb/core/tx/columnshard/data_accessor/request.h6
-rw-r--r--ydb/core/tx/columnshard/hooks/abstract/abstract.h8
-rw-r--r--ydb/core/tx/columnshard/hooks/testing/controller.h7
5 files changed, 46 insertions, 13 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 921edfcd742..795d5d99880 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1764,6 +1764,7 @@ message TColumnShardConfig {
optional bool AllowNullableColumnsInPK = 29 [default = false];
optional uint32 RestoreDataOnWriteTimeoutSeconds = 30;
optional bool UseSlicesFilter = 31 [default = true];
+ optional uint32 LimitForPortionsMetadataAsk = 32 [default = 1000];
}
message TSchemeShardConfig {
diff --git a/ydb/core/tx/columnshard/data_accessor/manager.cpp b/ydb/core/tx/columnshard/data_accessor/manager.cpp
index 182f8d81fbe..7f0b7bdc151 100644
--- a/ydb/core/tx/columnshard/data_accessor/manager.cpp
+++ b/ydb/core/tx/columnshard/data_accessor/manager.cpp
@@ -1,18 +1,17 @@
#include "manager.h"
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+
namespace NKikimr::NOlap::NDataAccessorControl {
void TLocalManager::DrainQueue() {
- THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk;
std::optional<ui64> lastPathId;
IGranuleDataAccessor* lastDataAccessor = nullptr;
ui32 countToFlight = 0;
- while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
+ while (PortionsAskInFlight + countToFlight < NYDBTest::TControllers::GetColumnShardController()->GetLimitForPortionsMetadataAsk() &&
+ PortionsAsk.size()) {
+ THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk;
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
- if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) {
- PortionsAsk.pop_front();
- continue;
- }
auto p = PortionsAsk.front().ExtractPortion();
PortionsAsk.pop_front();
if (!lastPathId || *lastPathId != p->GetPathId()) {
@@ -24,18 +23,30 @@ void TLocalManager::DrainQueue() {
lastDataAccessor = it->second.get();
}
}
+ auto it = RequestsByPortion.find(p->GetPortionId());
+ if (it == RequestsByPortion.end()) {
+ continue;
+ }
if (!lastDataAccessor) {
- auto it = RequestsByPortion.find(p->GetPortionId());
- AFL_VERIFY(it != RequestsByPortion.end());
for (auto&& i : it->second) {
- if (!i->IsFetched()) {
+ if (!i->IsFetched() && !i->IsAborted()) {
i->AddError(p->GetPathId(), "path id absent");
}
}
RequestsByPortion.erase(it);
} else {
- portionsToAsk[p->GetPathId()].emplace_back(p);
- ++countToFlight;
+ bool toAsk = false;
+ for (auto&& i : it->second) {
+ if (!i->IsFetched() && !i->IsAborted()) {
+ toAsk = true;
+ }
+ }
+ if (!toAsk) {
+ RequestsByPortion.erase(it);
+ } else {
+ portionsToAsk[p->GetPathId()].emplace_back(p);
+ ++countToFlight;
+ }
}
}
for (auto&& i : portionsToAsk) {
@@ -46,7 +57,7 @@ void TLocalManager::DrainQueue() {
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
AFL_VERIFY(it != RequestsByPortion.end());
for (auto&& i : it->second) {
- if (!i->IsFetched()) {
+ if (!i->IsFetched() && !i->IsAborted()) {
i->AddAccessor(accessor);
}
}
@@ -110,4 +121,4 @@ void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) {
DrainQueue();
}
-} // namespace NKikimr::NOlap
+} // namespace NKikimr::NOlap::NDataAccessorControl
diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h
index 2d5f29ad204..6be2665e8be 100644
--- a/ydb/core/tx/columnshard/data_accessor/request.h
+++ b/ydb/core/tx/columnshard/data_accessor/request.h
@@ -232,6 +232,12 @@ public:
return result;
}
+ bool IsAborted() const {
+ AFL_VERIFY(HasSubscriber());
+ auto flag = Subscriber->GetAbortionFlag();
+ return flag && flag->Val();
+ }
+
const std::shared_ptr<const TAtomicCounter>& GetAbortionFlag() const {
AFL_VERIFY(HasSubscriber());
return Subscriber->GetAbortionFlag();
diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
index 46ee0ba1c33..94b4eca7e4d 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
@@ -92,6 +92,9 @@ protected:
virtual TDuration DoGetUsedSnapshotLivetime(const TDuration defaultValue) const {
return defaultValue;
}
+ virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const {
+ return defaultValue;
+ }
virtual TDuration DoGetOverridenGCPeriod(const TDuration defaultValue) const {
return defaultValue;
}
@@ -189,6 +192,11 @@ public:
virtual void OnSelectShardingFilter() {
}
+ ui64 GetLimitForPortionsMetadataAsk() const {
+ const ui64 defaultValue = GetConfig().GetLimitForPortionsMetadataAsk();
+ return DoGetLimitForPortionsMetadataAsk(defaultValue);
+ }
+
TDuration GetCompactionActualizationLag() const {
const TDuration defaultValue = TDuration::MilliSeconds(GetConfig().GetCompactionActualizationLagMs());
return DoGetCompactionActualizationLag(defaultValue);
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h
index 9076e8183cd..0b1d8870cab 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.h
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.h
@@ -24,6 +24,8 @@ private:
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideTasksActualizationLag);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideMaxReadStaleness);
YDB_ACCESSOR(std::optional<ui64>, OverrideMemoryLimitForPortionReading, 100);
+ YDB_ACCESSOR(std::optional<ui64>, OverrideLimitForPortionsMetadataAsk, 1);
+
YDB_ACCESSOR_DEF(std::optional<NKikimrProto::EReplyStatus>, OverrideBlobPutResultOnWriteValue);
EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force;
@@ -135,6 +137,11 @@ private:
protected:
virtual ::NKikimr::NColumnShard::TBlobPutResult::TPtr OverrideBlobPutResultOnCompaction(const ::NKikimr::NColumnShard::TBlobPutResult::TPtr original, const NOlap::TWriteActionsCollection& actions) const override;
+ virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const override {
+ return OverrideLimitForPortionsMetadataAsk.value_or(defaultValue);
+ }
+
+
virtual ui64 DoGetMemoryLimitScanPortion(const ui64 defaultValue) const override {
return OverrideMemoryLimitForPortionReading.value_or(defaultValue);
}