aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-01 18:48:04 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-01 19:08:11 +0300
commit33c98f7f7bf065218f7bc5ff613552a8404447ce (patch)
treea1a033fb9098698f4cf0ddd7328b54e0fdf7c2bd
parent3b19804b1a05894b0d9253f6d6f7d8938a9ede38 (diff)
downloadydb-33c98f7f7bf065218f7bc5ff613552a8404447ce.tar.gz
KIKIMR-19215: additional counters for external blob storages
-rw-r--r--.mapping.json5
-rw-r--r--ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract/action.cpp6
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract/action.h22
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract/read.cpp73
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract/read.h69
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract/storage.h22
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract/write.cpp25
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract/write.h16
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.darwin-x86_64.txt22
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-aarch64.txt23
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-x86_64.txt23
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.windows-x86_64.txt22
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/read.cpp23
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/read.h47
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/storage.cpp27
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/storage.h32
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/write.cpp21
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/write.h43
-rw-r--r--ydb/core/tx/columnshard/blobs_action/counters/ya.make15
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp2
-rw-r--r--ydb/core/tx/columnshard/blobs_action/ya.make3
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/actor.h2
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/task.cpp42
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/task.h49
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp18
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h5
-rw-r--r--ydb/core/tx/columnshard/counters/common_data.cpp3
-rw-r--r--ydb/core/tx/columnshard/counters/common_data.h14
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract/abstract.h6
-rw-r--r--ydb/core/tx/columnshard/engines/changes/cleanup.h12
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.h4
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.h2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.h8
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.h12
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.h4
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp20
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h26
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp12
-rw-r--r--ydb/core/tx/columnshard/operations/write.cpp2
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/actor.cpp7
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/task.cpp3
52 files changed, 674 insertions, 170 deletions
diff --git a/.mapping.json b/.mapping.json
index 23aacb9b91..80966f5860 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -5269,6 +5269,11 @@
"ydb/core/tx/columnshard/blobs_action/bs/CMakeLists.linux-x86_64.txt":"",
"ydb/core/tx/columnshard/blobs_action/bs/CMakeLists.txt":"",
"ydb/core/tx/columnshard/blobs_action/bs/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.windows-x86_64.txt":"",
"ydb/core/tx/columnshard/blobs_action/tier/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/tx/columnshard/blobs_action/tier/CMakeLists.linux-aarch64.txt":"",
"ydb/core/tx/columnshard/blobs_action/tier/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt
index de7066a842..11d7dd42c1 100644
--- a/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(abstract)
add_subdirectory(bs)
+add_subdirectory(counters)
add_subdirectory(tier)
add_subdirectory(transaction)
@@ -19,8 +20,9 @@ target_link_libraries(tx-columnshard-blobs_action PUBLIC
libs-apache-arrow
ydb-core-tablet_flat
core-tx-tiering
- columnshard-blobs_action-bs
columnshard-blobs_action-abstract
+ columnshard-blobs_action-bs
+ columnshard-blobs_action-counters
columnshard-blobs_action-transaction
columnshard-blobs_action-tier
)
diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt
index 8e9f9f0fe8..3597b1535a 100644
--- a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt
@@ -8,6 +8,7 @@
add_subdirectory(abstract)
add_subdirectory(bs)
+add_subdirectory(counters)
add_subdirectory(tier)
add_subdirectory(transaction)
@@ -20,8 +21,9 @@ target_link_libraries(tx-columnshard-blobs_action PUBLIC
libs-apache-arrow
ydb-core-tablet_flat
core-tx-tiering
- columnshard-blobs_action-bs
columnshard-blobs_action-abstract
+ columnshard-blobs_action-bs
+ columnshard-blobs_action-counters
columnshard-blobs_action-transaction
columnshard-blobs_action-tier
)
diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt
index 8e9f9f0fe8..3597b1535a 100644
--- a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(abstract)
add_subdirectory(bs)
+add_subdirectory(counters)
add_subdirectory(tier)
add_subdirectory(transaction)
@@ -20,8 +21,9 @@ target_link_libraries(tx-columnshard-blobs_action PUBLIC
libs-apache-arrow
ydb-core-tablet_flat
core-tx-tiering
- columnshard-blobs_action-bs
columnshard-blobs_action-abstract
+ columnshard-blobs_action-bs
+ columnshard-blobs_action-counters
columnshard-blobs_action-transaction
columnshard-blobs_action-tier
)
diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt
index f1469f8ee4..7c6f876790 100644
--- a/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(abstract)
add_subdirectory(bs)
+add_subdirectory(counters)
add_subdirectory(transaction)
add_library(tx-columnshard-blobs_action)
@@ -21,8 +22,9 @@ target_link_libraries(tx-columnshard-blobs_action PUBLIC
libs-apache-arrow
ydb-core-tablet_flat
core-tx-tiering
- columnshard-blobs_action-bs
columnshard-blobs_action-abstract
+ columnshard-blobs_action-bs
+ columnshard-blobs_action-counters
columnshard-blobs_action-transaction
)
target_sources(tx-columnshard-blobs_action PRIVATE
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/action.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/action.cpp
index 5e860f338f..fdd3fd5adb 100644
--- a/ydb/core/tx/columnshard/blobs_action/abstract/action.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/abstract/action.cpp
@@ -4,15 +4,15 @@
namespace NKikimr::NOlap {
std::shared_ptr<NKikimr::NOlap::IBlobsWritingAction> TBlobsAction::GetWriting(const TPortionInfo& portionInfo) {
- return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetWriting();
+ return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetWriting(ConsumerId);
}
std::shared_ptr<NKikimr::NOlap::IBlobsReadingAction> TBlobsAction::GetReading(const TPortionInfo& portionInfo) {
- return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetReading();
+ return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetReading(ConsumerId);
}
std::shared_ptr<NKikimr::NOlap::IBlobsDeclareRemovingAction> TBlobsAction::GetRemoving(const TPortionInfo& portionInfo) {
- return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetRemoving();
+ return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetRemoving(ConsumerId);
}
}
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/action.h b/ydb/core/tx/columnshard/blobs_action/abstract/action.h
index 95c48dedb8..48b0e56872 100644
--- a/ydb/core/tx/columnshard/blobs_action/abstract/action.h
+++ b/ydb/core/tx/columnshard/blobs_action/abstract/action.h
@@ -22,24 +22,24 @@ public:
}
- const std::shared_ptr<IBlobsDeclareRemovingAction>& GetRemoving() {
+ const std::shared_ptr<IBlobsDeclareRemovingAction>& GetRemoving(const TString& consumerId) {
if (!Removing) {
- Removing = Storage->StartDeclareRemovingAction();
+ Removing = Storage->StartDeclareRemovingAction(consumerId);
}
return Removing;
}
- const std::shared_ptr<IBlobsWritingAction>& GetWriting() {
+ const std::shared_ptr<IBlobsWritingAction>& GetWriting(const TString& consumerId) {
if (!Writing) {
- Writing = Storage->StartWritingAction();
+ Writing = Storage->StartWritingAction(consumerId);
}
return Writing;
}
const std::shared_ptr<IBlobsWritingAction>& GetWritingOptional() const {
return Writing;
}
- const std::shared_ptr<IBlobsReadingAction>& GetReading() {
+ const std::shared_ptr<IBlobsReadingAction>& GetReading(const TString& consumerId) {
if (!Reading) {
- Reading = Storage->StartReadingAction();
+ Reading = Storage->StartReadingAction(consumerId);
}
return Reading;
}
@@ -78,6 +78,7 @@ class TBlobsAction {
private:
std::shared_ptr<IStoragesManager> Storages;
THashMap<TString, TStorageAction> StorageActions;
+ const TString ConsumerId;
TStorageAction& GetStorageAction(const TString& storageId) {
auto it = StorageActions.find(storageId);
@@ -87,8 +88,9 @@ private:
return it->second;
}
public:
- TBlobsAction(std::shared_ptr<IStoragesManager> storages)
+ explicit TBlobsAction(std::shared_ptr<IStoragesManager> storages, const TString& consumerId)
: Storages(storages)
+ , ConsumerId(consumerId)
{
}
@@ -155,19 +157,19 @@ public:
}
std::shared_ptr<IBlobsDeclareRemovingAction> GetRemoving(const TString& storageId) {
- return GetStorageAction(storageId).GetRemoving();
+ return GetStorageAction(storageId).GetRemoving(ConsumerId);
}
std::shared_ptr<IBlobsDeclareRemovingAction> GetRemoving(const TPortionInfo& portionInfo);
std::shared_ptr<IBlobsWritingAction> GetWriting(const TString& storageId) {
- return GetStorageAction(storageId).GetWriting();
+ return GetStorageAction(storageId).GetWriting(ConsumerId);
}
std::shared_ptr<IBlobsWritingAction> GetWriting(const TPortionInfo& portionInfo);
std::shared_ptr<IBlobsReadingAction> GetReading(const TString& storageId) {
- return GetStorageAction(storageId).GetReading();
+ return GetStorageAction(storageId).GetReading(ConsumerId);
}
std::shared_ptr<IBlobsReadingAction> GetReading(const TPortionInfo& portionInfo);
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp
index 0f8dd0161c..e16ddf0b69 100644
--- a/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp
@@ -5,10 +5,83 @@ namespace NKikimr::NOlap {
void IBlobsReadingAction::StartReading(THashMap<TUnifiedBlobId, THashSet<TBlobRange>>&& ranges) {
AFL_VERIFY(ranges.size());
+ AFL_VERIFY(Counters);
for (auto&& i : ranges) {
AFL_VERIFY(i.second.size());
+ for (auto&& br : i.second) {
+ Counters->OnRequest(br.Size);
+ }
}
return DoStartReading(ranges);
}
+void IBlobsReadingAction::ExtractBlobsDataTo(THashMap<TBlobRange, TString>& result) {
+ AFL_VERIFY(Started);
+ if (result.empty()) {
+ std::swap(result, Replies);
+ } else {
+ for (auto&& i : Replies) {
+ AFL_VERIFY(result.emplace(i.first, std::move(i.second)).second);
+ }
+ Replies.clear();
+ }
+ RangesForResult.clear();
+}
+
+void IBlobsReadingAction::Start(const THashSet<TBlobRange>& rangesInProgress) {
+ Y_VERIFY(!Started);
+ Y_VERIFY(RangesForRead.size() + RangesForResult.size());
+ for (auto&& i : RangesForRead) {
+ for (auto&& r : i.second) {
+ WaitingRanges.emplace(r, TMonotonic::Now());
+ }
+ }
+ THashMap<TUnifiedBlobId, THashSet<TBlobRange>> rangesFiltered;
+ if (rangesInProgress.empty()) {
+ rangesFiltered = RangesForRead;
+ } else {
+ for (auto&& i : RangesForRead) {
+ for (auto&& r : i.second) {
+ if (!rangesInProgress.contains(r)) {
+ rangesFiltered[r.BlobId].emplace(r);
+ }
+ }
+ }
+ }
+ if (rangesFiltered.size()) {
+ StartReading(std::move(rangesFiltered));
+ }
+ Started = true;
+ for (auto&& i : RangesForResult) {
+ AFL_VERIFY(Replies.emplace(i.first, i.second).second);
+ }
+}
+
+void IBlobsReadingAction::OnReadResult(const TBlobRange& range, const TString& data) {
+ AFL_VERIFY(Counters);
+ auto it = WaitingRanges.find(range);
+ Y_VERIFY(it != WaitingRanges.end());
+ Counters->OnReply(range.Size, TMonotonic::Now() - it->second);
+ WaitingRanges.erase(it);
+ Replies.emplace(range, data);
+}
+
+void IBlobsReadingAction::OnReadError(const TBlobRange& range, const TErrorStatus& replyStatus) {
+ AFL_VERIFY(Counters);
+ auto it = WaitingRanges.find(range);
+ Y_VERIFY(it != WaitingRanges.end());
+ Counters->OnFail(range.Size, TMonotonic::Now() - it->second);
+ WaitingRanges.erase(it);
+ Fails.emplace(range, replyStatus);
+}
+
+void IBlobsReadingAction::AddRange(const TBlobRange& range, const TString& result /*= Default<TString>()*/) {
+ Y_VERIFY(!Started);
+ if (!result) {
+ Y_VERIFY(RangesForRead[range.BlobId].emplace(range).second);
+ } else {
+ Y_VERIFY(RangesForResult.emplace(range, result).second);
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/read.h b/ydb/core/tx/columnshard/blobs_action/abstract/read.h
index 3cc93ce60b..116e5748b1 100644
--- a/ydb/core/tx/columnshard/blobs_action/abstract/read.h
+++ b/ydb/core/tx/columnshard/blobs_action/abstract/read.h
@@ -1,29 +1,43 @@
#pragma once
#include "common.h"
+#include <ydb/core/tx/columnshard/blobs_action/counters/read.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/protos/base.pb.h>
+#include <ydb/library/conclusion/status.h>
#include <util/generic/hash_set.h>
namespace NKikimr::NOlap {
class IBlobsReadingAction: public ICommonBlobsAction {
+public:
+ using TErrorStatus = TConclusionSpecialStatus<NKikimrProto::EReplyStatus, NKikimrProto::EReplyStatus::OK, NKikimrProto::EReplyStatus::ERROR>;
private:
using TBase = ICommonBlobsAction;
+
THashMap<TUnifiedBlobId, THashSet<TBlobRange>> RangesForRead;
- THashSet<TBlobRange> WaitingRanges;
+ THashMap<TBlobRange, TString> RangesForResult;
+ THashMap<TBlobRange, TMonotonic> WaitingRanges;
THashMap<TBlobRange, TString> Replies;
- THashMap<TBlobRange, NKikimrProto::EReplyStatus> Fails;
+ THashMap<TBlobRange, TErrorStatus> Fails;
+ std::shared_ptr<NBlobOperations::TReadCounters> Counters;
bool Started = false;
protected:
virtual void DoStartReading(const THashMap<TUnifiedBlobId, THashSet<TBlobRange>>& range) = 0;
void StartReading(THashMap<TUnifiedBlobId, THashSet<TBlobRange>>&& ranges);
public:
+
+ void SetCounters(std::shared_ptr<NBlobOperations::TReadCounters> counters) {
+ Counters = counters;
+ }
+
IBlobsReadingAction(const TString& storageId)
: TBase(storageId)
{
}
+ void ExtractBlobsDataTo(THashMap<TBlobRange, TString>& result);
+
ui64 GetExpectedBlobsSize() const {
ui64 result = 0;
for (auto&& i : RangesForRead) {
@@ -31,6 +45,9 @@ public:
result += b.Size;
}
}
+ for (auto&& i : RangesForResult) {
+ result += i.first.Size;
+ }
return result;
}
@@ -39,7 +56,7 @@ public:
for (auto&& i : RangesForRead) {
result += i.second.size();
}
- return result;
+ return result + RangesForResult.size();
}
void FillExpectedRanges(THashSet<TBlobRange>& ranges) const {
@@ -48,52 +65,20 @@ public:
Y_VERIFY(ranges.emplace(b).second);
}
}
+ for (auto&& i : RangesForResult) {
+ Y_VERIFY(ranges.emplace(i.first).second);
+ }
}
const THashMap<TUnifiedBlobId, THashSet<TBlobRange>>& GetRangesForRead() const {
return RangesForRead;
}
- void AddRange(const TBlobRange& range) {
- Y_VERIFY(!Started);
- Y_VERIFY(RangesForRead[range.BlobId].emplace(range).second);
- }
+ void AddRange(const TBlobRange& range, const TString& result = Default<TString>());
- void Start(const THashSet<TBlobRange>& rangesInProgress) {
- Y_VERIFY(!Started);
- Y_VERIFY(RangesForRead.size());
- for (auto&& i : RangesForRead) {
- for (auto&& r : i.second) {
- WaitingRanges.emplace(r);
- }
- }
- THashMap<TUnifiedBlobId, THashSet<TBlobRange>> rangesFiltered;
- if (rangesInProgress.empty()) {
- rangesFiltered = RangesForRead;
- } else {
- for (auto&& i : RangesForRead) {
- for (auto&& r : i.second) {
- if (!rangesInProgress.contains(r)) {
- rangesFiltered[r.BlobId].emplace(r);
- }
- }
- }
- }
- if (rangesFiltered.size()) {
- StartReading(std::move(rangesFiltered));
- }
- Started = true;
- }
-
- void OnReadResult(const TBlobRange& range, const TString& data) {
- Y_VERIFY(WaitingRanges.erase(range));
- Replies.emplace(range, data);
- }
-
- void OnReadError(const TBlobRange& range, const NKikimrProto::EReplyStatus replyStatus) {
- Y_VERIFY(WaitingRanges.erase(range));
- Fails.emplace(range, replyStatus);
- }
+ void Start(const THashSet<TBlobRange>& rangesInProgress);
+ void OnReadResult(const TBlobRange& range, const TString& data);
+ void OnReadError(const TBlobRange& range, const TErrorStatus& replyStatus);
bool HasFails() const {
return Fails.size();
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h
index 6e2ad258dc..4e5b3c4739 100644
--- a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h
+++ b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h
@@ -4,6 +4,7 @@
#include "read.h"
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
+#include <ydb/core/tx/columnshard/blobs_action/counters/storage.h>
#include <ydb/library/accessor/accessor.h>
namespace NKikimr::NColumnShard {
@@ -34,6 +35,7 @@ private:
Y_VERIFY(GCActivity);
GCActivity = false;
}
+ std::shared_ptr<NBlobOperations::TStorageCounters> Counters;
protected:
virtual std::shared_ptr<IBlobsDeclareRemovingAction> DoStartDeclareRemovingAction() = 0;
virtual std::shared_ptr<IBlobsWritingAction> DoStartWritingAction() = 0;
@@ -47,8 +49,9 @@ protected:
}
public:
IBlobsStorageOperator(const TString& storageId)
- : StorageId(storageId) {
-
+ : StorageId(storageId)
+ {
+ Counters = std::make_shared<NBlobOperations::TStorageCounters>(storageId);
}
virtual std::shared_ptr<IBlobInUseTracker> GetBlobsTracker() const = 0;
@@ -65,14 +68,19 @@ public:
void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& tiers) {
return DoOnTieringModified(tiers);
}
- std::shared_ptr<IBlobsDeclareRemovingAction> StartDeclareRemovingAction() {
+
+ std::shared_ptr<IBlobsDeclareRemovingAction> StartDeclareRemovingAction(const TString& /*consumerId*/) {
return DoStartDeclareRemovingAction();
}
- std::shared_ptr<IBlobsWritingAction> StartWritingAction() {
- return DoStartWritingAction();
+ std::shared_ptr<IBlobsWritingAction> StartWritingAction(const TString& consumerId) {
+ auto result = DoStartWritingAction();
+ result->SetCounters(Counters->GetConsumerCounter(consumerId)->GetWriteCounters());
+ return result;
}
- std::shared_ptr<IBlobsReadingAction> StartReadingAction() {
- return DoStartReadingAction();
+ std::shared_ptr<IBlobsReadingAction> StartReadingAction(const TString& consumerId) {
+ auto result = DoStartReadingAction();
+ result->SetCounters(Counters->GetConsumerCounter(consumerId)->GetReadCounters());
+ return result;
}
bool StartGC() {
if (!GCActivity) {
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/write.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/write.cpp
index 689e494c24..234285cfce 100644
--- a/ydb/core/tx/columnshard/blobs_action/abstract/write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/abstract/write.cpp
@@ -7,22 +7,41 @@ TUnifiedBlobId IBlobsWritingAction::AddDataForWrite(const TString& data) {
Y_VERIFY(!WritingStarted);
auto blobId = AllocateNextBlobId(data);
AFL_VERIFY(BlobsForWrite.emplace(blobId, data).second);
+ BlobsWaiting.emplace(blobId);
+ BlobsWriteCount += 1;
SumSize += data.size();
return blobId;
}
void IBlobsWritingAction::OnBlobWriteResult(const TUnifiedBlobId& blobId, const NKikimrProto::EReplyStatus status) {
- Y_VERIFY(BlobsForWrite.erase(blobId));
+ AFL_VERIFY(Counters);
+ auto it = WritingStart.find(blobId);
+ AFL_VERIFY(it != WritingStart.end());
+ if (status == NKikimrProto::EReplyStatus::OK) {
+ Counters->OnReply(blobId.BlobSize(), TMonotonic::Now() - it->second);
+ } else {
+ Counters->OnFail(blobId.BlobSize(), TMonotonic::Now() - it->second);
+ }
+ WritingStart.erase(it);
+ Y_VERIFY(BlobsWaiting.erase(blobId));
return DoOnBlobWriteResult(blobId, status);
}
bool IBlobsWritingAction::IsReady() const {
Y_VERIFY(WritingStarted);
- return BlobsForWrite.empty();
+ return BlobsWaiting.empty();
}
IBlobsWritingAction::~IBlobsWritingAction() {
- AFL_VERIFY(!NActors::TlsActivationContext || BlobsForWrite.empty() || Aborted);
+ AFL_VERIFY(!NActors::TlsActivationContext || BlobsWaiting.empty() || Aborted);
+}
+
+void IBlobsWritingAction::SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) {
+ AFL_VERIFY(Counters);
+ Counters->OnRequest(data.size());
+ WritingStarted = true;
+ AFL_VERIFY(WritingStart.emplace(blobId, TMonotonic::Now()).second);
+ return DoSendWriteBlobRequest(data, blobId);
}
}
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/write.h b/ydb/core/tx/columnshard/blobs_action/abstract/write.h
index a90479fa97..4fa4cc8f6e 100644
--- a/ydb/core/tx/columnshard/blobs_action/abstract/write.h
+++ b/ydb/core/tx/columnshard/blobs_action/abstract/write.h
@@ -3,6 +3,7 @@
#include <util/generic/hash.h>
#include <ydb/core/protos/base.pb.h>
#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/blobs_action/counters/write.h>
namespace NKikimr::NColumnShard {
class TColumnShard;
@@ -15,9 +16,13 @@ class IBlobsWritingAction: public ICommonBlobsAction {
private:
using TBase = ICommonBlobsAction;
bool WritingStarted = false;
+ THashMap<TUnifiedBlobId, TMonotonic> WritingStart;
ui64 SumSize = 0;
+ ui32 BlobsWriteCount = 0;
THashMap<TUnifiedBlobId, TString> BlobsForWrite;
+ THashSet<TUnifiedBlobId> BlobsWaiting;
bool Aborted = false;
+ std::shared_ptr<NBlobOperations::TWriteCounters> Counters;
protected:
virtual void DoOnExecuteTxBeforeWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) = 0;
virtual void DoOnCompleteTxBeforeWrite(NColumnShard::TColumnShard& self) = 0;
@@ -38,6 +43,10 @@ public:
virtual ~IBlobsWritingAction();
bool IsReady() const;
+ void SetCounters(std::shared_ptr<NBlobOperations::TWriteCounters> counters) {
+ Counters = counters;
+ }
+
const THashMap<TUnifiedBlobId, TString>& GetBlobsForWrite() const {
return BlobsForWrite;
}
@@ -54,7 +63,7 @@ public:
}
ui32 GetBlobsCount() const {
- return BlobsForWrite.size();
+ return BlobsWriteCount;
}
ui32 GetTotalSize() const {
return SumSize;
@@ -74,10 +83,7 @@ public:
return DoOnCompleteTxAfterWrite(self);
}
- void SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) {
- WritingStarted = true;
- return DoSendWriteBlobRequest(data, blobId);
- }
+ void SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId);
};
}
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..3f2c60a24c
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-blobs_action-counters)
+target_link_libraries(columnshard-blobs_action-counters PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(columnshard-blobs_action-counters PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/write.cpp
+)
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..536e94a621
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-blobs_action-counters)
+target_link_libraries(columnshard-blobs_action-counters PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(columnshard-blobs_action-counters PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/write.cpp
+)
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..536e94a621
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-blobs_action-counters)
+target_link_libraries(columnshard-blobs_action-counters PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(columnshard-blobs_action-counters PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/write.cpp
+)
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..3f2c60a24c
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-blobs_action-counters)
+target_link_libraries(columnshard-blobs_action-counters PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ cpp-actors-core
+ ydb-core-tablet_flat
+)
+target_sources(columnshard-blobs_action-counters PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/write.cpp
+)
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/read.cpp b/ydb/core/tx/columnshard/blobs_action/counters/read.cpp
new file mode 100644
index 0000000000..5107b85406
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/read.cpp
@@ -0,0 +1,23 @@
+#include "read.h"
+#include "storage.h"
+
+namespace NKikimr::NOlap::NBlobOperations {
+
+TReadCounters::TReadCounters(const TConsumerCounters& owner)
+ : TBase(owner, "Reader")
+{
+ RequestsCount = TBase::GetDeriviative("Requests/Count");
+ RequestBytes = TBase::GetDeriviative("Requests/Bytes");
+
+ RepliesCount = TBase::GetDeriviative("Replies/Count");
+ ReplyBytes = TBase::GetDeriviative("Replies/Bytes");
+ ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1));
+ ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1));
+
+ FailsCount = TBase::GetDeriviative("Fails/Count");
+ FailBytes = TBase::GetDeriviative("Fails/Bytes");
+ FailDurationBySize = TBase::GetHistogram("Fails/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 2));
+ FailDurationByCount = TBase::GetHistogram("Fails/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 2));
+}
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/read.h b/ydb/core/tx/columnshard/blobs_action/counters/read.h
new file mode 100644
index 0000000000..d0e8f736f5
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/read.h
@@ -0,0 +1,47 @@
+#pragma once
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <ydb/core/tx/columnshard/counters/common/owner.h>
+
+namespace NKikimr::NOlap::NBlobOperations {
+
+class TConsumerCounters;
+
+class TReadCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr RequestsCount;
+ NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;
+
+ NMonitoring::TDynamicCounters::TCounterPtr RepliesCount;
+ NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes;
+ NMonitoring::THistogramPtr ReplyDurationByCount;
+ NMonitoring::THistogramPtr ReplyDurationBySize;
+
+ NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
+ NMonitoring::TDynamicCounters::TCounterPtr FailBytes;
+ NMonitoring::THistogramPtr FailDurationByCount;
+ NMonitoring::THistogramPtr FailDurationBySize;
+public:
+ TReadCounters(const TConsumerCounters& owner);
+
+ void OnRequest(const ui64 bytes) const {
+ RequestsCount->Add(1);
+ RequestBytes->Add(bytes);
+ }
+
+ void OnReply(const ui64 bytes, const TDuration d) const {
+ RepliesCount->Add(1);
+ ReplyBytes->Add(bytes);
+ ReplyDurationByCount->Collect((i64)d.MilliSeconds());
+ ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes);
+ }
+
+ void OnFail(const ui64 bytes, const TDuration d) const {
+ FailsCount->Add(1);
+ FailBytes->Add(bytes);
+ FailDurationByCount->Collect((i64)d.MilliSeconds());
+ FailDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes);
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp b/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp
new file mode 100644
index 0000000000..4f8b03a9ad
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp
@@ -0,0 +1,27 @@
+#include "storage.h"
+
+namespace NKikimr::NOlap::NBlobOperations {
+
+TStorageCounters::TStorageCounters(const TString& storageId)
+ : TBase("BlobStorages")
+{
+ DeepSubGroup("StorageId", storageId);
+}
+
+std::shared_ptr<NKikimr::NOlap::NBlobOperations::TConsumerCounters> TStorageCounters::GetConsumerCounter(const TString& consumerId) {
+ auto it = ConsumerCounters.find(consumerId);
+ if (it == ConsumerCounters.end()) {
+ it = ConsumerCounters.emplace(consumerId, std::make_shared<TConsumerCounters>(consumerId, *this)).first;
+ }
+ return it->second;
+}
+
+TConsumerCounters::TConsumerCounters(const TString& consumerId, const TStorageCounters& parent)
+ : TBase(parent)
+{
+ DeepSubGroup("Consumer", consumerId);
+ ReadCounters = std::make_shared<TReadCounters>(*this);
+ WriteCounters = std::make_shared<TWriteCounters>(*this);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/storage.h b/ydb/core/tx/columnshard/blobs_action/counters/storage.h
new file mode 100644
index 0000000000..8b42b79dfb
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/storage.h
@@ -0,0 +1,32 @@
+#pragma once
+#include "read.h"
+#include "write.h"
+#include <ydb/core/tx/columnshard/counters/common/owner.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <util/generic/hash.h>
+
+namespace NKikimr::NOlap::NBlobOperations {
+
+class TStorageCounters;
+
+class TConsumerCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ YDB_READONLY_DEF(std::shared_ptr<TReadCounters>, ReadCounters);
+ YDB_READONLY_DEF(std::shared_ptr<TWriteCounters>, WriteCounters);
+public:
+ TConsumerCounters(const TString& consumerId, const TStorageCounters& parent);
+};
+
+class TStorageCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ THashMap<TString, std::shared_ptr<TConsumerCounters>> ConsumerCounters;
+public:
+ TStorageCounters(const TString& storageId);
+
+ std::shared_ptr<TConsumerCounters> GetConsumerCounter(const TString& consumerId);
+
+};
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp
new file mode 100644
index 0000000000..301ca21dea
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp
@@ -0,0 +1,21 @@
+#include "write.h"
+#include "storage.h"
+
+namespace NKikimr::NOlap::NBlobOperations {
+
+TWriteCounters::TWriteCounters(const TConsumerCounters& owner)
+ : TBase(owner, "Writer")
+{
+ RequestsCount = TBase::GetDeriviative("Requests/Count");
+ RequestBytes = TBase::GetDeriviative("Requests/Bytes");
+
+ RepliesCount = TBase::GetDeriviative("Replies/Count");
+ ReplyBytes = TBase::GetDeriviative("Replies/Bytes");
+ ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000));
+
+ FailsCount = TBase::GetDeriviative("Fails/Count");
+ FailBytes = TBase::GetDeriviative("Fails/Bytes");
+ FailDuration = TBase::GetHistogram("Fails/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000));
+}
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.h b/ydb/core/tx/columnshard/blobs_action/counters/write.h
new file mode 100644
index 0000000000..60465c7e32
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/write.h
@@ -0,0 +1,43 @@
+#pragma once
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <ydb/core/tx/columnshard/counters/common/owner.h>
+
+namespace NKikimr::NOlap::NBlobOperations {
+
+class TConsumerCounters;
+
+class TWriteCounters: public NColumnShard::TCommonCountersOwner {
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr RequestsCount;
+ NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;
+
+ NMonitoring::TDynamicCounters::TCounterPtr RepliesCount;
+ NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes;
+ NMonitoring::THistogramPtr ReplyDuration;
+
+ NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
+ NMonitoring::TDynamicCounters::TCounterPtr FailBytes;
+ NMonitoring::THistogramPtr FailDuration;
+public:
+ TWriteCounters(const TConsumerCounters& owner);
+
+ void OnRequest(const ui64 bytes) const {
+ RequestsCount->Add(1);
+ RequestBytes->Add(bytes);
+ }
+
+ void OnReply(const ui64 bytes, const TDuration d) const {
+ RepliesCount->Add(1);
+ ReplyBytes->Add(bytes);
+ ReplyDuration->Collect(d.MicroSeconds());
+ }
+
+ void OnFail(const ui64 bytes, const TDuration d) const {
+ FailsCount->Add(1);
+ FailBytes->Add(bytes);
+ FailDuration->Collect(d.MicroSeconds());
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_action/counters/ya.make b/ydb/core/tx/columnshard/blobs_action/counters/ya.make
new file mode 100644
index 0000000000..7e50b3c7f6
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/counters/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ read.cpp
+ storage.cpp
+ write.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ library/cpp/actors/core
+ ydb/core/tablet_flat
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp
index 27bb2cb777..9731ff110e 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp
@@ -12,7 +12,7 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex
TBlobManagerDb blobManagerDb(txc.DB);
auto allAborted = Self->InsertTable->GetAborted();
auto storage = Self->StoragesManager->GetInsertOperator();
- BlobsAction = storage->StartDeclareRemovingAction();
+ BlobsAction = storage->StartDeclareRemovingAction("TX_CLEANUP");
for (auto& [abortedWriteId, abortedData] : allAborted) {
Self->InsertTable->EraseAborted(dbTable, abortedData);
Y_VERIFY(abortedData.GetBlobRange().IsFullBlob());
diff --git a/ydb/core/tx/columnshard/blobs_action/ya.make b/ydb/core/tx/columnshard/blobs_action/ya.make
index 9775d5cb7e..320cb63e4e 100644
--- a/ydb/core/tx/columnshard/blobs_action/ya.make
+++ b/ydb/core/tx/columnshard/blobs_action/ya.make
@@ -10,8 +10,9 @@ PEERDIR(
contrib/libs/apache/arrow
ydb/core/tablet_flat
ydb/core/tx/tiering
- ydb/core/tx/columnshard/blobs_action/bs
ydb/core/tx/columnshard/blobs_action/abstract
+ ydb/core/tx/columnshard/blobs_action/bs
+ ydb/core/tx/columnshard/blobs_action/counters
ydb/core/tx/columnshard/blobs_action/transaction
)
diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.cpp b/ydb/core/tx/columnshard/blobs_reader/actor.cpp
index 2b387baeb9..64de8e1b7c 100644
--- a/ydb/core/tx/columnshard/blobs_reader/actor.cpp
+++ b/ydb/core/tx/columnshard/blobs_reader/actor.cpp
@@ -37,7 +37,7 @@ void TActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev)
AFL_VERIFY(it != BlobTasks.end())("blob_id", event.BlobRange);
for (auto&& i : it->second) {
if (event.Status != NKikimrProto::EReplyStatus::OK) {
- i->AddError(event.BlobRange, ITask::TErrorStatus::Fail(event.Status, "cannot get blob"));
+ i->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob"));
} else {
i->AddData(event.BlobRange, event.Data);
}
diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.h b/ydb/core/tx/columnshard/blobs_reader/actor.h
index aad5d74b18..7670b4d0a2 100644
--- a/ydb/core/tx/columnshard/blobs_reader/actor.h
+++ b/ydb/core/tx/columnshard/blobs_reader/actor.h
@@ -10,7 +10,7 @@
namespace NKikimr::NOlap::NBlobOperations::NRead {
-class TActor : public TActorBootstrapped<TActor> {
+class TActor: public TActorBootstrapped<TActor> {
private:
ui64 TabletId;
NActors::TActorId Parent;
diff --git a/ydb/core/tx/columnshard/blobs_reader/task.cpp b/ydb/core/tx/columnshard/blobs_reader/task.cpp
index 4b79bac1dd..b67f64682d 100644
--- a/ydb/core/tx/columnshard/blobs_reader/task.cpp
+++ b/ydb/core/tx/columnshard/blobs_reader/task.cpp
@@ -1,4 +1,5 @@
#include "task.h"
+#include "events.h"
#include <library/cpp/actors/core/log.h>
namespace NKikimr::NOlap::NBlobOperations::NRead {
@@ -8,7 +9,8 @@ const std::vector<std::shared_ptr<IBlobsReadingAction>>& ITask::GetAgents() cons
return Agents;
}
-bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) {
+bool ITask::AddError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
+ ++BlobErrorsCount;
if (TaskFinishedWithError || AbortFlag) {
ACFL_WARN("event", "SkipError")("blob_range", range)("message", status.GetErrorMessage())("status", status.GetStatus())("external_task_id", ExternalTaskId)
("abort", AbortFlag)("finished_with_error", TaskFinishedWithError);
@@ -19,11 +21,9 @@ bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) {
{
auto it = BlobsWaiting.find(range);
AFL_VERIFY(it != BlobsWaiting.end());
- it->second->OnReadError(range, status.GetStatus());
+ it->second->OnReadError(range, status);
BlobsWaiting.erase(it);
}
-
- Y_VERIFY(BlobErrors.emplace(range, status).second);
if (!OnError(range)) {
TaskFinishedWithError = true;
return false;
@@ -35,6 +35,7 @@ bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) {
}
void ITask::AddData(const TBlobRange& range, const TString& data) {
+ ++BlobsDataCount;
if (TaskFinishedWithError || AbortFlag) {
ACFL_WARN("event", "SkipDataAfterError")("external_task_id", ExternalTaskId)("abort", AbortFlag)("finished_with_error", TaskFinishedWithError);
return;
@@ -48,7 +49,6 @@ void ITask::AddData(const TBlobRange& range, const TString& data) {
it->second->OnReadResult(range, data);
BlobsWaiting.erase(it);
}
- Y_VERIFY(BlobsData.emplace(range, data).second);
if (BlobsWaiting.empty()) {
OnDataReady();
}
@@ -61,11 +61,11 @@ void ITask::StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress) {
ui64 size = 0;
ui64 count = 0;
for (auto&& agent : Agents) {
+ size += agent->GetExpectedBlobsSize();
+ count += agent->GetExpectedBlobsCount();
for (auto&& b : agent->GetRangesForRead()) {
for (auto&& r : b.second) {
BlobsWaiting.emplace(r, agent);
- size += r.Size;
- ++count;
}
}
agent->Start(rangesInProgress);
@@ -79,12 +79,19 @@ void ITask::StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress) {
namespace {
TAtomicCounter TaskIdentifierBuilder = 0;
+
+}
+
+void ITask::TReadSubscriber::DoOnAllocationSuccess(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
+ Task->ResourcesGuard = guard;
+ TActorContext::AsActorContext().Send(ReadActorId, std::make_unique<TEvStartReadTask>(Task));
}
-ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& externalTaskId)
+ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& taskCustomer, const TString& externalTaskId)
: Agents(actions)
, TaskIdentifier(TaskIdentifierBuilder.Inc())
, ExternalTaskId(externalTaskId)
+ , TaskCustomer(taskCustomer)
{
AFL_VERIFY(Agents.size());
for (auto&& i : Agents) {
@@ -95,8 +102,8 @@ ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, c
TString ITask::DebugString() const {
TStringBuilder sb;
sb << "finished_with_error=" << TaskFinishedWithError << ";"
- << "errors=" << BlobErrors.size() << ";"
- << "data=" << BlobsData.size() << ";"
+ << "errors=" << BlobErrorsCount << ";"
+ << "data=" << BlobsDataCount << ";"
<< "waiting=" << BlobsWaiting.size() << ";"
<< "additional_info=(" << DoDebugString() << ");"
;
@@ -107,7 +114,7 @@ void ITask::OnDataReady() {
ACFL_DEBUG("event", "OnDataReady")("task", DebugString())("external_task_id", ExternalTaskId);
Y_VERIFY(!DataIsReadyFlag);
DataIsReadyFlag = true;
- DoOnDataReady();
+ DoOnDataReady(ResourcesGuard);
}
bool ITask::OnError(const TBlobRange& range) {
@@ -116,7 +123,18 @@ bool ITask::OnError(const TBlobRange& range) {
}
ITask::~ITask() {
- AFL_VERIFY(!NActors::TlsActivationContext || DataIsReadyFlag || TaskFinishedWithError || AbortFlag);
+ AFL_VERIFY(!NActors::TlsActivationContext || DataIsReadyFlag || TaskFinishedWithError || AbortFlag || !BlobsFetchingStarted);
+}
+
+THashMap<NKikimr::NOlap::TBlobRange, TString> ITask::ExtractBlobsData() {
+ AFL_VERIFY(BlobsWaiting.empty());
+ AFL_VERIFY(!ResultsExtracted);
+ ResultsExtracted = true;
+ THashMap<TBlobRange, TString> result;
+ for (auto&& i : Agents) {
+ i->ExtractBlobsDataTo(result);
+ }
+ return std::move(result);
}
}
diff --git a/ydb/core/tx/columnshard/blobs_reader/task.h b/ydb/core/tx/columnshard/blobs_reader/task.h
index 213058d8d2..23ef5a7383 100644
--- a/ydb/core/tx/columnshard/blobs_reader/task.h
+++ b/ydb/core/tx/columnshard/blobs_reader/task.h
@@ -3,18 +3,16 @@
#include <ydb/library/conclusion/status.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/read.h>
+#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/protos/base.pb.h>
+#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
namespace NKikimr::NOlap::NBlobOperations::NRead {
-class ITask {
-public:
- using TErrorStatus = TConclusionSpecialStatus<NKikimrProto::EReplyStatus, NKikimrProto::EReplyStatus::OK, NKikimrProto::EReplyStatus::ERROR>;
+class ITask: public NColumnShard::TMonitoringObjectsCounter<ITask> {
private:
THashMap<TBlobRange, std::shared_ptr<IBlobsReadingAction>> BlobsWaiting;
std::vector<std::shared_ptr<IBlobsReadingAction>> Agents;
- THashMap<TBlobRange, TString> BlobsData;
- THashMap<TBlobRange, TErrorStatus> BlobErrors;
bool BlobsFetchingStarted = false;
bool TaskFinishedWithError = false;
bool DataIsReadyFlag = false;
@@ -23,20 +21,19 @@ private:
bool AbortFlag = false;
std::optional<ui64> WaitBlobsSize;
std::optional<ui64> WaitBlobsCount;
+ TString TaskCustomer;
+ std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
+ ui32 BlobErrorsCount = 0;
+ ui32 BlobsDataCount = 0;
+ bool ResultsExtracted = false;
protected:
bool IsFetchingStarted() const {
return BlobsFetchingStarted;
}
- const THashMap<TBlobRange, TString>& GetBlobsData() const {
- return BlobsData;
- }
-
- THashMap<TBlobRange, TString> ExtractBlobsData() {
- return std::move(BlobsData);
- }
+ THashMap<TBlobRange, TString> ExtractBlobsData();
- virtual void DoOnDataReady() = 0;
+ virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) = 0;
virtual bool DoOnError(const TBlobRange& range) = 0;
void OnDataReady();
@@ -72,8 +69,8 @@ public:
THashSet<TBlobRange> GetExpectedRanges() const {
THashSet<TBlobRange> result;
- for (auto&& i : BlobsWaiting) {
- i.second->FillExpectedRanges(result);
+ for (auto&& i : Agents) {
+ i->FillExpectedRanges(result);
}
return result;
}
@@ -82,12 +79,30 @@ public:
virtual ~ITask();
- ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& externalTaskId = "");
+ ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& taskCustomer, const TString& externalTaskId = "");
void StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress);
- bool AddError(const TBlobRange& range, const TErrorStatus& status);
+ bool AddError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status);
void AddData(const TBlobRange& range, const TString& data);
+
+ class TReadSubscriber: public NResourceBroker::NSubscribe::ITask {
+ private:
+ using TBase = NResourceBroker::NSubscribe::ITask;
+ const TActorId ReadActorId;
+ std::shared_ptr<NRead::ITask> Task;
+ protected:
+ virtual void DoOnAllocationSuccess(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) override;
+ public:
+ TReadSubscriber(const TActorId& readActor, const std::shared_ptr<NRead::ITask>& readTask, const ui32 cpu, const ui64 memory, const TString& name,
+ const NResourceBroker::NSubscribe::TTaskContext& context)
+ : TBase(cpu, memory, name, context)
+ , ReadActorId(readActor)
+ , Task(readTask)
+ {
+
+ }
+ };
};
}
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 0237cdcf68..b588bff6e1 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -1,6 +1,7 @@
#include "columnshard_impl.h"
#include "blobs_reader/actor.h"
#include "hooks/abstract/abstract.h"
+#include "resource_subscriber/actor.h"
namespace NKikimr {
@@ -15,6 +16,7 @@ namespace NKikimr::NColumnShard {
void TColumnShard::CleanupActors(const TActorContext& ctx)
{
ctx.Send(BlobsReadActor, new TEvents::TEvPoisonPill);
+ ctx.Send(ResourceSubscribeActor, new TEvents::TEvPoisonPill);
if (Tiers) {
Tiers->Stop();
}
@@ -32,6 +34,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
LOG_S_INFO("Switched to work at " << TabletID() << " actor " << ctx.SelfID);
BlobsReadActor = ctx.Register(new NOlap::NBlobOperations::NRead::TActor(TabletID(), SelfId()));
+ ResourceSubscribeActor = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletID(), SelfId()));
for (auto&& i : TablesManager.GetTables()) {
ActivateTiering(i.first, i.second.GetTieringUsage());
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 802f6fdbd1..ac69050f9a 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -167,7 +167,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< " at tablet " << TabletID());
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(TabletID(), SelfId(),
- StoragesManager->GetInsertOperator()->StartWritingAction(), writeData);
+ StoragesManager->GetInsertOperator()->StartWritingAction("WRITING"), writeData);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 5e6daceaed..a685eaf12c 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -5,6 +5,7 @@
#include "engines/changes/ttl.h"
#include "engines/changes/cleanup.h"
#include "blobs_action/bs/storage.h"
+#include "resource_subscriber/task.h"
#ifndef KIKIMR_DISABLE_S3_OPS
#include "blobs_action/tier/storage.h"
@@ -21,6 +22,7 @@
#include <ydb/services/metadata/service.h>
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/tx/conveyor/usage/service.h>
+#include "resource_subscriber/counters.h"
namespace NKikimr::NColumnShard {
@@ -173,6 +175,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, TablesManager(StoragesManager)
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
+ , SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>())
+ , InsertTaskSubscription(NOlap::TInsertColumnEngineChanges::StaticTypeName(), SubscribeCounters)
, ReadCounters("Read")
, ScanCounters("Scan")
, WritesMonitor(*this)
@@ -707,8 +711,9 @@ private:
std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent;
TIndexationCounters Counters;
protected:
- virtual void DoOnDataReady() override {
- TxEvent->IndexChanges->Blobs = std::move(ExtractBlobsData());
+ virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
+ TxEvent->IndexChanges->Blobs = ExtractBlobsData();
+ TxEvent->IndexChanges->ResourcesGuard = resourcesGuard;
const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges);
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId);
if (isInsert) {
@@ -726,7 +731,7 @@ protected:
}
public:
TChangesReadTask(std::unique_ptr<TEvPrivate::TEvWriteIndex>&& event, const TActorId parentActorId, const ui64 tabletId, const TIndexationCounters& counters)
- : TBase(event->IndexChanges->GetReadingActions(), event->IndexChanges->GetTaskIdentifier())
+ : TBase(event->IndexChanges->GetReadingActions(), event->IndexChanges->TypeString(), event->IndexChanges->GetTaskIdentifier())
, ParentActorId(parentActorId)
, TabletId(tabletId)
, TxEvent(std::move(event))
@@ -754,8 +759,10 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat
std::vector<NOlap::TInsertedData> data;
data.reserve(dataToIndex.size());
+ ui64 memoryNeed = 0;
for (auto& ptr : dataToIndex) {
data.push_back(*ptr);
+ memoryNeed += ptr->GetMeta().GetRawBytes();
}
Y_VERIFY(data.size());
@@ -770,7 +777,10 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterIndexing);
- ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters)));
+ const TString taskName = indexChanges->GetTaskIdentifier();
+ NOlap::NResourceBroker::NSubscribe::ITask::Start(
+ ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(BlobsReadActor,
+ std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters), 0, memoryNeed, taskName, InsertTaskSubscription));
}
void TColumnShard::SetupIndexation() {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 593573b1a2..0597d4fedb 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -11,6 +11,8 @@
#include "tx_controller.h"
#include "inflight_request_tracker.h"
#include "counters/columnshard.h"
+#include "resource_subscriber/counters.h"
+#include "resource_subscriber/task.h"
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tablet/tablet_counters.h>
@@ -388,6 +390,7 @@ private:
TInstant LastStatsReport;
TActorId BlobsReadActor;
+ TActorId ResourceSubscribeActor;
TActorId StatsReportPipe;
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
@@ -399,6 +402,8 @@ private:
TTabletCountersBase* TabletCounters;
std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
std::unique_ptr<NOlap::TInsertTable> InsertTable;
+ std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters> SubscribeCounters;
+ NOlap::NResourceBroker::NSubscribe::TTaskContext InsertTaskSubscription;
const TScanCounters ReadCounters;
const TScanCounters ScanCounters;
const TIndexationCounters CompactionCounters = TIndexationCounters("GeneralCompaction");
diff --git a/ydb/core/tx/columnshard/counters/common_data.cpp b/ydb/core/tx/columnshard/counters/common_data.cpp
index 9f72a45171..121a9b183c 100644
--- a/ydb/core/tx/columnshard/counters/common_data.cpp
+++ b/ydb/core/tx/columnshard/counters/common_data.cpp
@@ -6,6 +6,9 @@ TDataOwnerSignals::TDataOwnerSignals(const TString& module, const TString dataNa
: TBase(module)
, DataName(dataName)
{
+ DataSize = TBase::GetValueAutoAggregationsClient(DataName + "/Size");
+ ChunksCount = TBase::GetValueAutoAggregationsClient(DataName + "/Chunks/Count");
+
AddCount = GetDeriviative(DataName + "/Add/Count");
AddBytes = GetDeriviative(DataName + "/Add/Bytes");
EraseCount = GetDeriviative(DataName + "/Erase/Count");
diff --git a/ydb/core/tx/columnshard/counters/common_data.h b/ydb/core/tx/columnshard/counters/common_data.h
index b66a4aab2f..e4dd70531d 100644
--- a/ydb/core/tx/columnshard/counters/common_data.h
+++ b/ydb/core/tx/columnshard/counters/common_data.h
@@ -16,15 +16,23 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr EraseBytes;
NMonitoring::TDynamicCounters::TCounterPtr SkipEraseCount;
NMonitoring::TDynamicCounters::TCounterPtr SkipEraseBytes;
+ std::shared_ptr<TValueAggregationClient> DataSize;
+ std::shared_ptr<TValueAggregationClient> ChunksCount;
public:
TDataOwnerSignals(const TString& module, const TString dataName);
- void Add(const ui64 size) const {
- AddCount->Add(1);
- AddBytes->Add(size);
+ void Add(const ui64 size, const bool load) const {
+ DataSize->Add(size);
+ ChunksCount->Add(1);
+ if (!load) {
+ AddCount->Add(1);
+ AddBytes->Add(size);
+ }
}
void Erase(const ui64 size) const {
+ DataSize->Remove(size);
+ ChunksCount->Remove(1);
EraseCount->Add(1);
EraseBytes->Add(size);
}
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
index 9b07f04e81..cea84b06fa 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
@@ -6,6 +6,7 @@
#include <ydb/core/tx/columnshard/engines/columns_table.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
+#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
#include <ydb/core/protos/counters_columnshard.pb.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/tx/columnshard/splitter/settings.h>
@@ -181,8 +182,8 @@ public:
return BlobsAction;
}
- TColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager)
- : BlobsAction(storagesManager)
+ TColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager, const TString& consumerId)
+ : BlobsAction(storagesManager, consumerId)
{
}
@@ -218,6 +219,7 @@ public:
}
THashMap<TBlobRange, TString> Blobs;
+ std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
std::vector<std::shared_ptr<IBlobsReadingAction>> GetReadingActions() const {
auto result = BlobsAction.GetReadingActions();
diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.h b/ydb/core/tx/columnshard/engines/changes/cleanup.h
index c245b47178..ca044f734b 100644
--- a/ydb/core/tx/columnshard/engines/changes/cleanup.h
+++ b/ydb/core/tx/columnshard/engines/changes/cleanup.h
@@ -25,7 +25,11 @@ protected:
}
virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override;
public:
- using TBase::TBase;
+ TCleanupColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager)
+ : TBase(storagesManager, StaticTypeName()) {
+
+ }
+
virtual THashSet<TPortionAddress> GetTouchedPortions() const override {
THashSet<TPortionAddress> result;
@@ -48,8 +52,12 @@ public:
return true;
}
+ static TString StaticTypeName() {
+ return "CS::CLEANUP";
+ }
+
virtual TString TypeString() const override {
- return "CLEANUP";
+ return StaticTypeName();
}
};
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
index 8f62034e80..b7d50f84cf 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
@@ -76,7 +76,7 @@ void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, T
}
TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext)
- : TBase(limits.GetSplitSettings(), saverContext)
+ : TBase(limits.GetSplitSettings(), saverContext, StaticTypeName())
, Limits(limits)
, GranuleMeta(granule)
{
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h
index 778a557ebc..fcba1e97cf 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.h
@@ -35,6 +35,10 @@ public:
TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext);
~TCompactColumnEngineChanges();
+ static TString StaticTypeName() {
+ return "CS::GENERAL";
+ }
+
ui32 NumSplitInto(const ui32 srcRows) const;
};
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
index f14d62f9fb..c246efe9ff 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
@@ -18,7 +18,7 @@ public:
using TBase::TBase;
virtual TString TypeString() const override {
- return "GENERAL_COMPACTION";
+ return StaticTypeName();
}
};
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h
index 4aa50998bf..22a28a0a00 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.h
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.h
@@ -25,7 +25,7 @@ public:
THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule}
public:
TInsertColumnEngineChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& dataToIndex, const TSplitSettings& splitSettings, const TSaverContext& saverContext)
- : TBase(splitSettings, saverContext)
+ : TBase(splitSettings, saverContext, StaticTypeName())
, DataToIndex(std::move(dataToIndex))
, DefaultMark(defaultMark)
{
@@ -39,8 +39,12 @@ public:
return TBase::GetTouchedPortions();
}
+ static TString StaticTypeName() {
+ return "CS::INDEXATION";
+ }
+
virtual TString TypeString() const override {
- return "INSERT";
+ return StaticTypeName();
}
bool AddPathIfNotExists(ui64 pathId);
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h
index 8d71dc736e..6e7208b851 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.h
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.h
@@ -70,11 +70,19 @@ public:
PortionsToEvict.emplace_back(info, std::move(features));
}
+ static TString StaticTypeName() {
+ return "CS::TTL";
+ }
+
virtual TString TypeString() const override {
- return "TTL";
+ return StaticTypeName();
+ }
+
+ TTTLColumnEngineChanges(const TSplitSettings& splitSettings, const TSaverContext& saverContext)
+ : TBase(splitSettings, saverContext, StaticTypeName()) {
+
}
- using TBase::TBase;
};
}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index 0db6dae785..f107eb637e 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -28,8 +28,8 @@ public:
return SplitSettings;
}
- TChangesWithAppend(const TSplitSettings& splitSettings, const TSaverContext& saverContext)
- : TBase(saverContext.GetStoragesManager())
+ TChangesWithAppend(const TSplitSettings& splitSettings, const TSaverContext& saverContext, const TString& consumerId)
+ : TBase(saverContext.GetStoragesManager(), consumerId)
, SplitSettings(splitSettings)
, SaverContext(saverContext)
{
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
index c5d9d3714d..430905b212 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
@@ -38,8 +38,8 @@ TInsertionSummary::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 plan
pathInfo->AddCommitted(std::move(*data));
} else {
- dbTable.Abort(*data);
- Summary.AddAborted(std::move(*data));
+ dbTable.Abort(*data);
+ Summary.AddAborted(std::move(*data));
}
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
index 0bc69d578f..b15e2aab59 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
@@ -6,9 +6,7 @@ namespace NKikimr::NOlap {
TAtomicCounter TInsertionSummary::CriticalInserted;
void TInsertionSummary::OnNewCommitted(const ui64 dataSize, const bool load) noexcept {
- if (!load) {
- Counters.Committed.Add(dataSize);
- }
+ Counters.Committed.Add(dataSize, load);
++StatsCommitted.Rows;
if (StatsCommitted.Bytes <= (i64)2 * 1024 * 1024 * 1024 && StatsCommitted.Bytes + dataSize > (i64)2 * 1024 * 1024 * 1024) {
++LocalInsertedCritical;
@@ -82,12 +80,8 @@ const NKikimr::NOlap::TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui
return &it->second;
}
-bool TInsertionSummary::IsOverloaded(const ui64 pathId) const {
- auto it = PathInfo.find(pathId);
- if (it == PathInfo.end()) {
- return false;
- }
- return it->second.IsOverloaded();
+bool TInsertionSummary::IsOverloaded(const ui64 /*pathId*/) const {
+ return StatsCommitted.Bytes > TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID;
}
void TInsertionSummary::Clear() {
@@ -105,9 +99,7 @@ void TInsertionSummary::Clear() {
}
void TInsertionSummary::OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept {
- if (!load) {
- Counters.Inserted.Add(dataSize);
- }
+ Counters.Inserted.Add(dataSize, load);
pathInfo.AddInsertedSize(dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
++StatsPrepared.Rows;
StatsPrepared.Bytes += dataSize;
@@ -168,9 +160,7 @@ bool TInsertionSummary::EraseCommitted(const TInsertedData& data) {
const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) {
const TWriteId writeId((TWriteId)data.WriteTxId);
- if (!load) {
- Counters.Aborted.Add(data.BlobSize());
- }
+ Counters.Aborted.Add(data.BlobSize(), load);
auto insertInfo = Aborted.emplace(writeId, std::move(data));
Y_VERIFY(insertInfo.second);
return &insertInfo.first->second;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
index a0acc9d19a..8799ddd642 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
@@ -30,17 +30,17 @@ TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAsse
return PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, blobsDataAssemble);
}
-void TEFTaskConstructor::DoOnDataReady() {
+void TEFTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFilter>(ScanActorId, BuildBatchAssembler(),
ReadMetadata, SourceIdx, ColumnIds, UseEarlyFilter));
}
-void TFFColumnsTaskConstructor::DoOnDataReady() {
+void TFFColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFFBatch>(ScanActorId, BuildBatchAssembler(),
SourceIdx, AppliedFilter));
}
-void TCommittedColumnsTaskConstructor::DoOnDataReady() {
+void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
auto blobs = ExtractBlobsData();
Y_VERIFY(NullBlocks.size() == 0);
Y_VERIFY(blobs.size() == 1);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
index 58925f1a48..bc7c3be153 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
@@ -19,8 +19,8 @@ protected:
THashMap<TBlobRange, ui32> NullBlocks;
virtual bool DoOnError(const TBlobRange& range) override;
public:
- IFetchTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, const IDataSource& source)
- : TBase(readActions)
+ IFetchTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, const IDataSource& source, const TString& taskCustomer)
+ : TBase(readActions, taskCustomer)
, ScanActorId(NActors::TActorContext::AsActorContext().SelfID)
, SourceIdx(source.GetSourceIdx())
, ReadMetadata(reader.GetReadMetadata())
@@ -36,11 +36,11 @@ private:
TCommittedBlob CommittedBlob;
using TBase = IFetchTaskConstructor;
protected:
- virtual void DoOnDataReady() override;
+ virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
public:
TCommittedColumnsTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
- const TCommittedDataSource& source)
- : TBase(reader, readActions, std::move(nullBlocks), source)
+ const TCommittedDataSource& source, const TString& taskCustomer)
+ : TBase(reader, readActions, std::move(nullBlocks), source, taskCustomer)
, CommittedBlob(source.GetCommitted())
{
@@ -56,8 +56,8 @@ protected:
TPortionInfo::TPreparedBatchData BuildBatchAssembler();
public:
TAssembleColumnsTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
- const std::set<ui32>& columnIds, const TPortionDataSource& portion)
- : TBase(reader, readActions, std::move(nullBlocks), portion)
+ const std::set<ui32>& columnIds, const TPortionDataSource& portion, const TString& taskCustomer)
+ : TBase(reader, readActions, std::move(nullBlocks), portion, taskCustomer)
, ColumnIds(columnIds)
, PortionInfo(portion.GetPortionInfoPtr())
{
@@ -69,11 +69,11 @@ class TFFColumnsTaskConstructor: public TAssembleColumnsTaskConstructor {
private:
using TBase = TAssembleColumnsTaskConstructor;
std::shared_ptr<NArrow::TColumnFilter> AppliedFilter;
- virtual void DoOnDataReady() override;
+ virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
public:
TFFColumnsTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
- const std::set<ui32>& columnIds, const TPortionDataSource& portion)
- : TBase(reader, readActions, std::move(nullBlocks), columnIds, portion)
+ const std::set<ui32>& columnIds, const TPortionDataSource& portion, const TString& taskCustomer)
+ : TBase(reader, readActions, std::move(nullBlocks), columnIds, portion, taskCustomer)
, AppliedFilter(portion.GetFilterStageData().GetAppliedFilter())
{
}
@@ -83,11 +83,11 @@ class TEFTaskConstructor: public TAssembleColumnsTaskConstructor {
private:
bool UseEarlyFilter = false;
using TBase = TAssembleColumnsTaskConstructor;
- virtual void DoOnDataReady() override;
+ virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
public:
TEFTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
- const std::set<ui32>& columnIds, const TPortionDataSource& portion, const bool useEarlyFilter)
- : TBase(reader, readActions, std::move(nullBlocks), columnIds, portion)
+ const std::set<ui32>& columnIds, const TPortionDataSource& portion, const bool useEarlyFilter, const TString& taskCustomer)
+ : TBase(reader, readActions, std::move(nullBlocks), columnIds, portion, taskCustomer)
, UseEarlyFilter(useEarlyFilter)
{
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index ba2e970fe1..d5f7f732b8 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -85,12 +85,12 @@ void TPortionDataSource::DoStartFilterStage() {
Y_VERIFY(FetchingPlan->GetFilterStage()->GetSize());
auto& columnIds = FetchingPlan->GetFilterStage()->GetColumnIds();
- auto readAction = Portion->GetBlobsStorage()->StartReadingAction();
+ auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FILTER");
THashMap<TBlobRange, ui32> nullBlocks;
NeedFetchColumns(columnIds, readAction, nullBlocks, nullptr);
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
- auto constructor = std::make_shared<TEFTaskConstructor>(ReadData, actions, std::move(nullBlocks), columnIds, *this, FetchingPlan->CanUseEarlyFilterImmediately());
+ auto constructor = std::make_shared<TEFTaskConstructor>(ReadData, actions, std::move(nullBlocks), columnIds, *this, FetchingPlan->CanUseEarlyFilterImmediately(), "ReaderFilter");
ReadData.AddForFetch(GetSourceIdx(), constructor, false);
}
@@ -101,12 +101,12 @@ void TPortionDataSource::DoStartFetchStage() {
if (FetchingPlan->GetFetchingStage()->GetSize() && !FilterStageData->IsEmptyFilter()) {
auto& columnIds = FetchingPlan->GetFetchingStage()->GetColumnIds();
- auto readAction = Portion->GetBlobsStorage()->StartReadingAction();
+ auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FETCHING");
THashMap<TBlobRange, ui32> nullBlocks;
NeedFetchColumns(columnIds, readAction, nullBlocks, GetFilterStageData().GetActualFilter());
if (readAction->GetExpectedBlobsCount()) {
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
- auto constructor = std::make_shared<TFFColumnsTaskConstructor>(ReadData, actions, std::move(nullBlocks), columnIds, *this);
+ auto constructor = std::make_shared<TFFColumnsTaskConstructor>(ReadData, actions, std::move(nullBlocks), columnIds, *this, "ReaderFetcher");
ReadData.AddForFetch(GetSourceIdx(), constructor, true);
return;
}
@@ -124,12 +124,12 @@ void TCommittedDataSource::DoFetch() {
ReadStarted = true;
std::shared_ptr<IBlobsStorageOperator> storageOperator = ReadData.GetContext().GetStoragesManager()->GetInsertOperator();
- auto readAction = storageOperator->StartReadingAction();
+ auto readAction = storageOperator->StartReadingAction("CS::READ::COMMITTED");
readAction->AddRange(CommittedBlob.GetBlobRange());
THashMap<TBlobRange, ui32> nullBlocks;
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
- auto constructor = std::make_shared<TCommittedColumnsTaskConstructor>(ReadData, actions, std::move(nullBlocks), *this);
+ auto constructor = std::make_shared<TCommittedColumnsTaskConstructor>(ReadData, actions, std::move(nullBlocks), *this, "ReaderCommitted");
ReadData.AddForFetch(GetSourceIdx(), constructor, true);
}
}
diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp
index cc248a53fa..7d1d1abcb6 100644
--- a/ydb/core/tx/columnshard/operations/write.cpp
+++ b/ydb/core/tx/columnshard/operations/write.cpp
@@ -26,7 +26,7 @@ namespace NKikimr::NColumnShard {
NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source);
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(owner.TabletID(), ctx.SelfID,
- owner.StoragesManager->GetInsertOperator()->StartWritingAction(), NEvWrite::TWriteData(writeMeta, data));
+ owner.StoragesManager->GetInsertOperator()->StartWritingAction("WRITING_OPERATOR"), NEvWrite::TWriteData(writeMeta, data));
NConveyor::TCompServiceOperator::SendTaskToExecute(task);
Status = EOperationStatus::Started;
diff --git a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
index b91c13d16b..afae8e4beb 100644
--- a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
+++ b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
@@ -31,10 +31,11 @@ void TActor::Handle(TEvStartTask::TPtr& ev) {
void TActor::Handle(NKikimr::NResourceBroker::TEvResourceBroker::TEvResourceAllocated::TPtr& ev) {
auto it = Tasks.find(((TCookie*)ev->Get()->Cookie.Get())->GetTaskIdentifier());
Y_VERIFY(it != Tasks.end());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "result_resources")("task_id", ev->Get()->TaskId)("task", it->second->DebugString());
- it->second->OnAllocationSuccess(ev->Get()->TaskId, SelfId());
+ auto task = it->second;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "result_resources")("task_id", ev->Get()->TaskId)("task", task->DebugString());
+ task->OnAllocationSuccess(ev->Get()->TaskId, SelfId());
+ task->GetContext().GetCounters()->OnReply(task->GetMemoryAllocation());
Tasks.erase(it);
- it->second->GetContext().GetCounters()->OnReply(it->second->GetMemoryAllocation());
}
TActor::TActor(ui64 tabletId, const TActorId& parent)
diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.cpp b/ydb/core/tx/columnshard/resource_subscriber/task.cpp
index 8dc1fdc646..df1a7650de 100644
--- a/ydb/core/tx/columnshard/resource_subscriber/task.cpp
+++ b/ydb/core/tx/columnshard/resource_subscriber/task.cpp
@@ -14,6 +14,9 @@ void ITask::Start(const NActors::TActorId& actorId, const std::shared_ptr<ITask>
}
TResourcesGuard::~TResourcesGuard() {
+ if (!NActors::TlsActivationContext) {
+ return;
+ }
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "free_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu);
auto ev = std::make_unique<IEventHandle>(NKikimr::NResourceBroker::MakeResourceBrokerID(), Sender, new NKikimr::NResourceBroker::TEvResourceBroker::TEvFinishTask(TaskId));
NActors::TActorContext::AsActorContext().Send(std::move(ev));