diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-01 18:48:04 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-01 19:08:11 +0300 |
commit | 33c98f7f7bf065218f7bc5ff613552a8404447ce (patch) | |
tree | a1a033fb9098698f4cf0ddd7328b54e0fdf7c2bd | |
parent | 3b19804b1a05894b0d9253f6d6f7d8938a9ede38 (diff) | |
download | ydb-33c98f7f7bf065218f7bc5ff613552a8404447ce.tar.gz |
KIKIMR-19215: additional counters for external blob storages
52 files changed, 674 insertions, 170 deletions
diff --git a/.mapping.json b/.mapping.json index 23aacb9b91..80966f5860 100644 --- a/.mapping.json +++ b/.mapping.json @@ -5269,6 +5269,11 @@ "ydb/core/tx/columnshard/blobs_action/bs/CMakeLists.linux-x86_64.txt":"", "ydb/core/tx/columnshard/blobs_action/bs/CMakeLists.txt":"", "ydb/core/tx/columnshard/blobs_action/bs/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.txt":"", + "ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.windows-x86_64.txt":"", "ydb/core/tx/columnshard/blobs_action/tier/CMakeLists.darwin-x86_64.txt":"", "ydb/core/tx/columnshard/blobs_action/tier/CMakeLists.linux-aarch64.txt":"", "ydb/core/tx/columnshard/blobs_action/tier/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt index de7066a842..11d7dd42c1 100644 --- a/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(abstract) add_subdirectory(bs) +add_subdirectory(counters) add_subdirectory(tier) add_subdirectory(transaction) @@ -19,8 +20,9 @@ target_link_libraries(tx-columnshard-blobs_action PUBLIC libs-apache-arrow ydb-core-tablet_flat core-tx-tiering - columnshard-blobs_action-bs columnshard-blobs_action-abstract + columnshard-blobs_action-bs + columnshard-blobs_action-counters columnshard-blobs_action-transaction columnshard-blobs_action-tier ) diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt index 8e9f9f0fe8..3597b1535a 100644 --- a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt @@ -8,6 +8,7 @@ add_subdirectory(abstract) add_subdirectory(bs) +add_subdirectory(counters) add_subdirectory(tier) add_subdirectory(transaction) @@ -20,8 +21,9 @@ target_link_libraries(tx-columnshard-blobs_action PUBLIC libs-apache-arrow ydb-core-tablet_flat core-tx-tiering - columnshard-blobs_action-bs columnshard-blobs_action-abstract + columnshard-blobs_action-bs + columnshard-blobs_action-counters columnshard-blobs_action-transaction columnshard-blobs_action-tier ) diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt index 8e9f9f0fe8..3597b1535a 100644 --- a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(abstract) add_subdirectory(bs) +add_subdirectory(counters) add_subdirectory(tier) add_subdirectory(transaction) @@ -20,8 +21,9 @@ target_link_libraries(tx-columnshard-blobs_action PUBLIC libs-apache-arrow ydb-core-tablet_flat core-tx-tiering - columnshard-blobs_action-bs columnshard-blobs_action-abstract + columnshard-blobs_action-bs + columnshard-blobs_action-counters columnshard-blobs_action-transaction columnshard-blobs_action-tier ) diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt index f1469f8ee4..7c6f876790 100644 --- a/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(abstract) add_subdirectory(bs) +add_subdirectory(counters) add_subdirectory(transaction) add_library(tx-columnshard-blobs_action) @@ -21,8 +22,9 @@ target_link_libraries(tx-columnshard-blobs_action PUBLIC libs-apache-arrow ydb-core-tablet_flat core-tx-tiering - columnshard-blobs_action-bs columnshard-blobs_action-abstract + columnshard-blobs_action-bs + columnshard-blobs_action-counters columnshard-blobs_action-transaction ) target_sources(tx-columnshard-blobs_action PRIVATE diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/action.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/action.cpp index 5e860f338f..fdd3fd5adb 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/action.cpp +++ b/ydb/core/tx/columnshard/blobs_action/abstract/action.cpp @@ -4,15 +4,15 @@ namespace NKikimr::NOlap { std::shared_ptr<NKikimr::NOlap::IBlobsWritingAction> TBlobsAction::GetWriting(const TPortionInfo& portionInfo) { - return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetWriting(); + return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetWriting(ConsumerId); } std::shared_ptr<NKikimr::NOlap::IBlobsReadingAction> TBlobsAction::GetReading(const TPortionInfo& portionInfo) { - return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetReading(); + return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetReading(ConsumerId); } std::shared_ptr<NKikimr::NOlap::IBlobsDeclareRemovingAction> TBlobsAction::GetRemoving(const TPortionInfo& portionInfo) { - return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetRemoving(); + return GetStorageAction(portionInfo.GetBlobsStorage()->GetStorageId()).GetRemoving(ConsumerId); } } diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/action.h b/ydb/core/tx/columnshard/blobs_action/abstract/action.h index 95c48dedb8..48b0e56872 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/action.h +++ b/ydb/core/tx/columnshard/blobs_action/abstract/action.h @@ -22,24 +22,24 @@ public: } - const std::shared_ptr<IBlobsDeclareRemovingAction>& GetRemoving() { + const std::shared_ptr<IBlobsDeclareRemovingAction>& GetRemoving(const TString& consumerId) { if (!Removing) { - Removing = Storage->StartDeclareRemovingAction(); + Removing = Storage->StartDeclareRemovingAction(consumerId); } return Removing; } - const std::shared_ptr<IBlobsWritingAction>& GetWriting() { + const std::shared_ptr<IBlobsWritingAction>& GetWriting(const TString& consumerId) { if (!Writing) { - Writing = Storage->StartWritingAction(); + Writing = Storage->StartWritingAction(consumerId); } return Writing; } const std::shared_ptr<IBlobsWritingAction>& GetWritingOptional() const { return Writing; } - const std::shared_ptr<IBlobsReadingAction>& GetReading() { + const std::shared_ptr<IBlobsReadingAction>& GetReading(const TString& consumerId) { if (!Reading) { - Reading = Storage->StartReadingAction(); + Reading = Storage->StartReadingAction(consumerId); } return Reading; } @@ -78,6 +78,7 @@ class TBlobsAction { private: std::shared_ptr<IStoragesManager> Storages; THashMap<TString, TStorageAction> StorageActions; + const TString ConsumerId; TStorageAction& GetStorageAction(const TString& storageId) { auto it = StorageActions.find(storageId); @@ -87,8 +88,9 @@ private: return it->second; } public: - TBlobsAction(std::shared_ptr<IStoragesManager> storages) + explicit TBlobsAction(std::shared_ptr<IStoragesManager> storages, const TString& consumerId) : Storages(storages) + , ConsumerId(consumerId) { } @@ -155,19 +157,19 @@ public: } std::shared_ptr<IBlobsDeclareRemovingAction> GetRemoving(const TString& storageId) { - return GetStorageAction(storageId).GetRemoving(); + return GetStorageAction(storageId).GetRemoving(ConsumerId); } std::shared_ptr<IBlobsDeclareRemovingAction> GetRemoving(const TPortionInfo& portionInfo); std::shared_ptr<IBlobsWritingAction> GetWriting(const TString& storageId) { - return GetStorageAction(storageId).GetWriting(); + return GetStorageAction(storageId).GetWriting(ConsumerId); } std::shared_ptr<IBlobsWritingAction> GetWriting(const TPortionInfo& portionInfo); std::shared_ptr<IBlobsReadingAction> GetReading(const TString& storageId) { - return GetStorageAction(storageId).GetReading(); + return GetStorageAction(storageId).GetReading(ConsumerId); } std::shared_ptr<IBlobsReadingAction> GetReading(const TPortionInfo& portionInfo); diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp index 0f8dd0161c..e16ddf0b69 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp +++ b/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp @@ -5,10 +5,83 @@ namespace NKikimr::NOlap { void IBlobsReadingAction::StartReading(THashMap<TUnifiedBlobId, THashSet<TBlobRange>>&& ranges) { AFL_VERIFY(ranges.size()); + AFL_VERIFY(Counters); for (auto&& i : ranges) { AFL_VERIFY(i.second.size()); + for (auto&& br : i.second) { + Counters->OnRequest(br.Size); + } } return DoStartReading(ranges); } +void IBlobsReadingAction::ExtractBlobsDataTo(THashMap<TBlobRange, TString>& result) { + AFL_VERIFY(Started); + if (result.empty()) { + std::swap(result, Replies); + } else { + for (auto&& i : Replies) { + AFL_VERIFY(result.emplace(i.first, std::move(i.second)).second); + } + Replies.clear(); + } + RangesForResult.clear(); +} + +void IBlobsReadingAction::Start(const THashSet<TBlobRange>& rangesInProgress) { + Y_VERIFY(!Started); + Y_VERIFY(RangesForRead.size() + RangesForResult.size()); + for (auto&& i : RangesForRead) { + for (auto&& r : i.second) { + WaitingRanges.emplace(r, TMonotonic::Now()); + } + } + THashMap<TUnifiedBlobId, THashSet<TBlobRange>> rangesFiltered; + if (rangesInProgress.empty()) { + rangesFiltered = RangesForRead; + } else { + for (auto&& i : RangesForRead) { + for (auto&& r : i.second) { + if (!rangesInProgress.contains(r)) { + rangesFiltered[r.BlobId].emplace(r); + } + } + } + } + if (rangesFiltered.size()) { + StartReading(std::move(rangesFiltered)); + } + Started = true; + for (auto&& i : RangesForResult) { + AFL_VERIFY(Replies.emplace(i.first, i.second).second); + } +} + +void IBlobsReadingAction::OnReadResult(const TBlobRange& range, const TString& data) { + AFL_VERIFY(Counters); + auto it = WaitingRanges.find(range); + Y_VERIFY(it != WaitingRanges.end()); + Counters->OnReply(range.Size, TMonotonic::Now() - it->second); + WaitingRanges.erase(it); + Replies.emplace(range, data); +} + +void IBlobsReadingAction::OnReadError(const TBlobRange& range, const TErrorStatus& replyStatus) { + AFL_VERIFY(Counters); + auto it = WaitingRanges.find(range); + Y_VERIFY(it != WaitingRanges.end()); + Counters->OnFail(range.Size, TMonotonic::Now() - it->second); + WaitingRanges.erase(it); + Fails.emplace(range, replyStatus); +} + +void IBlobsReadingAction::AddRange(const TBlobRange& range, const TString& result /*= Default<TString>()*/) { + Y_VERIFY(!Started); + if (!result) { + Y_VERIFY(RangesForRead[range.BlobId].emplace(range).second); + } else { + Y_VERIFY(RangesForResult.emplace(range, result).second); + } +} + } diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/read.h b/ydb/core/tx/columnshard/blobs_action/abstract/read.h index 3cc93ce60b..116e5748b1 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/read.h +++ b/ydb/core/tx/columnshard/blobs_action/abstract/read.h @@ -1,29 +1,43 @@ #pragma once #include "common.h" +#include <ydb/core/tx/columnshard/blobs_action/counters/read.h> #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/protos/base.pb.h> +#include <ydb/library/conclusion/status.h> #include <util/generic/hash_set.h> namespace NKikimr::NOlap { class IBlobsReadingAction: public ICommonBlobsAction { +public: + using TErrorStatus = TConclusionSpecialStatus<NKikimrProto::EReplyStatus, NKikimrProto::EReplyStatus::OK, NKikimrProto::EReplyStatus::ERROR>; private: using TBase = ICommonBlobsAction; + THashMap<TUnifiedBlobId, THashSet<TBlobRange>> RangesForRead; - THashSet<TBlobRange> WaitingRanges; + THashMap<TBlobRange, TString> RangesForResult; + THashMap<TBlobRange, TMonotonic> WaitingRanges; THashMap<TBlobRange, TString> Replies; - THashMap<TBlobRange, NKikimrProto::EReplyStatus> Fails; + THashMap<TBlobRange, TErrorStatus> Fails; + std::shared_ptr<NBlobOperations::TReadCounters> Counters; bool Started = false; protected: virtual void DoStartReading(const THashMap<TUnifiedBlobId, THashSet<TBlobRange>>& range) = 0; void StartReading(THashMap<TUnifiedBlobId, THashSet<TBlobRange>>&& ranges); public: + + void SetCounters(std::shared_ptr<NBlobOperations::TReadCounters> counters) { + Counters = counters; + } + IBlobsReadingAction(const TString& storageId) : TBase(storageId) { } + void ExtractBlobsDataTo(THashMap<TBlobRange, TString>& result); + ui64 GetExpectedBlobsSize() const { ui64 result = 0; for (auto&& i : RangesForRead) { @@ -31,6 +45,9 @@ public: result += b.Size; } } + for (auto&& i : RangesForResult) { + result += i.first.Size; + } return result; } @@ -39,7 +56,7 @@ public: for (auto&& i : RangesForRead) { result += i.second.size(); } - return result; + return result + RangesForResult.size(); } void FillExpectedRanges(THashSet<TBlobRange>& ranges) const { @@ -48,52 +65,20 @@ public: Y_VERIFY(ranges.emplace(b).second); } } + for (auto&& i : RangesForResult) { + Y_VERIFY(ranges.emplace(i.first).second); + } } const THashMap<TUnifiedBlobId, THashSet<TBlobRange>>& GetRangesForRead() const { return RangesForRead; } - void AddRange(const TBlobRange& range) { - Y_VERIFY(!Started); - Y_VERIFY(RangesForRead[range.BlobId].emplace(range).second); - } + void AddRange(const TBlobRange& range, const TString& result = Default<TString>()); - void Start(const THashSet<TBlobRange>& rangesInProgress) { - Y_VERIFY(!Started); - Y_VERIFY(RangesForRead.size()); - for (auto&& i : RangesForRead) { - for (auto&& r : i.second) { - WaitingRanges.emplace(r); - } - } - THashMap<TUnifiedBlobId, THashSet<TBlobRange>> rangesFiltered; - if (rangesInProgress.empty()) { - rangesFiltered = RangesForRead; - } else { - for (auto&& i : RangesForRead) { - for (auto&& r : i.second) { - if (!rangesInProgress.contains(r)) { - rangesFiltered[r.BlobId].emplace(r); - } - } - } - } - if (rangesFiltered.size()) { - StartReading(std::move(rangesFiltered)); - } - Started = true; - } - - void OnReadResult(const TBlobRange& range, const TString& data) { - Y_VERIFY(WaitingRanges.erase(range)); - Replies.emplace(range, data); - } - - void OnReadError(const TBlobRange& range, const NKikimrProto::EReplyStatus replyStatus) { - Y_VERIFY(WaitingRanges.erase(range)); - Fails.emplace(range, replyStatus); - } + void Start(const THashSet<TBlobRange>& rangesInProgress); + void OnReadResult(const TBlobRange& range, const TString& data); + void OnReadError(const TBlobRange& range, const TErrorStatus& replyStatus); bool HasFails() const { return Fails.size(); diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h index 6e2ad258dc..4e5b3c4739 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h +++ b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h @@ -4,6 +4,7 @@ #include "read.h" #include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> +#include <ydb/core/tx/columnshard/blobs_action/counters/storage.h> #include <ydb/library/accessor/accessor.h> namespace NKikimr::NColumnShard { @@ -34,6 +35,7 @@ private: Y_VERIFY(GCActivity); GCActivity = false; } + std::shared_ptr<NBlobOperations::TStorageCounters> Counters; protected: virtual std::shared_ptr<IBlobsDeclareRemovingAction> DoStartDeclareRemovingAction() = 0; virtual std::shared_ptr<IBlobsWritingAction> DoStartWritingAction() = 0; @@ -47,8 +49,9 @@ protected: } public: IBlobsStorageOperator(const TString& storageId) - : StorageId(storageId) { - + : StorageId(storageId) + { + Counters = std::make_shared<NBlobOperations::TStorageCounters>(storageId); } virtual std::shared_ptr<IBlobInUseTracker> GetBlobsTracker() const = 0; @@ -65,14 +68,19 @@ public: void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& tiers) { return DoOnTieringModified(tiers); } - std::shared_ptr<IBlobsDeclareRemovingAction> StartDeclareRemovingAction() { + + std::shared_ptr<IBlobsDeclareRemovingAction> StartDeclareRemovingAction(const TString& /*consumerId*/) { return DoStartDeclareRemovingAction(); } - std::shared_ptr<IBlobsWritingAction> StartWritingAction() { - return DoStartWritingAction(); + std::shared_ptr<IBlobsWritingAction> StartWritingAction(const TString& consumerId) { + auto result = DoStartWritingAction(); + result->SetCounters(Counters->GetConsumerCounter(consumerId)->GetWriteCounters()); + return result; } - std::shared_ptr<IBlobsReadingAction> StartReadingAction() { - return DoStartReadingAction(); + std::shared_ptr<IBlobsReadingAction> StartReadingAction(const TString& consumerId) { + auto result = DoStartReadingAction(); + result->SetCounters(Counters->GetConsumerCounter(consumerId)->GetReadCounters()); + return result; } bool StartGC() { if (!GCActivity) { diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/write.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/write.cpp index 689e494c24..234285cfce 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/abstract/write.cpp @@ -7,22 +7,41 @@ TUnifiedBlobId IBlobsWritingAction::AddDataForWrite(const TString& data) { Y_VERIFY(!WritingStarted); auto blobId = AllocateNextBlobId(data); AFL_VERIFY(BlobsForWrite.emplace(blobId, data).second); + BlobsWaiting.emplace(blobId); + BlobsWriteCount += 1; SumSize += data.size(); return blobId; } void IBlobsWritingAction::OnBlobWriteResult(const TUnifiedBlobId& blobId, const NKikimrProto::EReplyStatus status) { - Y_VERIFY(BlobsForWrite.erase(blobId)); + AFL_VERIFY(Counters); + auto it = WritingStart.find(blobId); + AFL_VERIFY(it != WritingStart.end()); + if (status == NKikimrProto::EReplyStatus::OK) { + Counters->OnReply(blobId.BlobSize(), TMonotonic::Now() - it->second); + } else { + Counters->OnFail(blobId.BlobSize(), TMonotonic::Now() - it->second); + } + WritingStart.erase(it); + Y_VERIFY(BlobsWaiting.erase(blobId)); return DoOnBlobWriteResult(blobId, status); } bool IBlobsWritingAction::IsReady() const { Y_VERIFY(WritingStarted); - return BlobsForWrite.empty(); + return BlobsWaiting.empty(); } IBlobsWritingAction::~IBlobsWritingAction() { - AFL_VERIFY(!NActors::TlsActivationContext || BlobsForWrite.empty() || Aborted); + AFL_VERIFY(!NActors::TlsActivationContext || BlobsWaiting.empty() || Aborted); +} + +void IBlobsWritingAction::SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) { + AFL_VERIFY(Counters); + Counters->OnRequest(data.size()); + WritingStarted = true; + AFL_VERIFY(WritingStart.emplace(blobId, TMonotonic::Now()).second); + return DoSendWriteBlobRequest(data, blobId); } } diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/write.h b/ydb/core/tx/columnshard/blobs_action/abstract/write.h index a90479fa97..4fa4cc8f6e 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/write.h +++ b/ydb/core/tx/columnshard/blobs_action/abstract/write.h @@ -3,6 +3,7 @@ #include <util/generic/hash.h> #include <ydb/core/protos/base.pb.h> #include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/blobs_action/counters/write.h> namespace NKikimr::NColumnShard { class TColumnShard; @@ -15,9 +16,13 @@ class IBlobsWritingAction: public ICommonBlobsAction { private: using TBase = ICommonBlobsAction; bool WritingStarted = false; + THashMap<TUnifiedBlobId, TMonotonic> WritingStart; ui64 SumSize = 0; + ui32 BlobsWriteCount = 0; THashMap<TUnifiedBlobId, TString> BlobsForWrite; + THashSet<TUnifiedBlobId> BlobsWaiting; bool Aborted = false; + std::shared_ptr<NBlobOperations::TWriteCounters> Counters; protected: virtual void DoOnExecuteTxBeforeWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) = 0; virtual void DoOnCompleteTxBeforeWrite(NColumnShard::TColumnShard& self) = 0; @@ -38,6 +43,10 @@ public: virtual ~IBlobsWritingAction(); bool IsReady() const; + void SetCounters(std::shared_ptr<NBlobOperations::TWriteCounters> counters) { + Counters = counters; + } + const THashMap<TUnifiedBlobId, TString>& GetBlobsForWrite() const { return BlobsForWrite; } @@ -54,7 +63,7 @@ public: } ui32 GetBlobsCount() const { - return BlobsForWrite.size(); + return BlobsWriteCount; } ui32 GetTotalSize() const { return SumSize; @@ -74,10 +83,7 @@ public: return DoOnCompleteTxAfterWrite(self); } - void SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) { - WritingStarted = true; - return DoSendWriteBlobRequest(data, blobId); - } + void SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId); }; } diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..3f2c60a24c --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-blobs_action-counters) +target_link_libraries(columnshard-blobs_action-counters PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(columnshard-blobs_action-counters PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/write.cpp +) diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..536e94a621 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-aarch64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-blobs_action-counters) +target_link_libraries(columnshard-blobs_action-counters PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(columnshard-blobs_action-counters PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/write.cpp +) diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..536e94a621 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.linux-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-blobs_action-counters) +target_link_libraries(columnshard-blobs_action-counters PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(columnshard-blobs_action-counters PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/write.cpp +) diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..3f2c60a24c --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/CMakeLists.windows-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-blobs_action-counters) +target_link_libraries(columnshard-blobs_action-counters PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + cpp-actors-core + ydb-core-tablet_flat +) +target_sources(columnshard-blobs_action-counters PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/counters/write.cpp +) diff --git a/ydb/core/tx/columnshard/blobs_action/counters/read.cpp b/ydb/core/tx/columnshard/blobs_action/counters/read.cpp new file mode 100644 index 0000000000..5107b85406 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/read.cpp @@ -0,0 +1,23 @@ +#include "read.h" +#include "storage.h" + +namespace NKikimr::NOlap::NBlobOperations { + +TReadCounters::TReadCounters(const TConsumerCounters& owner) + : TBase(owner, "Reader") +{ + RequestsCount = TBase::GetDeriviative("Requests/Count"); + RequestBytes = TBase::GetDeriviative("Requests/Bytes"); + + RepliesCount = TBase::GetDeriviative("Replies/Count"); + ReplyBytes = TBase::GetDeriviative("Replies/Bytes"); + ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1)); + ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1)); + + FailsCount = TBase::GetDeriviative("Fails/Count"); + FailBytes = TBase::GetDeriviative("Fails/Bytes"); + FailDurationBySize = TBase::GetHistogram("Fails/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 2)); + FailDurationByCount = TBase::GetHistogram("Fails/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 2)); +} + +} diff --git a/ydb/core/tx/columnshard/blobs_action/counters/read.h b/ydb/core/tx/columnshard/blobs_action/counters/read.h new file mode 100644 index 0000000000..d0e8f736f5 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/read.h @@ -0,0 +1,47 @@ +#pragma once +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <ydb/core/tx/columnshard/counters/common/owner.h> + +namespace NKikimr::NOlap::NBlobOperations { + +class TConsumerCounters; + +class TReadCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr RequestsCount; + NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; + + NMonitoring::TDynamicCounters::TCounterPtr RepliesCount; + NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes; + NMonitoring::THistogramPtr ReplyDurationByCount; + NMonitoring::THistogramPtr ReplyDurationBySize; + + NMonitoring::TDynamicCounters::TCounterPtr FailsCount; + NMonitoring::TDynamicCounters::TCounterPtr FailBytes; + NMonitoring::THistogramPtr FailDurationByCount; + NMonitoring::THistogramPtr FailDurationBySize; +public: + TReadCounters(const TConsumerCounters& owner); + + void OnRequest(const ui64 bytes) const { + RequestsCount->Add(1); + RequestBytes->Add(bytes); + } + + void OnReply(const ui64 bytes, const TDuration d) const { + RepliesCount->Add(1); + ReplyBytes->Add(bytes); + ReplyDurationByCount->Collect((i64)d.MilliSeconds()); + ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes); + } + + void OnFail(const ui64 bytes, const TDuration d) const { + FailsCount->Add(1); + FailBytes->Add(bytes); + FailDurationByCount->Collect((i64)d.MilliSeconds()); + FailDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes); + } +}; + +} diff --git a/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp b/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp new file mode 100644 index 0000000000..4f8b03a9ad --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/storage.cpp @@ -0,0 +1,27 @@ +#include "storage.h" + +namespace NKikimr::NOlap::NBlobOperations { + +TStorageCounters::TStorageCounters(const TString& storageId) + : TBase("BlobStorages") +{ + DeepSubGroup("StorageId", storageId); +} + +std::shared_ptr<NKikimr::NOlap::NBlobOperations::TConsumerCounters> TStorageCounters::GetConsumerCounter(const TString& consumerId) { + auto it = ConsumerCounters.find(consumerId); + if (it == ConsumerCounters.end()) { + it = ConsumerCounters.emplace(consumerId, std::make_shared<TConsumerCounters>(consumerId, *this)).first; + } + return it->second; +} + +TConsumerCounters::TConsumerCounters(const TString& consumerId, const TStorageCounters& parent) + : TBase(parent) +{ + DeepSubGroup("Consumer", consumerId); + ReadCounters = std::make_shared<TReadCounters>(*this); + WriteCounters = std::make_shared<TWriteCounters>(*this); +} + +} diff --git a/ydb/core/tx/columnshard/blobs_action/counters/storage.h b/ydb/core/tx/columnshard/blobs_action/counters/storage.h new file mode 100644 index 0000000000..8b42b79dfb --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/storage.h @@ -0,0 +1,32 @@ +#pragma once +#include "read.h" +#include "write.h" +#include <ydb/core/tx/columnshard/counters/common/owner.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <util/generic/hash.h> + +namespace NKikimr::NOlap::NBlobOperations { + +class TStorageCounters; + +class TConsumerCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + YDB_READONLY_DEF(std::shared_ptr<TReadCounters>, ReadCounters); + YDB_READONLY_DEF(std::shared_ptr<TWriteCounters>, WriteCounters); +public: + TConsumerCounters(const TString& consumerId, const TStorageCounters& parent); +}; + +class TStorageCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + THashMap<TString, std::shared_ptr<TConsumerCounters>> ConsumerCounters; +public: + TStorageCounters(const TString& storageId); + + std::shared_ptr<TConsumerCounters> GetConsumerCounter(const TString& consumerId); + +}; + +} diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp new file mode 100644 index 0000000000..301ca21dea --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp @@ -0,0 +1,21 @@ +#include "write.h" +#include "storage.h" + +namespace NKikimr::NOlap::NBlobOperations { + +TWriteCounters::TWriteCounters(const TConsumerCounters& owner) + : TBase(owner, "Writer") +{ + RequestsCount = TBase::GetDeriviative("Requests/Count"); + RequestBytes = TBase::GetDeriviative("Requests/Bytes"); + + RepliesCount = TBase::GetDeriviative("Replies/Count"); + ReplyBytes = TBase::GetDeriviative("Replies/Bytes"); + ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000)); + + FailsCount = TBase::GetDeriviative("Fails/Count"); + FailBytes = TBase::GetDeriviative("Fails/Bytes"); + FailDuration = TBase::GetHistogram("Fails/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000)); +} + +} diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.h b/ydb/core/tx/columnshard/blobs_action/counters/write.h new file mode 100644 index 0000000000..60465c7e32 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/write.h @@ -0,0 +1,43 @@ +#pragma once +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <ydb/core/tx/columnshard/counters/common/owner.h> + +namespace NKikimr::NOlap::NBlobOperations { + +class TConsumerCounters; + +class TWriteCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr RequestsCount; + NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; + + NMonitoring::TDynamicCounters::TCounterPtr RepliesCount; + NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes; + NMonitoring::THistogramPtr ReplyDuration; + + NMonitoring::TDynamicCounters::TCounterPtr FailsCount; + NMonitoring::TDynamicCounters::TCounterPtr FailBytes; + NMonitoring::THistogramPtr FailDuration; +public: + TWriteCounters(const TConsumerCounters& owner); + + void OnRequest(const ui64 bytes) const { + RequestsCount->Add(1); + RequestBytes->Add(bytes); + } + + void OnReply(const ui64 bytes, const TDuration d) const { + RepliesCount->Add(1); + ReplyBytes->Add(bytes); + ReplyDuration->Collect(d.MicroSeconds()); + } + + void OnFail(const ui64 bytes, const TDuration d) const { + FailsCount->Add(1); + FailBytes->Add(bytes); + FailDuration->Collect(d.MicroSeconds()); + } +}; + +} diff --git a/ydb/core/tx/columnshard/blobs_action/counters/ya.make b/ydb/core/tx/columnshard/blobs_action/counters/ya.make new file mode 100644 index 0000000000..7e50b3c7f6 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/counters/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + read.cpp + storage.cpp + write.cpp +) + +PEERDIR( + ydb/core/protos + library/cpp/actors/core + ydb/core/tablet_flat +) + +END() diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp index 27bb2cb777..9731ff110e 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp @@ -12,7 +12,7 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex TBlobManagerDb blobManagerDb(txc.DB); auto allAborted = Self->InsertTable->GetAborted(); auto storage = Self->StoragesManager->GetInsertOperator(); - BlobsAction = storage->StartDeclareRemovingAction(); + BlobsAction = storage->StartDeclareRemovingAction("TX_CLEANUP"); for (auto& [abortedWriteId, abortedData] : allAborted) { Self->InsertTable->EraseAborted(dbTable, abortedData); Y_VERIFY(abortedData.GetBlobRange().IsFullBlob()); diff --git a/ydb/core/tx/columnshard/blobs_action/ya.make b/ydb/core/tx/columnshard/blobs_action/ya.make index 9775d5cb7e..320cb63e4e 100644 --- a/ydb/core/tx/columnshard/blobs_action/ya.make +++ b/ydb/core/tx/columnshard/blobs_action/ya.make @@ -10,8 +10,9 @@ PEERDIR( contrib/libs/apache/arrow ydb/core/tablet_flat ydb/core/tx/tiering - ydb/core/tx/columnshard/blobs_action/bs ydb/core/tx/columnshard/blobs_action/abstract + ydb/core/tx/columnshard/blobs_action/bs + ydb/core/tx/columnshard/blobs_action/counters ydb/core/tx/columnshard/blobs_action/transaction ) diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.cpp b/ydb/core/tx/columnshard/blobs_reader/actor.cpp index 2b387baeb9..64de8e1b7c 100644 --- a/ydb/core/tx/columnshard/blobs_reader/actor.cpp +++ b/ydb/core/tx/columnshard/blobs_reader/actor.cpp @@ -37,7 +37,7 @@ void TActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) AFL_VERIFY(it != BlobTasks.end())("blob_id", event.BlobRange); for (auto&& i : it->second) { if (event.Status != NKikimrProto::EReplyStatus::OK) { - i->AddError(event.BlobRange, ITask::TErrorStatus::Fail(event.Status, "cannot get blob")); + i->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob")); } else { i->AddData(event.BlobRange, event.Data); } diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.h b/ydb/core/tx/columnshard/blobs_reader/actor.h index aad5d74b18..7670b4d0a2 100644 --- a/ydb/core/tx/columnshard/blobs_reader/actor.h +++ b/ydb/core/tx/columnshard/blobs_reader/actor.h @@ -10,7 +10,7 @@ namespace NKikimr::NOlap::NBlobOperations::NRead { -class TActor : public TActorBootstrapped<TActor> { +class TActor: public TActorBootstrapped<TActor> { private: ui64 TabletId; NActors::TActorId Parent; diff --git a/ydb/core/tx/columnshard/blobs_reader/task.cpp b/ydb/core/tx/columnshard/blobs_reader/task.cpp index 4b79bac1dd..b67f64682d 100644 --- a/ydb/core/tx/columnshard/blobs_reader/task.cpp +++ b/ydb/core/tx/columnshard/blobs_reader/task.cpp @@ -1,4 +1,5 @@ #include "task.h" +#include "events.h" #include <library/cpp/actors/core/log.h> namespace NKikimr::NOlap::NBlobOperations::NRead { @@ -8,7 +9,8 @@ const std::vector<std::shared_ptr<IBlobsReadingAction>>& ITask::GetAgents() cons return Agents; } -bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) { +bool ITask::AddError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) { + ++BlobErrorsCount; if (TaskFinishedWithError || AbortFlag) { ACFL_WARN("event", "SkipError")("blob_range", range)("message", status.GetErrorMessage())("status", status.GetStatus())("external_task_id", ExternalTaskId) ("abort", AbortFlag)("finished_with_error", TaskFinishedWithError); @@ -19,11 +21,9 @@ bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) { { auto it = BlobsWaiting.find(range); AFL_VERIFY(it != BlobsWaiting.end()); - it->second->OnReadError(range, status.GetStatus()); + it->second->OnReadError(range, status); BlobsWaiting.erase(it); } - - Y_VERIFY(BlobErrors.emplace(range, status).second); if (!OnError(range)) { TaskFinishedWithError = true; return false; @@ -35,6 +35,7 @@ bool ITask::AddError(const TBlobRange& range, const TErrorStatus& status) { } void ITask::AddData(const TBlobRange& range, const TString& data) { + ++BlobsDataCount; if (TaskFinishedWithError || AbortFlag) { ACFL_WARN("event", "SkipDataAfterError")("external_task_id", ExternalTaskId)("abort", AbortFlag)("finished_with_error", TaskFinishedWithError); return; @@ -48,7 +49,6 @@ void ITask::AddData(const TBlobRange& range, const TString& data) { it->second->OnReadResult(range, data); BlobsWaiting.erase(it); } - Y_VERIFY(BlobsData.emplace(range, data).second); if (BlobsWaiting.empty()) { OnDataReady(); } @@ -61,11 +61,11 @@ void ITask::StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress) { ui64 size = 0; ui64 count = 0; for (auto&& agent : Agents) { + size += agent->GetExpectedBlobsSize(); + count += agent->GetExpectedBlobsCount(); for (auto&& b : agent->GetRangesForRead()) { for (auto&& r : b.second) { BlobsWaiting.emplace(r, agent); - size += r.Size; - ++count; } } agent->Start(rangesInProgress); @@ -79,12 +79,19 @@ void ITask::StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress) { namespace { TAtomicCounter TaskIdentifierBuilder = 0; + +} + +void ITask::TReadSubscriber::DoOnAllocationSuccess(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) { + Task->ResourcesGuard = guard; + TActorContext::AsActorContext().Send(ReadActorId, std::make_unique<TEvStartReadTask>(Task)); } -ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& externalTaskId) +ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& taskCustomer, const TString& externalTaskId) : Agents(actions) , TaskIdentifier(TaskIdentifierBuilder.Inc()) , ExternalTaskId(externalTaskId) + , TaskCustomer(taskCustomer) { AFL_VERIFY(Agents.size()); for (auto&& i : Agents) { @@ -95,8 +102,8 @@ ITask::ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, c TString ITask::DebugString() const { TStringBuilder sb; sb << "finished_with_error=" << TaskFinishedWithError << ";" - << "errors=" << BlobErrors.size() << ";" - << "data=" << BlobsData.size() << ";" + << "errors=" << BlobErrorsCount << ";" + << "data=" << BlobsDataCount << ";" << "waiting=" << BlobsWaiting.size() << ";" << "additional_info=(" << DoDebugString() << ");" ; @@ -107,7 +114,7 @@ void ITask::OnDataReady() { ACFL_DEBUG("event", "OnDataReady")("task", DebugString())("external_task_id", ExternalTaskId); Y_VERIFY(!DataIsReadyFlag); DataIsReadyFlag = true; - DoOnDataReady(); + DoOnDataReady(ResourcesGuard); } bool ITask::OnError(const TBlobRange& range) { @@ -116,7 +123,18 @@ bool ITask::OnError(const TBlobRange& range) { } ITask::~ITask() { - AFL_VERIFY(!NActors::TlsActivationContext || DataIsReadyFlag || TaskFinishedWithError || AbortFlag); + AFL_VERIFY(!NActors::TlsActivationContext || DataIsReadyFlag || TaskFinishedWithError || AbortFlag || !BlobsFetchingStarted); +} + +THashMap<NKikimr::NOlap::TBlobRange, TString> ITask::ExtractBlobsData() { + AFL_VERIFY(BlobsWaiting.empty()); + AFL_VERIFY(!ResultsExtracted); + ResultsExtracted = true; + THashMap<TBlobRange, TString> result; + for (auto&& i : Agents) { + i->ExtractBlobsDataTo(result); + } + return std::move(result); } } diff --git a/ydb/core/tx/columnshard/blobs_reader/task.h b/ydb/core/tx/columnshard/blobs_reader/task.h index 213058d8d2..23ef5a7383 100644 --- a/ydb/core/tx/columnshard/blobs_reader/task.h +++ b/ydb/core/tx/columnshard/blobs_reader/task.h @@ -3,18 +3,16 @@ #include <ydb/library/conclusion/status.h> #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/blobs_action/abstract/read.h> +#include <ydb/core/tx/columnshard/counters/common/object_counter.h> #include <ydb/core/protos/base.pb.h> +#include <ydb/core/tx/columnshard/resource_subscriber/task.h> namespace NKikimr::NOlap::NBlobOperations::NRead { -class ITask { -public: - using TErrorStatus = TConclusionSpecialStatus<NKikimrProto::EReplyStatus, NKikimrProto::EReplyStatus::OK, NKikimrProto::EReplyStatus::ERROR>; +class ITask: public NColumnShard::TMonitoringObjectsCounter<ITask> { private: THashMap<TBlobRange, std::shared_ptr<IBlobsReadingAction>> BlobsWaiting; std::vector<std::shared_ptr<IBlobsReadingAction>> Agents; - THashMap<TBlobRange, TString> BlobsData; - THashMap<TBlobRange, TErrorStatus> BlobErrors; bool BlobsFetchingStarted = false; bool TaskFinishedWithError = false; bool DataIsReadyFlag = false; @@ -23,20 +21,19 @@ private: bool AbortFlag = false; std::optional<ui64> WaitBlobsSize; std::optional<ui64> WaitBlobsCount; + TString TaskCustomer; + std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard; + ui32 BlobErrorsCount = 0; + ui32 BlobsDataCount = 0; + bool ResultsExtracted = false; protected: bool IsFetchingStarted() const { return BlobsFetchingStarted; } - const THashMap<TBlobRange, TString>& GetBlobsData() const { - return BlobsData; - } - - THashMap<TBlobRange, TString> ExtractBlobsData() { - return std::move(BlobsData); - } + THashMap<TBlobRange, TString> ExtractBlobsData(); - virtual void DoOnDataReady() = 0; + virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) = 0; virtual bool DoOnError(const TBlobRange& range) = 0; void OnDataReady(); @@ -72,8 +69,8 @@ public: THashSet<TBlobRange> GetExpectedRanges() const { THashSet<TBlobRange> result; - for (auto&& i : BlobsWaiting) { - i.second->FillExpectedRanges(result); + for (auto&& i : Agents) { + i->FillExpectedRanges(result); } return result; } @@ -82,12 +79,30 @@ public: virtual ~ITask(); - ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& externalTaskId = ""); + ITask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, const TString& taskCustomer, const TString& externalTaskId = ""); void StartBlobsFetching(const THashSet<TBlobRange>& rangesInProgress); - bool AddError(const TBlobRange& range, const TErrorStatus& status); + bool AddError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status); void AddData(const TBlobRange& range, const TString& data); + + class TReadSubscriber: public NResourceBroker::NSubscribe::ITask { + private: + using TBase = NResourceBroker::NSubscribe::ITask; + const TActorId ReadActorId; + std::shared_ptr<NRead::ITask> Task; + protected: + virtual void DoOnAllocationSuccess(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) override; + public: + TReadSubscriber(const TActorId& readActor, const std::shared_ptr<NRead::ITask>& readTask, const ui32 cpu, const ui64 memory, const TString& name, + const NResourceBroker::NSubscribe::TTaskContext& context) + : TBase(cpu, memory, name, context) + , ReadActorId(readActor) + , Task(readTask) + { + + } + }; }; } diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 0237cdcf68..b588bff6e1 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -1,6 +1,7 @@ #include "columnshard_impl.h" #include "blobs_reader/actor.h" #include "hooks/abstract/abstract.h" +#include "resource_subscriber/actor.h" namespace NKikimr { @@ -15,6 +16,7 @@ namespace NKikimr::NColumnShard { void TColumnShard::CleanupActors(const TActorContext& ctx) { ctx.Send(BlobsReadActor, new TEvents::TEvPoisonPill); + ctx.Send(ResourceSubscribeActor, new TEvents::TEvPoisonPill); if (Tiers) { Tiers->Stop(); } @@ -32,6 +34,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { LOG_S_INFO("Switched to work at " << TabletID() << " actor " << ctx.SelfID); BlobsReadActor = ctx.Register(new NOlap::NBlobOperations::NRead::TActor(TabletID(), SelfId())); + ResourceSubscribeActor = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletID(), SelfId())); for (auto&& i : TablesManager.GetTables()) { ActivateTiering(i.first, i.second.GetTieringUsage()); diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 802f6fdbd1..ac69050f9a 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -167,7 +167,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << " at tablet " << TabletID()); writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now()); std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(TabletID(), SelfId(), - StoragesManager->GetInsertOperator()->StartWritingAction(), writeData); + StoragesManager->GetInsertOperator()->StartWritingAction("WRITING"), writeData); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 5e6daceaed..a685eaf12c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -5,6 +5,7 @@ #include "engines/changes/ttl.h" #include "engines/changes/cleanup.h" #include "blobs_action/bs/storage.h" +#include "resource_subscriber/task.h" #ifndef KIKIMR_DISABLE_S3_OPS #include "blobs_action/tier/storage.h" @@ -21,6 +22,7 @@ #include <ydb/services/metadata/service.h> #include <ydb/core/tx/tiering/manager.h> #include <ydb/core/tx/conveyor/usage/service.h> +#include "resource_subscriber/counters.h" namespace NKikimr::NColumnShard { @@ -173,6 +175,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , TablesManager(StoragesManager) , PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig())) , InsertTable(std::make_unique<NOlap::TInsertTable>()) + , SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>()) + , InsertTaskSubscription(NOlap::TInsertColumnEngineChanges::StaticTypeName(), SubscribeCounters) , ReadCounters("Read") , ScanCounters("Scan") , WritesMonitor(*this) @@ -707,8 +711,9 @@ private: std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent; TIndexationCounters Counters; protected: - virtual void DoOnDataReady() override { - TxEvent->IndexChanges->Blobs = std::move(ExtractBlobsData()); + virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override { + TxEvent->IndexChanges->Blobs = ExtractBlobsData(); + TxEvent->IndexChanges->ResourcesGuard = resourcesGuard; const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges); std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId); if (isInsert) { @@ -726,7 +731,7 @@ protected: } public: TChangesReadTask(std::unique_ptr<TEvPrivate::TEvWriteIndex>&& event, const TActorId parentActorId, const ui64 tabletId, const TIndexationCounters& counters) - : TBase(event->IndexChanges->GetReadingActions(), event->IndexChanges->GetTaskIdentifier()) + : TBase(event->IndexChanges->GetReadingActions(), event->IndexChanges->TypeString(), event->IndexChanges->GetTaskIdentifier()) , ParentActorId(parentActorId) , TabletId(tabletId) , TxEvent(std::move(event)) @@ -754,8 +759,10 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat std::vector<NOlap::TInsertedData> data; data.reserve(dataToIndex.size()); + ui64 memoryNeed = 0; for (auto& ptr : dataToIndex) { data.push_back(*ptr); + memoryNeed += ptr->GetMeta().GetRawBytes(); } Y_VERIFY(data.size()); @@ -770,7 +777,10 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing); - ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters))); + const TString taskName = indexChanges->GetTaskIdentifier(); + NOlap::NResourceBroker::NSubscribe::ITask::Start( + ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(BlobsReadActor, + std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters), 0, memoryNeed, taskName, InsertTaskSubscription)); } void TColumnShard::SetupIndexation() { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 593573b1a2..0597d4fedb 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -11,6 +11,8 @@ #include "tx_controller.h" #include "inflight_request_tracker.h" #include "counters/columnshard.h" +#include "resource_subscriber/counters.h" +#include "resource_subscriber/task.h" #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/tablet/tablet_counters.h> @@ -388,6 +390,7 @@ private: TInstant LastStatsReport; TActorId BlobsReadActor; + TActorId ResourceSubscribeActor; TActorId StatsReportPipe; std::shared_ptr<NOlap::IStoragesManager> StoragesManager; @@ -399,6 +402,8 @@ private: TTabletCountersBase* TabletCounters; std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache; std::unique_ptr<NOlap::TInsertTable> InsertTable; + std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters> SubscribeCounters; + NOlap::NResourceBroker::NSubscribe::TTaskContext InsertTaskSubscription; const TScanCounters ReadCounters; const TScanCounters ScanCounters; const TIndexationCounters CompactionCounters = TIndexationCounters("GeneralCompaction"); diff --git a/ydb/core/tx/columnshard/counters/common_data.cpp b/ydb/core/tx/columnshard/counters/common_data.cpp index 9f72a45171..121a9b183c 100644 --- a/ydb/core/tx/columnshard/counters/common_data.cpp +++ b/ydb/core/tx/columnshard/counters/common_data.cpp @@ -6,6 +6,9 @@ TDataOwnerSignals::TDataOwnerSignals(const TString& module, const TString dataNa : TBase(module) , DataName(dataName) { + DataSize = TBase::GetValueAutoAggregationsClient(DataName + "/Size"); + ChunksCount = TBase::GetValueAutoAggregationsClient(DataName + "/Chunks/Count"); + AddCount = GetDeriviative(DataName + "/Add/Count"); AddBytes = GetDeriviative(DataName + "/Add/Bytes"); EraseCount = GetDeriviative(DataName + "/Erase/Count"); diff --git a/ydb/core/tx/columnshard/counters/common_data.h b/ydb/core/tx/columnshard/counters/common_data.h index b66a4aab2f..e4dd70531d 100644 --- a/ydb/core/tx/columnshard/counters/common_data.h +++ b/ydb/core/tx/columnshard/counters/common_data.h @@ -16,15 +16,23 @@ private: NMonitoring::TDynamicCounters::TCounterPtr EraseBytes; NMonitoring::TDynamicCounters::TCounterPtr SkipEraseCount; NMonitoring::TDynamicCounters::TCounterPtr SkipEraseBytes; + std::shared_ptr<TValueAggregationClient> DataSize; + std::shared_ptr<TValueAggregationClient> ChunksCount; public: TDataOwnerSignals(const TString& module, const TString dataName); - void Add(const ui64 size) const { - AddCount->Add(1); - AddBytes->Add(size); + void Add(const ui64 size, const bool load) const { + DataSize->Add(size); + ChunksCount->Add(1); + if (!load) { + AddCount->Add(1); + AddBytes->Add(size); + } } void Erase(const ui64 size) const { + DataSize->Remove(size); + ChunksCount->Remove(1); EraseCount->Add(1); EraseBytes->Add(size); } diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h index 9b07f04e81..cea84b06fa 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h @@ -6,6 +6,7 @@ #include <ydb/core/tx/columnshard/engines/columns_table.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <ydb/core/tx/columnshard/engines/portions/with_blobs.h> +#include <ydb/core/tx/columnshard/resource_subscriber/task.h> #include <ydb/core/protos/counters_columnshard.pb.h> #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/tx/columnshard/splitter/settings.h> @@ -181,8 +182,8 @@ public: return BlobsAction; } - TColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager) - : BlobsAction(storagesManager) + TColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager, const TString& consumerId) + : BlobsAction(storagesManager, consumerId) { } @@ -218,6 +219,7 @@ public: } THashMap<TBlobRange, TString> Blobs; + std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard; std::vector<std::shared_ptr<IBlobsReadingAction>> GetReadingActions() const { auto result = BlobsAction.GetReadingActions(); diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.h b/ydb/core/tx/columnshard/engines/changes/cleanup.h index c245b47178..ca044f734b 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup.h +++ b/ydb/core/tx/columnshard/engines/changes/cleanup.h @@ -25,7 +25,11 @@ protected: } virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override; public: - using TBase::TBase; + TCleanupColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager) + : TBase(storagesManager, StaticTypeName()) { + + } + virtual THashSet<TPortionAddress> GetTouchedPortions() const override { THashSet<TPortionAddress> result; @@ -48,8 +52,12 @@ public: return true; } + static TString StaticTypeName() { + return "CS::CLEANUP"; + } + virtual TString TypeString() const override { - return "CLEANUP"; + return StaticTypeName(); } }; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 8f62034e80..b7d50f84cf 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -76,7 +76,7 @@ void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, T } TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext) - : TBase(limits.GetSplitSettings(), saverContext) + : TBase(limits.GetSplitSettings(), saverContext, StaticTypeName()) , Limits(limits) , GranuleMeta(granule) { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h index 778a557ebc..fcba1e97cf 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction.h @@ -35,6 +35,10 @@ public: TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext); ~TCompactColumnEngineChanges(); + static TString StaticTypeName() { + return "CS::GENERAL"; + } + ui32 NumSplitInto(const ui32 srcRows) const; }; diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index f14d62f9fb..c246efe9ff 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -18,7 +18,7 @@ public: using TBase::TBase; virtual TString TypeString() const override { - return "GENERAL_COMPACTION"; + return StaticTypeName(); } }; diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h index 4aa50998bf..22a28a0a00 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.h +++ b/ydb/core/tx/columnshard/engines/changes/indexation.h @@ -25,7 +25,7 @@ public: THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule} public: TInsertColumnEngineChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& dataToIndex, const TSplitSettings& splitSettings, const TSaverContext& saverContext) - : TBase(splitSettings, saverContext) + : TBase(splitSettings, saverContext, StaticTypeName()) , DataToIndex(std::move(dataToIndex)) , DefaultMark(defaultMark) { @@ -39,8 +39,12 @@ public: return TBase::GetTouchedPortions(); } + static TString StaticTypeName() { + return "CS::INDEXATION"; + } + virtual TString TypeString() const override { - return "INSERT"; + return StaticTypeName(); } bool AddPathIfNotExists(ui64 pathId); diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h index 8d71dc736e..6e7208b851 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.h +++ b/ydb/core/tx/columnshard/engines/changes/ttl.h @@ -70,11 +70,19 @@ public: PortionsToEvict.emplace_back(info, std::move(features)); } + static TString StaticTypeName() { + return "CS::TTL"; + } + virtual TString TypeString() const override { - return "TTL"; + return StaticTypeName(); + } + + TTTLColumnEngineChanges(const TSplitSettings& splitSettings, const TSaverContext& saverContext) + : TBase(splitSettings, saverContext, StaticTypeName()) { + } - using TBase::TBase; }; } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index 0db6dae785..f107eb637e 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -28,8 +28,8 @@ public: return SplitSettings; } - TChangesWithAppend(const TSplitSettings& splitSettings, const TSaverContext& saverContext) - : TBase(saverContext.GetStoragesManager()) + TChangesWithAppend(const TSplitSettings& splitSettings, const TSaverContext& saverContext, const TString& consumerId) + : TBase(saverContext.GetStoragesManager(), consumerId) , SplitSettings(splitSettings) , SaverContext(saverContext) { diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index c5d9d3714d..430905b212 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -38,8 +38,8 @@ TInsertionSummary::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 plan pathInfo->AddCommitted(std::move(*data)); } else { - dbTable.Abort(*data); - Summary.AddAborted(std::move(*data)); + dbTable.Abort(*data); + Summary.AddAborted(std::move(*data)); } } diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp index 0bc69d578f..b15e2aab59 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -6,9 +6,7 @@ namespace NKikimr::NOlap { TAtomicCounter TInsertionSummary::CriticalInserted; void TInsertionSummary::OnNewCommitted(const ui64 dataSize, const bool load) noexcept { - if (!load) { - Counters.Committed.Add(dataSize); - } + Counters.Committed.Add(dataSize, load); ++StatsCommitted.Rows; if (StatsCommitted.Bytes <= (i64)2 * 1024 * 1024 * 1024 && StatsCommitted.Bytes + dataSize > (i64)2 * 1024 * 1024 * 1024) { ++LocalInsertedCritical; @@ -82,12 +80,8 @@ const NKikimr::NOlap::TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui return &it->second; } -bool TInsertionSummary::IsOverloaded(const ui64 pathId) const { - auto it = PathInfo.find(pathId); - if (it == PathInfo.end()) { - return false; - } - return it->second.IsOverloaded(); +bool TInsertionSummary::IsOverloaded(const ui64 /*pathId*/) const { + return StatsCommitted.Bytes > TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID; } void TInsertionSummary::Clear() { @@ -105,9 +99,7 @@ void TInsertionSummary::Clear() { } void TInsertionSummary::OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept { - if (!load) { - Counters.Inserted.Add(dataSize); - } + Counters.Inserted.Add(dataSize, load); pathInfo.AddInsertedSize(dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID); ++StatsPrepared.Rows; StatsPrepared.Bytes += dataSize; @@ -168,9 +160,7 @@ bool TInsertionSummary::EraseCommitted(const TInsertedData& data) { const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) { const TWriteId writeId((TWriteId)data.WriteTxId); - if (!load) { - Counters.Aborted.Add(data.BlobSize()); - } + Counters.Aborted.Add(data.BlobSize(), load); auto insertInfo = Aborted.emplace(writeId, std::move(data)); Y_VERIFY(insertInfo.second); return &insertInfo.first->second; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp index a0acc9d19a..8799ddd642 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp @@ -30,17 +30,17 @@ TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAsse return PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, blobsDataAssemble); } -void TEFTaskConstructor::DoOnDataReady() { +void TEFTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFilter>(ScanActorId, BuildBatchAssembler(), ReadMetadata, SourceIdx, ColumnIds, UseEarlyFilter)); } -void TFFColumnsTaskConstructor::DoOnDataReady() { +void TFFColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFFBatch>(ScanActorId, BuildBatchAssembler(), SourceIdx, AppliedFilter)); } -void TCommittedColumnsTaskConstructor::DoOnDataReady() { +void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { auto blobs = ExtractBlobsData(); Y_VERIFY(NullBlocks.size() == 0); Y_VERIFY(blobs.size() == 1); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h index 58925f1a48..bc7c3be153 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h @@ -19,8 +19,8 @@ protected: THashMap<TBlobRange, ui32> NullBlocks; virtual bool DoOnError(const TBlobRange& range) override; public: - IFetchTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, const IDataSource& source) - : TBase(readActions) + IFetchTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, const IDataSource& source, const TString& taskCustomer) + : TBase(readActions, taskCustomer) , ScanActorId(NActors::TActorContext::AsActorContext().SelfID) , SourceIdx(source.GetSourceIdx()) , ReadMetadata(reader.GetReadMetadata()) @@ -36,11 +36,11 @@ private: TCommittedBlob CommittedBlob; using TBase = IFetchTaskConstructor; protected: - virtual void DoOnDataReady() override; + virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override; public: TCommittedColumnsTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, - const TCommittedDataSource& source) - : TBase(reader, readActions, std::move(nullBlocks), source) + const TCommittedDataSource& source, const TString& taskCustomer) + : TBase(reader, readActions, std::move(nullBlocks), source, taskCustomer) , CommittedBlob(source.GetCommitted()) { @@ -56,8 +56,8 @@ protected: TPortionInfo::TPreparedBatchData BuildBatchAssembler(); public: TAssembleColumnsTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, - const std::set<ui32>& columnIds, const TPortionDataSource& portion) - : TBase(reader, readActions, std::move(nullBlocks), portion) + const std::set<ui32>& columnIds, const TPortionDataSource& portion, const TString& taskCustomer) + : TBase(reader, readActions, std::move(nullBlocks), portion, taskCustomer) , ColumnIds(columnIds) , PortionInfo(portion.GetPortionInfoPtr()) { @@ -69,11 +69,11 @@ class TFFColumnsTaskConstructor: public TAssembleColumnsTaskConstructor { private: using TBase = TAssembleColumnsTaskConstructor; std::shared_ptr<NArrow::TColumnFilter> AppliedFilter; - virtual void DoOnDataReady() override; + virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override; public: TFFColumnsTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, - const std::set<ui32>& columnIds, const TPortionDataSource& portion) - : TBase(reader, readActions, std::move(nullBlocks), columnIds, portion) + const std::set<ui32>& columnIds, const TPortionDataSource& portion, const TString& taskCustomer) + : TBase(reader, readActions, std::move(nullBlocks), columnIds, portion, taskCustomer) , AppliedFilter(portion.GetFilterStageData().GetAppliedFilter()) { } @@ -83,11 +83,11 @@ class TEFTaskConstructor: public TAssembleColumnsTaskConstructor { private: bool UseEarlyFilter = false; using TBase = TAssembleColumnsTaskConstructor; - virtual void DoOnDataReady() override; + virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override; public: TEFTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, - const std::set<ui32>& columnIds, const TPortionDataSource& portion, const bool useEarlyFilter) - : TBase(reader, readActions, std::move(nullBlocks), columnIds, portion) + const std::set<ui32>& columnIds, const TPortionDataSource& portion, const bool useEarlyFilter, const TString& taskCustomer) + : TBase(reader, readActions, std::move(nullBlocks), columnIds, portion, taskCustomer) , UseEarlyFilter(useEarlyFilter) { } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index ba2e970fe1..d5f7f732b8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -85,12 +85,12 @@ void TPortionDataSource::DoStartFilterStage() { Y_VERIFY(FetchingPlan->GetFilterStage()->GetSize()); auto& columnIds = FetchingPlan->GetFilterStage()->GetColumnIds(); - auto readAction = Portion->GetBlobsStorage()->StartReadingAction(); + auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FILTER"); THashMap<TBlobRange, ui32> nullBlocks; NeedFetchColumns(columnIds, readAction, nullBlocks, nullptr); std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction}; - auto constructor = std::make_shared<TEFTaskConstructor>(ReadData, actions, std::move(nullBlocks), columnIds, *this, FetchingPlan->CanUseEarlyFilterImmediately()); + auto constructor = std::make_shared<TEFTaskConstructor>(ReadData, actions, std::move(nullBlocks), columnIds, *this, FetchingPlan->CanUseEarlyFilterImmediately(), "ReaderFilter"); ReadData.AddForFetch(GetSourceIdx(), constructor, false); } @@ -101,12 +101,12 @@ void TPortionDataSource::DoStartFetchStage() { if (FetchingPlan->GetFetchingStage()->GetSize() && !FilterStageData->IsEmptyFilter()) { auto& columnIds = FetchingPlan->GetFetchingStage()->GetColumnIds(); - auto readAction = Portion->GetBlobsStorage()->StartReadingAction(); + auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FETCHING"); THashMap<TBlobRange, ui32> nullBlocks; NeedFetchColumns(columnIds, readAction, nullBlocks, GetFilterStageData().GetActualFilter()); if (readAction->GetExpectedBlobsCount()) { std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction}; - auto constructor = std::make_shared<TFFColumnsTaskConstructor>(ReadData, actions, std::move(nullBlocks), columnIds, *this); + auto constructor = std::make_shared<TFFColumnsTaskConstructor>(ReadData, actions, std::move(nullBlocks), columnIds, *this, "ReaderFetcher"); ReadData.AddForFetch(GetSourceIdx(), constructor, true); return; } @@ -124,12 +124,12 @@ void TCommittedDataSource::DoFetch() { ReadStarted = true; std::shared_ptr<IBlobsStorageOperator> storageOperator = ReadData.GetContext().GetStoragesManager()->GetInsertOperator(); - auto readAction = storageOperator->StartReadingAction(); + auto readAction = storageOperator->StartReadingAction("CS::READ::COMMITTED"); readAction->AddRange(CommittedBlob.GetBlobRange()); THashMap<TBlobRange, ui32> nullBlocks; std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction}; - auto constructor = std::make_shared<TCommittedColumnsTaskConstructor>(ReadData, actions, std::move(nullBlocks), *this); + auto constructor = std::make_shared<TCommittedColumnsTaskConstructor>(ReadData, actions, std::move(nullBlocks), *this, "ReaderCommitted"); ReadData.AddForFetch(GetSourceIdx(), constructor, true); } } diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index cc248a53fa..7d1d1abcb6 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -26,7 +26,7 @@ namespace NKikimr::NColumnShard { NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source); std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(owner.TabletID(), ctx.SelfID, - owner.StoragesManager->GetInsertOperator()->StartWritingAction(), NEvWrite::TWriteData(writeMeta, data)); + owner.StoragesManager->GetInsertOperator()->StartWritingAction("WRITING_OPERATOR"), NEvWrite::TWriteData(writeMeta, data)); NConveyor::TCompServiceOperator::SendTaskToExecute(task); Status = EOperationStatus::Started; diff --git a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp index b91c13d16b..afae8e4beb 100644 --- a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp +++ b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp @@ -31,10 +31,11 @@ void TActor::Handle(TEvStartTask::TPtr& ev) { void TActor::Handle(NKikimr::NResourceBroker::TEvResourceBroker::TEvResourceAllocated::TPtr& ev) { auto it = Tasks.find(((TCookie*)ev->Get()->Cookie.Get())->GetTaskIdentifier()); Y_VERIFY(it != Tasks.end()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "result_resources")("task_id", ev->Get()->TaskId)("task", it->second->DebugString()); - it->second->OnAllocationSuccess(ev->Get()->TaskId, SelfId()); + auto task = it->second; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "result_resources")("task_id", ev->Get()->TaskId)("task", task->DebugString()); + task->OnAllocationSuccess(ev->Get()->TaskId, SelfId()); + task->GetContext().GetCounters()->OnReply(task->GetMemoryAllocation()); Tasks.erase(it); - it->second->GetContext().GetCounters()->OnReply(it->second->GetMemoryAllocation()); } TActor::TActor(ui64 tabletId, const TActorId& parent) diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.cpp b/ydb/core/tx/columnshard/resource_subscriber/task.cpp index 8dc1fdc646..df1a7650de 100644 --- a/ydb/core/tx/columnshard/resource_subscriber/task.cpp +++ b/ydb/core/tx/columnshard/resource_subscriber/task.cpp @@ -14,6 +14,9 @@ void ITask::Start(const NActors::TActorId& actorId, const std::shared_ptr<ITask> } TResourcesGuard::~TResourcesGuard() { + if (!NActors::TlsActivationContext) { + return; + } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "free_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu); auto ev = std::make_unique<IEventHandle>(NKikimr::NResourceBroker::MakeResourceBrokerID(), Sender, new NKikimr::NResourceBroker::TEvResourceBroker::TEvFinishTask(TaskId)); NActors::TActorContext::AsActorContext().Send(std::move(ev)); |