aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-06 18:54:38 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-06 18:54:38 +0300
commita1edfae32d8ebc2479d063d6106156f128ba5fc5 (patch)
tree28122d7b6f09da270f9b6c9696029fa1a5a920ab
parentf662d68f5a0c839f02969571c1fbba97c7711be4 (diff)
downloadydb-a1edfae32d8ebc2479d063d6106156f128ba5fc5.tar.gz
InsertTable overloads
overloads insert_table
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp63
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp42
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h12
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h12
-rw-r--r--ydb/core/tx/columnshard/counters/columnshard.h4
-rw-r--r--ydb/core/tx/columnshard/counters/common/private.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h78
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h2
-rw-r--r--ydb/core/tx/columnshard/engines/compaction_info.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/compaction_info.h97
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.h9
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt23
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/data.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/data.h (renamed from ydb/core/tx/columnshard/engines/insert_table.h)50
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp (renamed from ydb/core/tx/columnshard/engines/insert_table.cpp)130
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.h87
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/path_info.cpp62
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/path_info.h50
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp70
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h41
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.h1
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h4
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp19
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h4
35 files changed, 736 insertions, 267 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 3869a4760ea..82b994b1a98 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -124,6 +124,25 @@ void TTxWrite::Complete(const TActorContext& ctx) {
ctx.Send(Ev->Get()->GetSource(), Result.release());
}
+void TColumnShard::OverloadWriteFail(const TString& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
+ IncCounter(COUNTER_WRITE_FAIL);
+ IncCounter(COUNTER_WRITE_OVERLOAD);
+
+ const auto& record = Proto(ev->Get());
+ const auto& data = record.GetData();
+ const ui64 tableId = record.GetTableId();
+ const ui64 metaShard = record.GetTxInitiator();
+ const ui64 writeId = record.GetWriteId();
+ const TString& dedupId = record.GetDedupId();
+
+ LOG_S_INFO("Write (overload) " << data.size() << " bytes into pathId " << tableId
+ << "overload reason: [" << overloadReason << "]"
+ << " at tablet " << TabletID());
+
+ auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
+ TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED);
+ ctx.Send(ev->Get()->GetSource(), result.release());
+}
// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite
void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
@@ -131,13 +150,13 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
OnYellowChannels(*ev->Get());
- auto& record = Proto(ev->Get());
- auto& data = record.GetData();
- ui64 tableId = record.GetTableId();
- ui64 metaShard = record.GetTxInitiator();
- ui64 writeId = record.GetWriteId();
- TString dedupId = record.GetDedupId();
- auto putStatus = ev->Get()->GetPutStatus();
+ const auto& record = Proto(ev->Get());
+ const auto& data = record.GetData();
+ const ui64 tableId = record.GetTableId();
+ const ui64 metaShard = record.GetTxInitiator();
+ const ui64 writeId = record.GetWriteId();
+ const TString dedupId = record.GetDedupId();
+ const auto putStatus = ev->Get()->GetPutStatus();
bool isWritable = TablesManager.IsWritableTable(tableId);
bool error = data.empty() || data.size() > TLimits::GetMaxBlobSize() || !TablesManager.HasPrimaryIndex() || !isWritable;
@@ -175,28 +194,24 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
--WritesInFly; // write successed
Y_VERIFY(putStatus == NKikimrProto::OK);
Execute(new TTxWrite(this, ev), ctx);
- } else if (isOutOfSpace || InsertTable->IsOverloaded(tableId) || ShardOverloaded()) {
+ } else if (isOutOfSpace) {
IncCounter(COUNTER_WRITE_FAIL);
-
- if (isOutOfSpace) {
- IncCounter(COUNTER_OUT_OF_SPACE);
- LOG_S_ERROR("Write (out of disk space) " << data.size() << " bytes into pathId " << tableId
- << " at tablet " << TabletID());
- } else {
- bool tableOverload = InsertTable->IsOverloaded(tableId);
- IncCounter(COUNTER_WRITE_OVERLOAD);
- if (!tableOverload) {
- IncCounter(COUNTER_WRITE_OVERLOAD_SHARD);
- }
-
- LOG_S_INFO("Write (overload) " << data.size() << " bytes into pathId " << tableId
- << (ShardOverloaded() ? " [shard]" : "") << (tableOverload? " [table]" : "")
- << " at tablet " << TabletID());
- }
+ IncCounter(COUNTER_OUT_OF_SPACE);
+ LOG_S_ERROR("Write (out of disk space) " << data.size() << " bytes into pathId " << tableId
+ << " at tablet " << TabletID());
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED);
ctx.Send(ev->Get()->GetSource(), result.release());
+ } else if (InsertTable && InsertTable->IsOverloadedByCommitted(tableId)) {
+ CSCounters.OnOverloadInsertTable(data.size());
+ OverloadWriteFail("insert_table", ev, ctx);
+ } else if (TablesManager.IsOverloaded(tableId)) {
+ CSCounters.OnOverloadGranule(data.size());
+ OverloadWriteFail("granule", ev, ctx);
+ } else if (ShardOverloaded()) {
+ CSCounters.OnOverloadShard(data.size());
+ OverloadWriteFail("shard", ev, ctx);
} else {
if (record.HasLongTxId()) {
// TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId)
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 48677da1f90..75c04412999 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -619,8 +619,7 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivit
// Preventing conflicts between indexing and compaction leads to election between them.
// Indexing vs compaction probability depends on index and insert table overload status.
// Prefer compaction: 25% by default; 50% if IndexOverloaded(); 6.25% if InsertTableOverloaded().
- const ui32 mask = InsertTableOverloaded() ? 0xF : (IndexOverloaded() ? 0x1 : 0x3);
- const bool preferIndexing = BackgroundActivation & mask;
+ const bool preferIndexing = RandomNumber<ui32>(1000) < 750;
if (preferIndexing) {
if (activity.HasIndexation()) {
@@ -680,27 +679,26 @@ bool TColumnShard::SetupIndexation() {
std::vector<const NOlap::TInsertedData*> dataToIndex;
dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT);
THashMap<ui64, ui64> overloadedPathGranules;
- for (auto& [pathId, committed] : InsertTable->GetCommitted()) {
- auto* pMap = TablesManager.GetPrimaryIndexSafe().GetOverloadedGranules(pathId);
- if (pMap) {
- overloadedPathGranules[pathId] = pMap->size();
- }
- InsertTable->SetOverloaded(pathId, !!pMap);
- for (auto& data : committed) {
- ui32 dataSize = data.BlobSize();
- Y_VERIFY(dataSize);
-
- size += dataSize;
- if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) {
- continue;
- }
- if (pMap) {
- ++ignored;
- continue;
+ for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) {
+ for (auto* pathInfo : it->second) {
+ const bool granulesOverloaded = TablesManager.GetPrimaryIndex()->HasOverloadedGranules(pathInfo->GetPathId());
+ for (auto& data : pathInfo->GetCommitted()) {
+ ui32 dataSize = data.BlobSize();
+ Y_VERIFY(dataSize);
+
+ size += dataSize;
+ if (bytesToIndex && (bytesToIndex + dataSize) > (ui64)Limits.MaxInsertBytes) {
+ continue;
+ }
+ if (granulesOverloaded) {
+ ++ignored;
+ CSCounters.SkipIndexationInputDueToGranuleOverload(dataSize);
+ continue;
+ }
+ ++blobs;
+ bytesToIndex += dataSize;
+ dataToIndex.push_back(&data);
}
- ++blobs;
- bytesToIndex += dataSize;
- dataToIndex.push_back(&data);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 4670398b548..226c99d0e4c 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -8,6 +8,7 @@
#include "blob_manager.h"
#include "tables_manager.h"
#include "inflight_request_tracker.h"
+#include "counters/columnshard.h"
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
@@ -163,6 +164,7 @@ class TColumnShard
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx);
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
+ void OverloadWriteFail(const TString& overloadReason, TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx);
ITransaction* CreateTxInitSchema();
ITransaction* CreateTxRunGc();
@@ -383,6 +385,8 @@ private:
const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation");
const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction");
+ const TCSCounters CSCounters;
+
THashMap<ui64, TBasicTxInfo> BasicTxInfo;
TSet<TDeadlineQueueItem> DeadlineQueue;
@@ -427,14 +431,6 @@ private:
(writesLimit && WritesInFly > writesLimit);
}
- bool InsertTableOverloaded() const {
- return InsertTable && InsertTable->HasOverloaded();
- }
-
- bool IndexOverloaded() const {
- return TablesManager.IndexOverloaded();
- }
-
TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId);
TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId);
void AddLongTxWrite(TWriteId writeId, ui64 txId);
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index b025ee9903d..f8e03b52366 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -5,7 +5,7 @@
#include <ydb/core/tx/long_tx_service/public/types.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
-#include <ydb/core/tx/columnshard/engines/insert_table.h>
+#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h>
#include <ydb/core/tx/columnshard/engines/granules_table.h>
#include <ydb/core/tx/columnshard/engines/columns_table.h>
@@ -464,9 +464,7 @@ struct Schema : NIceDb::Schema {
static bool InsertTable_Load(NIceDb::TNiceDb& db,
const IBlobGroupSelector* dsGroupSelector,
- THashMap<TWriteId, TInsertedData>& inserted,
- THashMap<ui64, TSet<TInsertedData>>& committed,
- THashMap<TWriteId, TInsertedData>& aborted,
+ NOlap::TInsertTableAccessor& insertTable,
const TInstant& loadTime) {
auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select();
if (!rowset.IsReady())
@@ -502,13 +500,13 @@ struct Schema : NIceDb::Schema {
switch (recType) {
case EInsertTableIds::Inserted:
- inserted.emplace(TWriteId{data.WriteTxId}, std::move(data));
+ insertTable.AddInserted(TWriteId{ data.WriteTxId }, std::move(data));
break;
case EInsertTableIds::Committed:
- committed[data.PathId].emplace(data);
+ insertTable.AddCommitted(std::move(data));
break;
case EInsertTableIds::Aborted:
- aborted.emplace(TWriteId{data.WriteTxId}, std::move(data));
+ insertTable.AddAborted(TWriteId{ data.WriteTxId }, std::move(data));
break;
}
diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h
index c9b625eb7c3..af9aed36ff6 100644
--- a/ydb/core/tx/columnshard/counters/columnshard.h
+++ b/ydb/core/tx/columnshard/counters/columnshard.h
@@ -60,12 +60,12 @@ public:
OverloadShardCount->Add(1);
}
- void SkipIndexationInputDutToSplitCompaction(const ui64 size) const {
+ void SkipIndexationInputDueToSplitCompaction(const ui64 size) const {
SkipIndexationInputDueToSplitCompactionBytes->Add(size);
SkipIndexationInputDueToSplitCompactionCount->Add(1);
}
- void SkipIndexationInputDutToGranuleOverload(const ui64 size) const {
+ void SkipIndexationInputDueToGranuleOverload(const ui64 size) const {
SkipIndexationInputDueToGranuleOverloadBytes->Add(size);
SkipIndexationInputDueToGranuleOverloadCount->Add(1);
}
diff --git a/ydb/core/tx/columnshard/counters/common/private.cpp b/ydb/core/tx/columnshard/counters/common/private.cpp
index 2c6d0c92c91..4fc7cc40623 100644
--- a/ydb/core/tx/columnshard/counters/common/private.cpp
+++ b/ydb/core/tx/columnshard/counters/common/private.cpp
@@ -47,7 +47,9 @@ public:
auto it = Agents.find(signalName);
if (it == Agents.end()) {
it = Agents.emplace(signalName, std::make_shared<TValueAggregationAgent>(signalName, signalsOwner)).first;
- NActors::TActivationContext::Register(new TRegularSignalBuilderActor(it->second));
+ if (NActors::TlsActivationContext) {
+ NActors::TActivationContext::Register(new TRegularSignalBuilderActor(it->second));
+ }
}
return it->second;
}
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
index b868fce25e7..570006da4b2 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(insert_table)
add_subdirectory(predicate)
add_subdirectory(reader)
add_subdirectory(storage)
@@ -34,6 +35,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-reader
columnshard-engines-predicate
columnshard-engines-storage
+ columnshard-engines-insert_table
formats-arrow-compression
core-tx-program
udf-service-exception_policy
@@ -42,8 +44,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/compaction_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
index d73745d1174..60cd9a5bca8 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(insert_table)
add_subdirectory(predicate)
add_subdirectory(reader)
add_subdirectory(storage)
@@ -35,6 +36,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-reader
columnshard-engines-predicate
columnshard-engines-storage
+ columnshard-engines-insert_table
formats-arrow-compression
core-tx-program
udf-service-exception_policy
@@ -43,8 +45,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/compaction_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
index d73745d1174..60cd9a5bca8 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(insert_table)
add_subdirectory(predicate)
add_subdirectory(reader)
add_subdirectory(storage)
@@ -35,6 +36,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-reader
columnshard-engines-predicate
columnshard-engines-storage
+ columnshard-engines-insert_table
formats-arrow-compression
core-tx-program
udf-service-exception_policy
@@ -43,8 +45,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/compaction_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
index b868fce25e7..570006da4b2 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(insert_table)
add_subdirectory(predicate)
add_subdirectory(reader)
add_subdirectory(storage)
@@ -34,6 +35,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
columnshard-engines-reader
columnshard-engines-predicate
columnshard-engines-storage
+ columnshard-engines-insert_table
formats-arrow-compression
core-tx-program
udf-service-exception_policy
@@ -42,8 +44,8 @@ target_link_libraries(tx-columnshard-engines PUBLIC
target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/column_engine.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/compaction_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 076200c84e8..299d75dea0a 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -4,10 +4,11 @@
#include "index_info.h"
#include "portion_info.h"
#include "db_wrapper.h"
-#include "insert_table.h"
#include "columns_table.h"
+#include "compaction_info.h"
#include "granules_table.h"
#include "predicate/filter.h"
+#include "insert_table/data.h"
#include <ydb/core/formats/arrow/replace_key.h>
#include <ydb/core/tx/columnshard/blob.h>
@@ -17,11 +18,12 @@ namespace NKikimr::NOlap {
struct TPredicate;
struct TCompactionLimits {
- static constexpr const ui32 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant
- static constexpr const ui32 MAX_BLOB_SIZE = 8 * 1024 * 1024; // some BlobStorage constant
+ static constexpr const ui64 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant
+ static constexpr const ui64 MAX_BLOB_SIZE = 8 * 1024 * 1024; // some BlobStorage constant
static constexpr const ui64 EVICT_HOT_PORTION_BYTES = 1 * 1024 * 1024;
static constexpr const ui64 DEFAULT_EVICTION_BYTES = 64 * 1024 * 1024;
static constexpr const ui64 MAX_BLOBS_TO_DELETE = 10000;
+ static constexpr const ui64 OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID = 1024 * MAX_BLOB_SIZE;
ui32 GoodBlobSize{MIN_GOOD_BLOB_SIZE};
ui32 GranuleBlobSplitSize{MAX_BLOB_SIZE};
@@ -114,72 +116,6 @@ private:
static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type);
};
-class ICompactionObjectCallback {
-public:
- virtual ~ICompactionObjectCallback() = default;
- virtual void OnCompactionStarted(const bool inGranule) = 0;
- virtual void OnCompactionFinished() = 0;
- virtual void OnCompactionFailed(const TString& reason) = 0;
- virtual void OnCompactionCanceled(const TString& reason) = 0;
- virtual TString DebugString() const = 0;
-};
-
-struct TCompactionInfo {
-private:
- std::shared_ptr<ICompactionObjectCallback> CompactionObject;
- mutable bool StatusProvided = false;
- const bool InGranuleFlag = false;
-public:
- TCompactionInfo(std::shared_ptr<ICompactionObjectCallback> compactionObject, const bool inGranule)
- : CompactionObject(compactionObject)
- , InGranuleFlag(inGranule)
- {
- Y_VERIFY(compactionObject);
- CompactionObject->OnCompactionStarted(InGranuleFlag);
- }
-
- bool InGranule() const {
- return InGranuleFlag;
- }
-
- template <class T>
- const T& GetObject() const {
- auto result = dynamic_cast<const T*>(CompactionObject.get());
- Y_VERIFY(result);
- return *result;
- }
-
- void CompactionFinished() const {
- Y_VERIFY(!StatusProvided);
- StatusProvided = true;
- CompactionObject->OnCompactionFinished();
- }
-
- void CompactionCanceled(const TString& reason) const {
- Y_VERIFY(!StatusProvided);
- StatusProvided = true;
- CompactionObject->OnCompactionCanceled(reason);
- }
-
- void CompactionFailed(const TString& reason) const {
- Y_VERIFY(!StatusProvided);
- StatusProvided = true;
- CompactionObject->OnCompactionFailed(reason);
- }
-
- ~TCompactionInfo() {
- Y_VERIFY_DEBUG(StatusProvided);
- if (!StatusProvided) {
- CompactionObject->OnCompactionFailed("compaction unexpectedly finished");
- }
- }
-
- friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) {
- out << (info.InGranuleFlag ? "in granule" : "split granule") << " compaction of granule: " << info.CompactionObject->DebugString();
- return out;
- }
-};
-
struct TPortionEvictionFeatures {
const TString TargetTierName;
const ui64 PathId; // portion path id for cold-storage-key construct
@@ -632,7 +568,9 @@ public:
virtual const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return GetIndexInfo().GetSortingKey(); }
virtual const std::shared_ptr<arrow::Schema>& GetIndexKey() const { return GetIndexInfo().GetIndexKey(); }
virtual const THashSet<ui64>* GetOverloadedGranules(ui64 /*pathId*/) const { return nullptr; }
- virtual bool HasOverloadedGranules() const { return false; }
+ bool HasOverloadedGranules(const ui64 pathId) const {
+ return GetOverloadedGranules(pathId) != nullptr;
+ }
virtual TString SerializeMark(const NArrow::TReplaceKey& key) const = 0;
virtual NArrow::TReplaceKey DeserializeMark(const TString& key, std::optional<ui32> markNumKeys) const = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 39ca47a44c6..4a225e60dfe 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -201,8 +201,6 @@ public:
return GranulesStorage->GetOverloaded(pathId);
}
- bool HasOverloadedGranules() const override { return GranulesStorage->HasOverloadedGranules(); }
-
TString SerializeMark(const NArrow::TReplaceKey& key) const override {
if (UseCompositeMarks()) {
return TMark::SerializeComposite(key, MarkSchema());
diff --git a/ydb/core/tx/columnshard/engines/compaction_info.cpp b/ydb/core/tx/columnshard/engines/compaction_info.cpp
new file mode 100644
index 00000000000..699aa959f65
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/compaction_info.cpp
@@ -0,0 +1,11 @@
+#include "compaction_info.h"
+#include "storage/granule.h"
+
+namespace NKikimr::NOlap {
+
+NKikimr::NOlap::TPlanCompactionInfo TCompactionInfo::GetPlanCompaction() const {
+ auto& granuleMeta = GetObject<TGranuleMeta>();
+ return TPlanCompactionInfo(granuleMeta.GetPathId(), InGranule());
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/compaction_info.h b/ydb/core/tx/columnshard/engines/compaction_info.h
new file mode 100644
index 00000000000..37ffc9dcffd
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/compaction_info.h
@@ -0,0 +1,97 @@
+#pragma once
+#include <util/generic/string.h>
+#include <util/system/yassert.h>
+#include <util/stream/output.h>
+#include <memory>
+
+namespace NKikimr::NOlap {
+
+class ICompactionObjectCallback {
+public:
+ virtual ~ICompactionObjectCallback() = default;
+ virtual void OnCompactionStarted(const bool inGranule) = 0;
+ virtual void OnCompactionFinished() = 0;
+ virtual void OnCompactionFailed(const TString& reason) = 0;
+ virtual void OnCompactionCanceled(const TString& reason) = 0;
+ virtual TString DebugString() const = 0;
+};
+
+class TPlanCompactionInfo {
+private:
+ ui64 PathId = 0;
+ bool InternalFlag = false;
+public:
+ TPlanCompactionInfo(const ui64 pathId, const bool internalFlag)
+ : PathId(pathId)
+ , InternalFlag(internalFlag) {
+
+ }
+
+ ui64 GetPathId() const {
+ return PathId;
+ }
+
+ bool IsInternal() const {
+ return InternalFlag;
+ }
+};
+
+struct TCompactionInfo {
+private:
+ std::shared_ptr<ICompactionObjectCallback> CompactionObject;
+ mutable bool StatusProvided = false;
+ const bool InGranuleFlag = false;
+public:
+ TCompactionInfo(std::shared_ptr<ICompactionObjectCallback> compactionObject, const bool inGranule)
+ : CompactionObject(compactionObject)
+ , InGranuleFlag(inGranule)
+ {
+ Y_VERIFY(compactionObject);
+ CompactionObject->OnCompactionStarted(InGranuleFlag);
+ }
+
+ TPlanCompactionInfo GetPlanCompaction() const;
+
+ bool InGranule() const {
+ return InGranuleFlag;
+ }
+
+ template <class T>
+ const T& GetObject() const {
+ auto result = dynamic_cast<const T*>(CompactionObject.get());
+ Y_VERIFY(result);
+ return *result;
+ }
+
+ void CompactionFinished() const {
+ Y_VERIFY(!StatusProvided);
+ StatusProvided = true;
+ CompactionObject->OnCompactionFinished();
+ }
+
+ void CompactionCanceled(const TString& reason) const {
+ Y_VERIFY(!StatusProvided);
+ StatusProvided = true;
+ CompactionObject->OnCompactionCanceled(reason);
+ }
+
+ void CompactionFailed(const TString& reason) const {
+ Y_VERIFY(!StatusProvided);
+ StatusProvided = true;
+ CompactionObject->OnCompactionFailed(reason);
+ }
+
+ ~TCompactionInfo() {
+ Y_VERIFY_DEBUG(StatusProvided);
+ if (!StatusProvided) {
+ CompactionObject->OnCompactionFailed("compaction unexpectedly finished");
+ }
+ }
+
+ friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) {
+ out << (info.InGranuleFlag ? "in granule" : "split granule") << " compaction of granule: " << info.CompactionObject->DebugString();
+ return out;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index e8cef6ca7e1..2b8aeb39455 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -34,12 +34,10 @@ void TDbWrapper::EraseAborted(const TInsertedData& data) {
NColumnShard::Schema::InsertTable_EraseAborted(db, data);
}
-bool TDbWrapper::Load(THashMap<TWriteId, TInsertedData>& inserted,
- THashMap<ui64, TSet<TInsertedData>>& committed,
- THashMap<TWriteId, TInsertedData>& aborted,
+bool TDbWrapper::Load(TInsertTableAccessor& insertTable,
const TInstant& loadTime) {
NIceDb::TNiceDb db(Database);
- return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, inserted, committed, aborted, loadTime);
+ return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, insertTable, loadTime);
}
void TDbWrapper::WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) {
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h
index b893c584733..0e32b6d18b7 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.h
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.h
@@ -8,6 +8,7 @@ class TDatabase;
namespace NKikimr::NOlap {
struct TInsertedData;
+class TInsertTableAccessor;
struct TColumnRecord;
struct TGranuleRecord;
class IColumnEngine;
@@ -23,9 +24,7 @@ public:
virtual void EraseCommitted(const TInsertedData& data) = 0;
virtual void EraseAborted(const TInsertedData& data) = 0;
- virtual bool Load(THashMap<TWriteId, TInsertedData>& inserted,
- THashMap<ui64, TSet<TInsertedData>>& committed,
- THashMap<TWriteId, TInsertedData>& aborted,
+ virtual bool Load(TInsertTableAccessor& insertTable,
const TInstant& loadTime) = 0;
virtual void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0;
@@ -54,9 +53,7 @@ public:
void EraseCommitted(const TInsertedData& data) override;
void EraseAborted(const TInsertedData& data) override;
- bool Load(THashMap<TWriteId, TInsertedData>& inserted,
- THashMap<ui64, TSet<TInsertedData>>& committed,
- THashMap<TWriteId, TInsertedData>& aborted,
+ bool Load(TInsertTableAccessor& insertTable,
const TInstant& loadTime) override;
void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override;
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 886b67762d2..64d1ca85694 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -1,5 +1,4 @@
#include "index_info.h"
-#include "insert_table.h"
#include "column_engine.h"
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..3ee6b423795
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-insert_table)
+target_link_libraries(columnshard-engines-insert_table PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(columnshard-engines-insert_table PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..e2faa9f1855
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-insert_table)
+target_link_libraries(columnshard-engines-insert_table PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(columnshard-engines-insert_table PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..e2faa9f1855
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-insert_table)
+target_link_libraries(columnshard-engines-insert_table PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(columnshard-engines-insert_table PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..3ee6b423795
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-insert_table)
+target_link_libraries(columnshard-engines-insert_table PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(columnshard-engines-insert_table PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.cpp b/ydb/core/tx/columnshard/engines/insert_table/data.cpp
new file mode 100644
index 00000000000..2830fe24c17
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/data.cpp
@@ -0,0 +1,5 @@
+#include "data.h"
+
+namespace NKikimr::NOlap {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/data.h
index 2e9ddd5f097..dcbae641e22 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/data.h
@@ -1,8 +1,6 @@
#pragma once
#include <ydb/core/tx/columnshard/blob.h>
-#include <util/generic/set.h>
-
-#include "defs.h"
+#include <ydb/core/tx/columnshard/engines/defs.h>
namespace NKikimr::NOlap {
@@ -136,52 +134,6 @@ public:
}
};
-class IDbWrapper;
-
-/// Use one table for inserted and commited blobs:
-/// !Commited => {ShardOrPlan, WriteTxId} are {MetaShard, WriteId}
-/// Commited => {ShardOrPlan, WriteTxId} are {PlanStep, TxId}
-class TInsertTable {
-public:
- static constexpr const TDuration WaitCommitDelay = TDuration::Hours(24);
- static constexpr const TDuration CleanDelay = TDuration::Minutes(10);
-
- struct TCounters {
- ui64 Rows{};
- ui64 Bytes{};
- ui64 RawBytes{};
- };
-
- bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
- TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard,
- const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists);
- void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds);
- THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const;
- THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);
- void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key);
- void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key);
- std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const;
- bool Load(IDbWrapper& dbTable, const TInstant& loadTime);
- const TCounters& GetCountersPrepared() const { return StatsPrepared; }
- const TCounters& GetCountersCommitted() const { return StatsCommitted; }
-
- size_t InsertedSize() const { return Inserted.size(); }
- const THashMap<ui64, TSet<TInsertedData>>& GetCommitted() const { return CommittedByPathId; }
- const THashMap<TWriteId, TInsertedData>& GetAborted() const { return Aborted; }
- void SetOverloaded(ui64 pathId, bool overload);
- bool IsOverloaded(ui64 pathId) const { return PathsOverloaded.contains(pathId); }
- bool HasOverloaded() const { return !PathsOverloaded.empty(); }
-
-private:
- THashMap<TWriteId, TInsertedData> Inserted;
- THashMap<ui64, TSet<TInsertedData>> CommittedByPathId;
- THashMap<TWriteId, TInsertedData> Aborted;
- THashSet<ui64> PathsOverloaded;
- mutable TInstant LastCleanup;
- TCounters StatsPrepared;
- TCounters StatsCommitted;
-};
-
}
template <>
diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
index 50f15ace780..c7a799de8c4 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
@@ -1,22 +1,54 @@
-#include "defs.h"
#include "insert_table.h"
-#include "db_wrapper.h"
-#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
+#include <ydb/core/tx/columnshard/engines/db_wrapper.h>
+#include <ydb/core/tx/columnshard/engines/column_engine.h>
namespace NKikimr::NOlap {
+void TInsertTable::OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load) noexcept {
+ if (!load) {
+ Counters.Inserted.Add(dataSize);
+ }
+ pathInfo.AddInsertedSize(dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
+ ++StatsPrepared.Rows;
+ StatsPrepared.Bytes += dataSize;
+}
+
+void TInsertTable::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept {
+ Counters.Inserted.Erase(dataSize);
+ pathInfo.AddInsertedSize(-1 * (i64)dataSize, TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
+ Y_VERIFY(--StatsPrepared.Rows >= 0);
+ StatsPrepared.Bytes += dataSize;
+}
+
+void TInsertTable::OnNewCommitted(const ui64 dataSize, const bool load) noexcept {
+ if (!load) {
+ Counters.Committed.Add(dataSize);
+ }
+ ++StatsCommitted.Rows;
+ StatsCommitted.Bytes += dataSize;
+}
+
+void TInsertTable::OnEraseCommitted(TPathInfo& /*pathInfo*/, const ui64 dataSize) noexcept {
+ Counters.Committed.Erase(dataSize);
+ Y_VERIFY(--StatsCommitted.Rows >= 0);
+ StatsCommitted.Bytes -= dataSize;
+}
+
bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) {
TWriteId writeId{data.WriteTxId};
if (Inserted.contains(writeId)) {
+ Counters.Inserted.SkipAdd(data.BlobSize());
return false;
}
dbTable.Insert(data);
- ui32 dataSize = data.BlobSize();
+ const ui32 dataSize = data.BlobSize();
+ const ui64 pathId = data.PathId;
if (Inserted.emplace(writeId, std::move(data)).second) {
- StatsPrepared.Rows = Inserted.size();
- StatsPrepared.Bytes += dataSize;
+ OnNewInserted(Summary.GetPathInfo(pathId), dataSize);
+ } else {
+ Counters.Inserted.SkipAdd(dataSize);
}
return true;
}
@@ -40,25 +72,25 @@ TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep,
dbTable.EraseInserted(*data);
- ui32 dataSize = data->BlobSize();
-
+ const ui64 dataSize = data->BlobSize();
+ const ui64 pathId = data->PathId;
+ auto* pathInfo = Summary.GetPathInfoOptional(pathId);
// There could be commit after drop: propose, drop, plan
- if (pathExists(data->PathId)) {
+ if (pathInfo && pathExists(pathId)) {
data->Commit(planStep, txId);
dbTable.Commit(*data);
- if (CommittedByPathId[data->PathId].emplace(std::move(*data)).second) {
- ++StatsCommitted.Rows;
- StatsCommitted.Bytes += dataSize;
+ if (pathInfo->AddCommitted(std::move(*data))) {
+ OnNewCommitted(dataSize);
}
} else {
dbTable.Abort(*data);
+ Counters.Aborted.Add(data->BlobSize());
Aborted.emplace(writeId, std::move(*data));
}
- if (Inserted.erase(writeId)) {
- StatsPrepared.Rows = Inserted.size();
- StatsPrepared.Bytes -= dataSize;
+ if (pathInfo && Inserted.erase(writeId)) {
+ OnEraseInserted(*pathInfo, dataSize);
}
}
@@ -72,14 +104,17 @@ void TInsertTable::Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWr
for (auto writeId : writeIds) {
// There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId.
if (auto* data = Inserted.FindPtr(writeId)) {
+ Counters.Aborted.Add(data->BlobSize());
dbTable.EraseInserted(*data);
dbTable.Abort(*data);
- ui32 dataSize = data->BlobSize();
+ const ui64 pathId = data->PathId;
+ const ui32 dataSize = data->BlobSize();
Aborted.emplace(writeId, std::move(*data));
if (Inserted.erase(writeId)) {
- StatsPrepared.Rows = Inserted.size();
- StatsPrepared.Bytes -= dataSize;
+ OnEraseInserted(Summary.GetPathInfo(pathId), dataSize);
+ } else {
+ Counters.Inserted.SkipErase(dataSize);
}
}
}
@@ -107,20 +142,19 @@ THashSet<TWriteId> TInsertTable::OldWritesToAbort(const TInstant& now) const {
THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) {
// Committed -> Aborted (for future cleanup)
- TSet<TInsertedData> committed = std::move(CommittedByPathId[pathId]);
- CommittedByPathId.erase(pathId);
-
- StatsCommitted.Rows -= committed.size();
- for (auto& data : committed) {
- StatsCommitted.Bytes -= data.BlobSize();
-
+ auto pathInfo = Summary.ExtractPathInfo(pathId);
+ if (!pathInfo) {
+ return {};
+ }
+ for (auto& data : pathInfo->GetCommitted()) {
dbTable.EraseCommitted(data);
-
+ OnEraseCommitted(*pathInfo, data.BlobSize());
TInsertedData copy = data;
copy.Undo();
dbTable.Abort(copy);
TWriteId writeId{copy.WriteTxId};
+ Counters.Aborted.Add(copy.BlobSize());
Aborted.emplace(writeId, std::move(copy));
}
@@ -137,14 +171,17 @@ THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) {
}
void TInsertTable::EraseCommitted(IDbWrapper& dbTable, const TInsertedData& data) {
- if (!CommittedByPathId.contains(data.PathId)) {
+ TPathInfo* pathInfo = Summary.GetPathInfoOptional(data.PathId);
+ if (!pathInfo) {
+ Counters.Committed.SkipErase(data.BlobSize());
return;
}
dbTable.EraseCommitted(data);
- if (CommittedByPathId[data.PathId].erase(data)) {
- --StatsCommitted.Rows;
- StatsCommitted.Bytes -= data.BlobSize();
+ if (pathInfo->EraseCommitted(data)) {
+ OnEraseCommitted(*pathInfo, data.BlobSize());
+ } else {
+ Counters.Committed.SkipErase(data.BlobSize());
}
}
@@ -155,15 +192,14 @@ void TInsertTable::EraseAborted(IDbWrapper& dbTable, const TInsertedData& data)
}
dbTable.EraseAborted(data);
+ Counters.Aborted.Erase(data.BlobSize());
Aborted.erase(writeId);
}
bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) {
- Inserted.clear();
- CommittedByPathId.clear();
- Aborted.clear();
+ Clear();
- if (!dbTable.Load(Inserted, CommittedByPathId, Aborted, loadTime)) {
+ if (!dbTable.Load(*this, loadTime)) {
return false;
}
@@ -172,15 +208,13 @@ bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) {
StatsPrepared = {};
StatsCommitted = {};
- StatsPrepared.Rows = Inserted.size();
for (auto& [_, data] : Inserted) {
- StatsPrepared.Bytes += data.BlobSize();
+ OnNewInserted(Summary.GetPathInfo(data.PathId), data.BlobSize());
}
- for (auto& [_, set] : CommittedByPathId) {
- StatsCommitted.Rows += set.size();
- for (auto& data : set) {
- StatsCommitted.Bytes += data.BlobSize();
+ for (auto& [pathId, pathInfo] : Summary.GetPathInfo()) {
+ for (auto& data : pathInfo.GetCommitted()) {
+ OnNewCommitted(data.BlobSize());
}
}
@@ -188,15 +222,15 @@ bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant& loadTime) {
}
std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot) const {
- const auto* committed = CommittedByPathId.FindPtr(pathId);
- if (!committed) {
+ const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId);
+ if (!pInfo) {
return {};
}
std::vector<TCommittedBlob> ret;
- ret.reserve(committed->size());
+ ret.reserve(pInfo->GetCommitted().size());
- for (const auto& data : *committed) {
+ for (const auto& data : pInfo->GetCommitted()) {
if (std::less_equal<TSnapshot>()(data.GetSnapshot(), snapshot)) {
ret.emplace_back(TCommittedBlob(data.BlobId, data.GetSnapshot(), data.GetSchemaSnapshot()));
}
@@ -205,12 +239,4 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna
return ret;
}
-void TInsertTable::SetOverloaded(ui64 pathId, bool overload) {
- if (overload) {
- PathsOverloaded.insert(pathId);
- } else {
- PathsOverloaded.erase(pathId);
- }
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h
new file mode 100644
index 00000000000..d777b2e0a73
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h
@@ -0,0 +1,87 @@
+#pragma once
+#include "data.h"
+#include "rt_insertion.h"
+#include "path_info.h"
+#include <ydb/core/tx/columnshard/counters/insert_table.h>
+
+namespace NKikimr::NOlap {
+
+class IDbWrapper;
+
+/// Use one table for inserted and committed blobs:
+/// !Commited => {ShardOrPlan, WriteTxId} are {MetaShard, WriteId}
+/// Commited => {ShardOrPlan, WriteTxId} are {PlanStep, TxId}
+
+class TInsertTableAccessor {
+protected:
+ THashMap<TWriteId, TInsertedData> Inserted;
+ THashMap<TWriteId, TInsertedData> Aborted;
+ TInsertionSummary Summary;
+
+protected:
+ void Clear() {
+ Inserted.clear();
+ Summary.Clear();
+ Aborted.clear();
+ }
+public:
+ const std::map<ui64, std::set<const TPathInfo*>>& GetPathPriorities() const {
+ return Summary.GetPathPriorities();
+ }
+
+ bool AddInserted(const TWriteId& writeId, TInsertedData&& data) {
+ return Inserted.emplace(writeId, std::move(data)).second;
+ }
+ bool AddAborted(const TWriteId& writeId, TInsertedData&& data) {
+ return Aborted.emplace(writeId, std::move(data)).second;
+ }
+ bool AddCommitted(TInsertedData&& data) {
+ const ui64 pathId = data.PathId;
+ return Summary.GetPathInfo(pathId).AddCommitted(std::move(data));
+ }
+};
+
+class TInsertTable: public TInsertTableAccessor {
+private:
+ void OnNewInserted(TPathInfo& pathInfo, const ui64 dataSize, const bool load = false) noexcept;
+ void OnNewCommitted(const ui64 dataSize, const bool load = false) noexcept;
+ void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept;
+ void OnEraseCommitted(TPathInfo& pathInfo, const ui64 dataSize) noexcept;
+
+public:
+ static constexpr const TDuration WaitCommitDelay = TDuration::Hours(24);
+ static constexpr const TDuration CleanDelay = TDuration::Minutes(10);
+
+ struct TCounters {
+ ui64 Rows{};
+ ui64 Bytes{};
+ ui64 RawBytes{};
+ };
+
+ bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
+ TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard,
+ const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists);
+ void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds);
+ THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const;
+ THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);
+ void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key);
+ void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key);
+ std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const;
+ bool Load(IDbWrapper& dbTable, const TInstant& loadTime);
+ const TCounters& GetCountersPrepared() const { return StatsPrepared; }
+ const TCounters& GetCountersCommitted() const { return StatsCommitted; }
+
+ size_t InsertedSize() const { return Inserted.size(); }
+ const THashMap<TWriteId, TInsertedData>& GetAborted() const { return Aborted; }
+ bool IsOverloadedByCommitted(const ui64 pathId) const {
+ return Summary.IsOverloaded(pathId);
+ }
+private:
+
+ mutable TInstant LastCleanup;
+ TCounters StatsPrepared;
+ TCounters StatsCommitted;
+ const NColumnShard::TInsertTableCounters Counters;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
new file mode 100644
index 00000000000..489ec93cd5f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
@@ -0,0 +1,62 @@
+#include "path_info.h"
+#include "rt_insertion.h"
+#include <ydb/core/tx/columnshard/engines/column_engine.h>
+
+namespace NKikimr::NOlap {
+
+bool TPathInfo::SetCommittedOverload(const bool value) {
+ const bool startOverloaded = IsOverloaded();
+ CommittedOverload = value;
+ return startOverloaded != IsOverloaded();
+}
+
+bool TPathInfo::SetInsertedOverload(const bool value) {
+ const bool startOverloaded = IsOverloaded();
+ InsertedOverload = value;
+ return startOverloaded != IsOverloaded();
+}
+
+void TPathInfo::AddCommittedSize(const i64 size, const ui64 overloadLimit) {
+ CommittedSize += size;
+ Y_VERIFY(CommittedSize >= 0);
+ Summary->CommittedSize += size;
+ Y_VERIFY(Summary->CommittedSize >= 0);
+ SetCommittedOverload((ui64)CommittedSize > overloadLimit);
+}
+
+void TPathInfo::AddInsertedSize(const i64 size, const ui64 overloadLimit) {
+ InsertedSize += size;
+ Y_VERIFY(InsertedSize >= 0);
+ Summary->InsertedSize += size;
+ Y_VERIFY(Summary->InsertedSize >= 0);
+ PathIdCounters.Committed.OnPathIdDataInfo(InsertedSize, 0);
+ SetInsertedOverload((ui64)InsertedSize > overloadLimit);
+}
+
+bool TPathInfo::EraseCommitted(const TInsertedData& data) {
+ Summary->RemovePriority(*this);
+ const bool result = Committed.erase(data);
+ AddCommittedSize(-1 * (i64)data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
+ Summary->AddPriority(*this);
+ PathIdCounters.Committed.OnPathIdDataInfo(CommittedSize, Committed.size());
+ return result;
+}
+
+bool TPathInfo::AddCommitted(TInsertedData&& data) {
+ Summary->RemovePriority(*this);
+ AddCommittedSize(data.BlobSize(), TCompactionLimits::OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID);
+ bool result = Committed.emplace(std::move(data)).second;
+ Summary->AddPriority(*this);
+ PathIdCounters.Committed.OnPathIdDataInfo(CommittedSize, Committed.size());
+ return result;
+}
+
+TPathInfo::TPathInfo(TInsertionSummary& summary, const ui64 pathId)
+ : PathId(pathId)
+ , Summary(&summary)
+ , PathIdCounters(Summary->GetCounters().GetPathIdCounters())
+{
+
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/path_info.h b/ydb/core/tx/columnshard/engines/insert_table/path_info.h
new file mode 100644
index 00000000000..57f19d1298a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/path_info.h
@@ -0,0 +1,50 @@
+#pragma once
+#include <ydb/core/tx/columnshard/counters/insert_table.h>
+#include <util/generic/noncopyable.h>
+#include "data.h"
+
+namespace NKikimr::NOlap {
+class TInsertionSummary;
+class TPathInfo: public TMoveOnly {
+private:
+ const ui64 PathId = 0;
+ TSet<TInsertedData> Committed;
+ i64 CommittedSize = 0;
+ i64 InsertedSize = 0;
+ bool CommittedOverload = false;
+ bool InsertedOverload = false;
+ TInsertionSummary* Summary = nullptr;
+ const NColumnShard::TPathIdOwnedCounters PathIdCounters;
+
+ bool SetCommittedOverload(const bool value);
+ bool SetInsertedOverload(const bool value);
+
+ void AddCommittedSize(const i64 size, const ui64 overloadLimit);
+
+public:
+ void AddInsertedSize(const i64 size, const ui64 overloadLimit);
+
+ explicit TPathInfo(TInsertionSummary& summary, const ui64 pathId);
+
+ ui64 GetPathId() const {
+ return PathId;
+ }
+
+ ui64 GetIndexationPriority() const {
+ return CommittedSize * Committed.size() * Committed.size();
+ }
+
+ bool EraseCommitted(const TInsertedData& data);
+
+ const TSet<TInsertedData>& GetCommitted() const {
+ return Committed;
+ }
+
+ bool AddCommitted(TInsertedData&& data);
+
+ bool IsOverloaded() const {
+ return CommittedOverload || InsertedOverload;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
new file mode 100644
index 00000000000..f76b5379379
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
@@ -0,0 +1,70 @@
+#include "rt_insertion.h"
+
+namespace NKikimr::NOlap {
+
+void TInsertionSummary::RemovePriority(const TPathInfo& pathInfo) noexcept {
+ const ui64 priority = pathInfo.GetIndexationPriority();
+ auto it = Priorities.find(priority);
+ if (it == Priorities.end()) {
+ Y_VERIFY(priority == 0);
+ return;
+ }
+ Y_VERIFY(it->second.erase(&pathInfo) || priority == 0);
+ if (it->second.empty()) {
+ Priorities.erase(it);
+ }
+}
+
+void TInsertionSummary::AddPriority(const TPathInfo& pathInfo) noexcept {
+ Y_VERIFY(Priorities[pathInfo.GetIndexationPriority()].emplace(&pathInfo).second);
+}
+
+NKikimr::NOlap::TPathInfo& TInsertionSummary::GetPathInfo(const ui64 pathId) {
+ auto it = PathInfo.find(pathId);
+ if (it == PathInfo.end()) {
+ it = PathInfo.emplace(pathId, TPathInfo(*this, pathId)).first;
+ }
+ return it->second;
+}
+
+std::optional<NKikimr::NOlap::TPathInfo> TInsertionSummary::ExtractPathInfo(const ui64 pathId) {
+ auto it = PathInfo.find(pathId);
+ if (it == PathInfo.end()) {
+ return {};
+ }
+ RemovePriority(it->second);
+ std::optional<TPathInfo> result = std::move(it->second);
+ PathInfo.erase(it);
+ return result;
+}
+
+NKikimr::NOlap::TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui64 pathId) {
+ auto it = PathInfo.find(pathId);
+ if (it == PathInfo.end()) {
+ return nullptr;
+ }
+ return &it->second;
+}
+
+const NKikimr::NOlap::TPathInfo* TInsertionSummary::GetPathInfoOptional(const ui64 pathId) const {
+ auto it = PathInfo.find(pathId);
+ if (it == PathInfo.end()) {
+ return nullptr;
+ }
+ return &it->second;
+}
+
+bool TInsertionSummary::IsOverloaded(const ui64 pathId) const {
+ auto it = PathInfo.find(pathId);
+ if (it == PathInfo.end()) {
+ return false;
+ }
+ return it->second.IsOverloaded();
+}
+
+void TInsertionSummary::Clear() {
+ PathInfo.clear();
+ Priorities.clear();
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
new file mode 100644
index 00000000000..63118535b59
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
@@ -0,0 +1,41 @@
+#pragma once
+#include <ydb/core/tx/columnshard/counters/insert_table.h>
+#include <ydb/library/accessor/accessor.h>
+#include "path_info.h"
+
+namespace NKikimr::NOlap {
+
+class TInsertionSummary {
+private:
+ const NColumnShard::TInsertTableCounters Counters;
+ YDB_READONLY(i64, CommittedSize, 0);
+ YDB_READONLY(i64, InsertedSize, 0);
+ std::map<ui64, std::set<const TPathInfo*>> Priorities;
+ THashMap<ui64, TPathInfo> PathInfo;
+ friend class TPathInfo;
+ void RemovePriority(const TPathInfo& pathInfo) noexcept;
+ void AddPriority(const TPathInfo& pathInfo) noexcept;
+
+public:
+ const NColumnShard::TInsertTableCounters& GetCounters() const {
+ return Counters;
+ }
+ NKikimr::NOlap::TPathInfo& GetPathInfo(const ui64 pathId);
+ std::optional<TPathInfo> ExtractPathInfo(const ui64 pathId);
+ TPathInfo* GetPathInfoOptional(const ui64 pathId);
+ const TPathInfo* GetPathInfoOptional(const ui64 pathId) const;
+
+ const THashMap<ui64, TPathInfo>& GetPathInfo() const {
+ return PathInfo;
+ }
+
+ void Clear();
+
+ bool IsOverloaded(const ui64 pathId) const;
+
+ const std::map<ui64, std::set<const TPathInfo*>>& GetPathPriorities() const {
+ return Priorities;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 77a31505227..6d93d765c66 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -6,6 +6,7 @@
#include <ydb/core/tx/columnshard/counters.h>
#include <ydb/core/tx/columnshard/columnshard__scan.h>
#include <ydb/core/tx/columnshard/columnshard_common.h>
+#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h>
#include <ydb/core/tx/columnshard/engines/predicate/predicate.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/scheme_types/scheme_type_info.h>
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 6d5e5475284..1446af52ffb 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -242,6 +242,10 @@ public:
return Portions;
}
+ ui64 GetPathId() const {
+ return Record.PathId;
+ }
+
const TPortionInfo& GetPortionVerified(const ui64 portion) const {
auto it = Portions.find(portion);
Y_VERIFY(it != Portions.end());
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index 3688a52e2cd..6be2efea27a 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -1,7 +1,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <util/string/printf.h>
#include "db_wrapper.h"
-#include "insert_table.h"
+#include "insert_table/insert_table.h"
namespace NKikimr {
@@ -18,9 +18,7 @@ public:
void EraseCommitted(const TInsertedData&) override {}
void EraseAborted(const TInsertedData&) override {}
- bool Load(THashMap<TWriteId, TInsertedData>&,
- THashMap<ui64, TSet<TInsertedData>>&,
- THashMap<TWriteId, TInsertedData>&,
+ bool Load(TInsertTableAccessor&,
const TInstant&) override
{
return true;
@@ -78,9 +76,9 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {
ui64 txId = 42;
insertTable.Commit(dbTable, planStep, txId, metaShard, {TWriteId{writeId}}, [](ui64){ return true; });
- auto committed = insertTable.GetCommitted();
- UNIT_ASSERT_EQUAL(committed.size(), 1);
- UNIT_ASSERT_EQUAL(committed.begin()->second.size(), 1);
+ UNIT_ASSERT_EQUAL(insertTable.GetPathPriorities().size(), 1);
+ UNIT_ASSERT_EQUAL(insertTable.GetPathPriorities().begin()->second.size(), 1);
+ UNIT_ASSERT_EQUAL((*insertTable.GetPathPriorities().begin()->second.begin())->GetCommitted().size(), 1);
// read old snapshot
blobs = insertTable.Read(tableId, TSnapshot::Zero());
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index a3fdbdd9332..a7a58da4d02 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -47,13 +47,20 @@ public:
Aborted.erase(TWriteId{data.WriteTxId});
}
- bool Load(THashMap<TWriteId, TInsertedData>& inserted,
- THashMap<ui64, TSet<TInsertedData>>& committed,
- THashMap<TWriteId, TInsertedData>& aborted,
+ bool Load(TInsertTableAccessor& accessor,
const TInstant&) override {
- inserted = Inserted;
- committed = Committed;
- aborted = Aborted;
+ for (auto&& i : Inserted) {
+ accessor.AddInserted(i.first, std::move(i.second));
+ }
+ for (auto&& i : Aborted) {
+ accessor.AddAborted(i.first, std::move(i.second));
+ }
+ for (auto&& i : Committed) {
+ for (auto&& c: i.second) {
+ auto copy = c;
+ accessor.AddCommitted(std::move(copy));
+ }
+ }
return true;
}
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index afc4181a1b6..6ec23442140 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -158,8 +158,8 @@ public:
return SchemaPresets;
}
- bool IndexOverloaded() const {
- return PrimaryIndex && PrimaryIndex->HasOverloadedGranules();
+ bool IsOverloaded(const ui64 pathId) const {
+ return PrimaryIndex && PrimaryIndex->HasOverloadedGranules(pathId);
}
bool HasPrimaryIndex() const {