about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-09-29 10:44:32 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-09-29 10:59:59 +0300
commit a7ba8fa484dce3de8b6847dc5a9207eaf600f84f (patch)
tree a91497d1be9a7c493acef3cc173f577dba9e933c
parent 0f10245df98b9bb58f2be8b1fd680a75c26da724 (diff)
download ydb-a7ba8fa484dce3de8b6847dc5a9207eaf600f84f.tar.gz
KIKIMR-19216: multiple tasks for indexation
TTLUsage will fix soon
-rw-r--r-- ydb/core/tx/columnshard/background_controller.cpp 3
-rw-r--r-- ydb/core/tx/columnshard/background_controller.h 6
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 90
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/abstract/abstract.h 7
5 files changed, 56 insertions, 51 deletions
diff --git a/ydb/core/tx/columnshard/background_controller.cpp b/ydb/core/tx/columnshard/background_controller.cpp
index e5722476942..1cdc6ba6e77 100644
--- a/ydb/core/tx/columnshard/background_controller.cpp
+++ b/ydb/core/tx/columnshard/background_controller.cpp
@@ -47,8 +47,7 @@ void TBackgroundController::CheckDeadlines() {
}
void TBackgroundController::StartIndexing(const NOlap::TColumnEngineChanges& /*changes*/) {
- Y_VERIFY(!ActiveIndexing);
- ActiveIndexing = true;
+ ++ActiveIndexing;
}
diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h
index 32d5e5a0465..c951b486c53 100644
--- a/ydb/core/tx/columnshard/background_controller.h
+++ b/ydb/core/tx/columnshard/background_controller.h
@@ -46,7 +46,7 @@ private:
class TBackgroundController {
private:
- bool ActiveIndexing = false;
+ i64 ActiveIndexing = 0;
using TCurrentCompaction = THashMap<ui64, NOlap::TPlanCompactionInfo>;
TCurrentCompaction ActiveCompactionInfo;
@@ -75,8 +75,8 @@ public:
void StartIndexing(const NOlap::TColumnEngineChanges& changes);
void FinishIndexing() {
- Y_VERIFY(ActiveIndexing);
- ActiveIndexing = false;
+ --ActiveIndexing;
+ Y_VERIFY(ActiveIndexing >= 0);
}
bool IsIndexingActive() const {
return ActiveIndexing;
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 127b9eda588..5e6daceaedb 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -709,74 +709,48 @@ private:
protected:
virtual void DoOnDataReady() override {
TxEvent->IndexChanges->Blobs = std::move(ExtractBlobsData());
+ const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges);
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId);
- NConveyor::TCompServiceOperator::SendTaskToExecute(task);
+ if (isInsert) {
+ NConveyor::TInsertServiceOperator::SendTaskToExecute(task);
+ } else {
+ NConveyor::TCompServiceOperator::SendTaskToExecute(task);
+ }
}
- virtual bool DoOnError(const TBlobRange& /*range*/) override {
+ virtual bool DoOnError(const TBlobRange& range) override {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("blob_id", range);
+ AFL_VERIFY(false)("blob_id", range);
TxEvent->SetPutStatus(NKikimrProto::ERROR);
TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent));
return false;
}
public:
TChangesReadTask(std::unique_ptr<TEvPrivate::TEvWriteIndex>&& event, const TActorId parentActorId, const ui64 tabletId, const TIndexationCounters& counters)
- : TBase(event->IndexChanges->GetReadingActions())
+ : TBase(event->IndexChanges->GetReadingActions(), event->IndexChanges->GetTaskIdentifier())
, ParentActorId(parentActorId)
, TabletId(tabletId)
, TxEvent(std::move(event))
, Counters(counters)
{
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start_changes")("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier());
}
};
-void TColumnShard::SetupIndexation() {
- if (BackgroundController.IsIndexingActive()) {
- return;
- }
- CSCounters.OnSetupIndexation();
- ui32 blobs = 0;
- ui32 ignored = 0;
- ui64 size = 0;
- ui64 bytesToIndex = 0;
- std::vector<const NOlap::TInsertedData*> dataToIndex;
- dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT);
- for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) {
- for (auto* pathInfo : it->second) {
- for (auto& data : pathInfo->GetCommitted()) {
- ui32 dataSize = data.BlobSize();
- Y_VERIFY(dataSize);
-
- size += dataSize;
- ++blobs;
- bytesToIndex += dataSize;
- dataToIndex.push_back(&data);
- if (bytesToIndex >= (ui64)Limits.MaxInsertBytes) {
- break;
- }
- }
- if (bytesToIndex >= (ui64)Limits.MaxInsertBytes) {
- break;
- }
- }
- if (bytesToIndex >= (ui64)Limits.MaxInsertBytes) {
- break;
- }
- }
-
- if (bytesToIndex < (ui64)Limits.MinInsertBytes && blobs < TLimits::MIN_SMALL_BLOBS_TO_INSERT) {
- LOG_S_DEBUG("Few data for indexation (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored "
- << ignored << ") at tablet " << TabletID());
+void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex) {
+ if (bytesToIndex < Limits.MinInsertBytes && dataToIndex.size() < TLimits::MIN_SMALL_BLOBS_TO_INSERT) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size());
if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) {
++SkippedIndexations;
return;
}
}
+
CSCounters.IndexationInput(bytesToIndex);
SkippedIndexations = 0;
- LOG_S_DEBUG("Prepare indexing " << bytesToIndex << " bytes in " << dataToIndex.size() << " batches of committed "
- << size << " bytes in " << blobs << " blobs ignored " << ignored
- << " at tablet " << TabletID());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes)
+ ("has_more", bytesToIndex >= Limits.MaxInsertBytes);
std::vector<NOlap::TInsertedData> data;
data.reserve(dataToIndex.size());
@@ -799,6 +773,32 @@ void TColumnShard::SetupIndexation() {
ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters)));
}
+void TColumnShard::SetupIndexation() {
+ if (BackgroundController.IsIndexingActive()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "in_progress");
+ return;
+ }
+ CSCounters.OnSetupIndexation();
+ i64 bytesToIndex = 0;
+ std::vector<const NOlap::TInsertedData*> dataToIndex;
+ dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT);
+ for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) {
+ for (auto* pathInfo : it->second) {
+ for (auto& data : pathInfo->GetCommitted()) {
+ Y_VERIFY(data.BlobSize());
+ bytesToIndex += data.BlobSize();
+ dataToIndex.push_back(&data);
+ if (bytesToIndex >= Limits.MaxInsertBytes) {
+ StartIndexTask(std::move(dataToIndex), bytesToIndex);
+ dataToIndex.clear();
+ bytesToIndex = 0;
+ }
+ }
+ }
+ }
+ StartIndexTask(std::move(dataToIndex), bytesToIndex);
+}
+
void TColumnShard::SetupCompaction() {
CSCounters.OnSetupCompaction();
@@ -807,9 +807,7 @@ void TColumnShard::SetupCompaction() {
auto limits = CompactionLimits.Get();
auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(limits, BackgroundController.GetConflictCompactionPortions());
if (!indexChanges) {
- if (!BackgroundController.GetCompactionsCount()) {
- LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
- }
+ LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
break;
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 6d6612e97d8..593573b1a27 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -459,6 +459,7 @@ private:
void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunAlterStore(const NKikimrTxColumnShard::TAlterStore& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
+ void StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex);
void SetupIndexation();
void SetupCompaction();
bool SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {}, const bool force = false);
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
index 4ef0e00ad7e..9b07f04e810 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
@@ -15,6 +15,7 @@
#include <util/generic/string.h>
#include <util/datetime/base.h>
#include <util/stream/str.h>
+#include <util/generic/guid.h>
#include <compare>
namespace NKikimr::NTabletFlatExecutor {
@@ -169,7 +170,13 @@ protected:
TBlobsAction BlobsAction;
virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const = 0;
+
+ const TString TaskIdentifier = TGUID::Create().AsGuidString();
public:
+ TString GetTaskIdentifier() const {
+ return TaskIdentifier;
+ }
+
TBlobsAction& GetBlobsAction() {
return BlobsAction;
}