diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-29 10:44:32 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-29 10:59:59 +0300 |
commit | a7ba8fa484dce3de8b6847dc5a9207eaf600f84f (patch) | |
tree | a91497d1be9a7c493acef3cc173f577dba9e933c | |
parent | 0f10245df98b9bb58f2be8b1fd680a75c26da724 (diff) | |
download | ydb-a7ba8fa484dce3de8b6847dc5a9207eaf600f84f.tar.gz |
KIKIMR-19216: multiple tasks for indexation
TTLUsage will fix soon
5 files changed, 56 insertions, 51 deletions
diff --git a/ydb/core/tx/columnshard/background_controller.cpp b/ydb/core/tx/columnshard/background_controller.cpp index e5722476942..1cdc6ba6e77 100644 --- a/ydb/core/tx/columnshard/background_controller.cpp +++ b/ydb/core/tx/columnshard/background_controller.cpp @@ -47,8 +47,7 @@ void TBackgroundController::CheckDeadlines() { } void TBackgroundController::StartIndexing(const NOlap::TColumnEngineChanges& /*changes*/) { - Y_VERIFY(!ActiveIndexing); - ActiveIndexing = true; + ++ActiveIndexing; } diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h index 32d5e5a0465..c951b486c53 100644 --- a/ydb/core/tx/columnshard/background_controller.h +++ b/ydb/core/tx/columnshard/background_controller.h @@ -46,7 +46,7 @@ private: class TBackgroundController { private: - bool ActiveIndexing = false; + i64 ActiveIndexing = 0; using TCurrentCompaction = THashMap<ui64, NOlap::TPlanCompactionInfo>; TCurrentCompaction ActiveCompactionInfo; @@ -75,8 +75,8 @@ public: void StartIndexing(const NOlap::TColumnEngineChanges& changes); void FinishIndexing() { - Y_VERIFY(ActiveIndexing); - ActiveIndexing = false; + --ActiveIndexing; + Y_VERIFY(ActiveIndexing >= 0); } bool IsIndexingActive() const { return ActiveIndexing; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 127b9eda588..5e6daceaedb 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -709,74 +709,48 @@ private: protected: virtual void DoOnDataReady() override { TxEvent->IndexChanges->Blobs = std::move(ExtractBlobsData()); + const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges); std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId); - NConveyor::TCompServiceOperator::SendTaskToExecute(task); + if (isInsert) { + NConveyor::TInsertServiceOperator::SendTaskToExecute(task); + } else { + NConveyor::TCompServiceOperator::SendTaskToExecute(task); + } } - virtual bool DoOnError(const TBlobRange& /*range*/) override { + virtual bool DoOnError(const TBlobRange& range) override { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("blob_id", range); + AFL_VERIFY(false)("blob_id", range); TxEvent->SetPutStatus(NKikimrProto::ERROR); TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent)); return false; } public: TChangesReadTask(std::unique_ptr<TEvPrivate::TEvWriteIndex>&& event, const TActorId parentActorId, const ui64 tabletId, const TIndexationCounters& counters) - : TBase(event->IndexChanges->GetReadingActions()) + : TBase(event->IndexChanges->GetReadingActions(), event->IndexChanges->GetTaskIdentifier()) , ParentActorId(parentActorId) , TabletId(tabletId) , TxEvent(std::move(event)) , Counters(counters) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start_changes")("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier()); } }; -void TColumnShard::SetupIndexation() { - if (BackgroundController.IsIndexingActive()) { - return; - } - CSCounters.OnSetupIndexation(); - ui32 blobs = 0; - ui32 ignored = 0; - ui64 size = 0; - ui64 bytesToIndex = 0; - std::vector<const NOlap::TInsertedData*> dataToIndex; - dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT); - for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) { - for (auto* pathInfo : it->second) { - for (auto& data : pathInfo->GetCommitted()) { - ui32 dataSize = data.BlobSize(); - Y_VERIFY(dataSize); - - size += dataSize; - ++blobs; - bytesToIndex += dataSize; - dataToIndex.push_back(&data); - if (bytesToIndex >= (ui64)Limits.MaxInsertBytes) { - break; - } - } - if (bytesToIndex >= (ui64)Limits.MaxInsertBytes) { - break; - } - } - if (bytesToIndex >= (ui64)Limits.MaxInsertBytes) { - break; - } - } - - if (bytesToIndex < (ui64)Limits.MinInsertBytes && blobs < TLimits::MIN_SMALL_BLOBS_TO_INSERT) { - LOG_S_DEBUG("Few data for indexation (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored " - << ignored << ") at tablet " << TabletID()); +void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex) { + if (bytesToIndex < Limits.MinInsertBytes && dataToIndex.size() < TLimits::MIN_SMALL_BLOBS_TO_INSERT) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size()); if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) { ++SkippedIndexations; return; } } + CSCounters.IndexationInput(bytesToIndex); SkippedIndexations = 0; - LOG_S_DEBUG("Prepare indexing " << bytesToIndex << " bytes in " << dataToIndex.size() << " batches of committed " - << size << " bytes in " << blobs << " blobs ignored " << ignored - << " at tablet " << TabletID()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes) + ("has_more", bytesToIndex >= Limits.MaxInsertBytes); std::vector<NOlap::TInsertedData> data; data.reserve(dataToIndex.size()); @@ -799,6 +773,32 @@ void TColumnShard::SetupIndexation() { ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters))); } +void TColumnShard::SetupIndexation() { + if (BackgroundController.IsIndexingActive()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "in_progress"); + return; + } + CSCounters.OnSetupIndexation(); + i64 bytesToIndex = 0; + std::vector<const NOlap::TInsertedData*> dataToIndex; + dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT); + for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) { + for (auto* pathInfo : it->second) { + for (auto& data : pathInfo->GetCommitted()) { + Y_VERIFY(data.BlobSize()); + bytesToIndex += data.BlobSize(); + dataToIndex.push_back(&data); + if (bytesToIndex >= Limits.MaxInsertBytes) { + StartIndexTask(std::move(dataToIndex), bytesToIndex); + dataToIndex.clear(); + bytesToIndex = 0; + } + } + } + } + StartIndexTask(std::move(dataToIndex), bytesToIndex); +} + void TColumnShard::SetupCompaction() { CSCounters.OnSetupCompaction(); @@ -807,9 +807,7 @@ void TColumnShard::SetupCompaction() { auto limits = CompactionLimits.Get(); auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(limits, BackgroundController.GetConflictCompactionPortions()); if (!indexChanges) { - if (!BackgroundController.GetCompactionsCount()) { - LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID()); - } + LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID()); break; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 6d6612e97d8..593573b1a27 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -459,6 +459,7 @@ private: void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); + void StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex); void SetupIndexation(); void SetupCompaction(); bool SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {}, const bool force = false); diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h index 4ef0e00ad7e..9b07f04e810 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h @@ -15,6 +15,7 @@ #include <util/generic/string.h> #include <util/datetime/base.h> #include <util/stream/str.h> +#include <util/generic/guid.h> #include <compare> namespace NKikimr::NTabletFlatExecutor { @@ -169,7 +170,13 @@ protected: TBlobsAction BlobsAction; virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const = 0; + + const TString TaskIdentifier = TGUID::Create().AsGuidString(); public: + TString GetTaskIdentifier() const { + return TaskIdentifier; + } + TBlobsAction& GetBlobsAction() { return BlobsAction; } |