aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSemyon <yentsovsemyon@ydb.tech>2024-12-20 11:21:29 +0300
committerGitHub <noreply@github.com>2024-12-20 08:21:29 +0000
commit888c474b76c8282662165ba3a6ccf43a20ae702b (patch)
treebe29612c512dd8480e15bd7c4d961a7ab0b3c27f
parente804b29410a6ae318228d897175850414e0d16da (diff)
downloadydb-888c474b76c8282662165ba3a6ccf43a20ae702b.tar.gz
share schemas between CS on same node (#12673)
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp3
-rw-r--r--ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h6
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp26
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h19
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h32
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/common/cache.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/common/cache.h72
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/common/ya.make13
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/objects_cache.h45
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp22
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h10
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h5
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/versions/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp14
-rw-r--r--ydb/core/tx/columnshard/loading/stages.cpp3
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/chunks.cpp3
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp4
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp3
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp3
-rw-r--r--ydb/core/tx/columnshard/operations/batch_builder/merger.cpp16
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp19
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h4
30 files changed, 276 insertions, 82 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index dd74874db4..65e179c8c3 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -83,7 +83,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
- , TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), info->TabletID)
+ , TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr),
+ std::make_shared<NOlap::TSchemaObjectsCache>(), info->TabletID)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp
index d8f33c2906..28b13a7eeb 100644
--- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp
+++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp
@@ -15,7 +15,7 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc,
auto& index = Self->TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>();
for (auto& info : SchemeHistory) {
- index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetSchema());
+ index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetProto().GetId(), info.GetSchema());
}
TDbWrapper dbWrapper(txc.DB, nullptr);
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index fe62107aa0..3d3d5fb8c6 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -108,7 +108,7 @@ public:
if (!Schemas.contains(data.GetSchemaVersion())) {
Schemas.emplace(data.GetSchemaVersion(), blobSchema);
}
- auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
+ TColumnIdsView columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end());
if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) {
filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
@@ -247,7 +247,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
{
const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange);
- auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
+ NArrow::TSchemaLiteView blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
auto batchSchema =
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end()));
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 13d5c95492..b3beefc25a 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -354,9 +354,9 @@ public:
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept = 0;
virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
- virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
- virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0;
- virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0;
+ virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) = 0;
+ virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0;
+ virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0;
virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0;
virtual const TColumnEngineStats& GetTotalStats() = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index d25abc4941..aa588965e9 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -26,30 +26,32 @@
namespace NKikimr::NOlap {
-TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
+TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
- const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema)
+ const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
, DataAccessorsManager(dataAccessorsManager)
, StoragesManager(storagesManager)
+ , SchemaObjectsCache(schemaCache)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0) {
ActualizationController = std::make_shared<NActualizer::TController>();
- RegisterSchemaVersion(snapshot, schema);
+ RegisterSchemaVersion(snapshot, presetId, schema);
}
-TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
+TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
- const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema)
+ const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
, DataAccessorsManager(dataAccessorsManager)
, StoragesManager(storagesManager)
+ , SchemaObjectsCache(schemaCache)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0) {
ActualizationController = std::make_shared<NActualizer::TController>();
- RegisterSchemaVersion(snapshot, std::move(schema));
+ RegisterSchemaVersion(snapshot, presetId, std::move(schema));
}
const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& TColumnEngineForLogs::GetStats() const {
@@ -138,7 +140,7 @@ void TColumnEngineForLogs::UpdatePortionStats(
}
}
-void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
+void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& indexInfo) {
AFL_VERIFY(DataAccessorsManager);
bool switchOptimizer = false;
bool switchAccessorsManager = false;
@@ -150,7 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
}
const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization();
- auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo));
+ auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(indexInfo)));
if (isCriticalScheme) {
StartActualization({});
for (auto&& i : GranulesStorage->GetTables()) {
@@ -170,7 +172,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
}
}
-void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
+void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) {
AFL_VERIFY(VersionedIndex.IsEmpty() || schema.GetVersion() >= VersionedIndex.GetLastSchema()->GetVersion())("empty", VersionedIndex.IsEmpty())("current", schema.GetVersion())(
"last", VersionedIndex.GetLastSchema()->GetVersion());
@@ -184,10 +186,10 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons
indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema.GetSchemaVerified(), StoragesManager, SchemaObjectsCache);
}
AFL_VERIFY(indexInfoOptional);
- RegisterSchemaVersion(snapshot, std::move(*indexInfoOptional));
+ RegisterSchemaVersion(snapshot, presetId, std::move(*indexInfoOptional));
}
-void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
+void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) {
AFL_VERIFY(!VersionedIndex.IsEmpty());
ui64 version = schema.GetVersion();
@@ -215,7 +217,7 @@ void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, c
}
AFL_VERIFY(indexInfoOptional);
- VersionedIndex.AddIndex(snapshot, std::move(*indexInfoOptional));
+ VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(*indexInfoOptional)));
}
std::shared_ptr<ITxReader> TColumnEngineForLogs::BuildLoader(const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 70ec8a852b..b62ce870dc 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -61,7 +61,7 @@ private:
std::shared_ptr<IStoragesManager> StoragesManager;
std::shared_ptr<NActualizer::TController> ActualizationController;
- std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>();
+ std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache;
TVersionedIndex VersionedIndex;
std::shared_ptr<TVersionedIndex> VersionedIndexCopy;
@@ -98,10 +98,13 @@ public:
ADD,
};
- TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
- const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema);
- TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
- const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema);
+ TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
+ const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
+ const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId,
+ const TSchemaInitializationData& schema);
+ TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
+ const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
+ const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema);
void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) override;
void OnTieringModified(const THashMap<ui64, NOlap::TTiering>& ttl) override;
@@ -157,9 +160,9 @@ public:
virtual bool ApplyChangesOnExecute(
IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override;
- void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override;
- void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override;
- void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override;
+ void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) override;
+ void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
+ void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
std::shared_ptr<TSelectInfo> Select(
ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override;
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp
new file mode 100644
index 0000000000..8f630cd397
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp
@@ -0,0 +1,3 @@
+#include "schema_version.h"
+
+namespace NKikimr::NOlap {}
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h
new file mode 100644
index 0000000000..2930b6d8d1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include <ydb/library/accessor/accessor.h>
+
+#include <util/digest/numeric.h>
+
+namespace NKikimr::NOlap {
+
+class TSchemaVersionId {
+private:
+ YDB_READONLY_DEF(ui64, PresetId);
+ YDB_READONLY_DEF(ui64, Version);
+
+public:
+ bool operator==(const TSchemaVersionId& other) const {
+ return std::tie(PresetId, Version) == std::tie(other.PresetId, other.Version);
+ }
+
+ TSchemaVersionId(const ui64 presetId, const ui64 version)
+ : PresetId(presetId)
+ , Version(version) {
+ }
+};
+
+}
+
+template <>
+struct THash<NKikimr::NOlap::TSchemaVersionId> {
+ inline size_t operator()(const NKikimr::NOlap::TSchemaVersionId& key) const {
+ return CombineHashes(key.GetPresetId(), key.GetVersion());
+ }
+};
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make
index 709793c4e3..bf3aac5302 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
index_info.cpp
column_ids.cpp
+ schema_version.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp b/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp
new file mode 100644
index 0000000000..9be4dd9584
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp
@@ -0,0 +1,3 @@
+#include "cache.h"
+
+namespace NKikimr::NOlap {}
diff --git a/ydb/core/tx/columnshard/engines/scheme/common/cache.h b/ydb/core/tx/columnshard/engines/scheme/common/cache.h
new file mode 100644
index 0000000000..8ccdfdc034
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/common/cache.h
@@ -0,0 +1,72 @@
+#pragma once
+
+#include <util/generic/hash.h>
+#include <util/system/guard.h>
+#include <util/system/mutex.h>
+
+#include <memory>
+
+namespace NKikimr::NOlap {
+
+template <typename TKey, typename TObject>
+class TObjectCache : std::enable_shared_from_this<TObjectCache<TKey, TObject>> {
+private:
+ THashMap<TKey, std::weak_ptr<const TObject>> Objects;
+ mutable TMutex Mutex;
+
+public:
+ class TEntryGuard {
+ private:
+ TKey Key;
+ std::shared_ptr<const TObject> Object;
+ std::weak_ptr<TObjectCache> Cache;
+
+ public:
+ TEntryGuard(TKey key, const std::shared_ptr<const TObject> object, TObjectCache* cache)
+ : Key(key)
+ , Object(object)
+ , Cache(cache->weak_from_this()) {
+ }
+
+ const TObject* operator->() const {
+ return Object.get();
+ }
+ const TObject& operator*() const {
+ return *Object;
+ }
+
+ ~TEntryGuard() {
+ Object.reset();
+ if (auto cache = Cache.lock()) {
+ cache->TryFree(Key);
+ }
+ }
+ };
+
+public:
+ TEntryGuard Upsert(TKey key, TObject&& object) {
+ TGuard lock(Mutex);
+ auto* findSchema = Objects.FindPtr(key);
+ std::shared_ptr<const TObject> cachedObject;
+ if (findSchema) {
+ cachedObject = findSchema->lock();
+ }
+ if (!cachedObject) {
+ cachedObject = std::make_shared<const TObject>(std::move(object));
+ Objects[key] = cachedObject;
+ }
+ return TEntryGuard(std::move(key), cachedObject, this);
+ }
+
+ void TryFree(const TKey& key) {
+ TGuard lock(Mutex);
+ auto findObject = Objects.FindPtr(key);
+ if (findObject) {
+ if (findObject->expired()) {
+ Objects.erase(key);
+ }
+ }
+ }
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/scheme/common/ya.make b/ydb/core/tx/columnshard/engines/scheme/common/ya.make
new file mode 100644
index 0000000000..6d84704af4
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/common/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ cache.cpp
+)
+
+PEERDIR(
+ ydb/library/actors/core
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index 9974e027bf..29d40e0032 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -76,7 +76,7 @@ std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) co
}
std::vector<std::string> TIndexInfo::GetColumnSTLNames(const bool withSpecial) const {
- const auto ids = GetColumnIds(withSpecial);
+ const TColumnIdsView ids = GetColumnIds(withSpecial);
std::vector<std::string> out;
out.reserve(ids.size());
for (ui32 id : ids) {
@@ -457,7 +457,7 @@ std::shared_ptr<NIndexes::NCountMinSketch::TIndexMeta> TIndexInfo::GetIndexMetaC
}
std::vector<ui32> TIndexInfo::GetEntityIds() const {
- const auto columnIds = GetColumnIds(true);
+ const TColumnIdsView columnIds = GetColumnIds(true);
std::vector<ui32> result(columnIds.begin(), columnIds.end());
for (auto&& i : Indexes) {
result.emplace_back(i.first);
diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp
index 9b3da88611..8b8d2f44b0 100644
--- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp
@@ -1,5 +1,12 @@
#include "objects_cache.h"
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+
namespace NKikimr::NOlap {
+TSchemaObjectsCache::TSchemasCache::TEntryGuard TSchemaObjectsCache::UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo) {
+ const TSchemaVersionId versionId(presetId, indexInfo.GetVersion());
+ return SchemasByVersion.Upsert(versionId, std::move(indexInfo));
+}
+
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h
index a203623b60..fabd908943 100644
--- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h
+++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h
@@ -1,6 +1,9 @@
#pragma once
#include "column_features.h"
+#include <ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h>
+#include <ydb/core/tx/columnshard/engines/scheme/common/cache.h>
+
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
#include <library/cpp/string_utils/quote/quote.h>
#include <util/generic/hash.h>
@@ -10,13 +13,22 @@ namespace NKikimr::NOlap {
class TSchemaObjectsCache {
private:
THashMap<TString, std::shared_ptr<arrow::Field>> Fields;
- THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
- THashSet<TString> StringsCache;
mutable ui64 AcceptionFieldsCount = 0;
+ mutable TMutex FieldsMutex;
+
+ THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
mutable ui64 AcceptionFeaturesCount = 0;
+ mutable TMutex FeaturesMutex;
+
+ using TSchemasCache = TObjectCache<TSchemaVersionId, TIndexInfo>;
+ TSchemasCache SchemasByVersion;
+
+ THashSet<TString> StringsCache;
+ mutable TMutex StringsMutex;
public:
const TString& GetStringCache(const TString& original) {
+ TGuard lock(StringsMutex);
auto it = StringsCache.find(original);
if (it == StringsCache.end()) {
it = StringsCache.emplace(original).first;
@@ -26,13 +38,16 @@ public:
void RegisterField(const TString& fingerprint, const std::shared_ptr<arrow::Field>& f) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString());
+ TGuard lock(FieldsMutex);
AFL_VERIFY(Fields.emplace(fingerprint, f).second);
}
void RegisterColumnFeatures(const TString& fingerprint, const std::shared_ptr<TColumnFeatures>& f) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_column_features")("fp", fingerprint)("info", f->DebugString());
+ TGuard lock(FeaturesMutex);
AFL_VERIFY(ColumnFeatures.emplace(fingerprint, f).second);
}
std::shared_ptr<arrow::Field> GetField(const TString& fingerprint) const {
+ TGuard lock(FieldsMutex);
auto it = Fields.find(fingerprint);
if (it == Fields.end()) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_field_miss")("fp", fingerprint)("count", Fields.size())(
@@ -47,6 +62,7 @@ public:
}
template <class TConstructor>
TConclusion<std::shared_ptr<TColumnFeatures>> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) {
+ TGuard lock(FeaturesMutex);
auto it = ColumnFeatures.find(fingerprint);
if (it == ColumnFeatures.end()) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))(
@@ -65,6 +81,31 @@ public:
}
return it->second;
}
+
+ TSchemasCache::TEntryGuard UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo);
+};
+
+class TSchemaCachesManager {
+private:
+ THashMap<ui64, std::shared_ptr<TSchemaObjectsCache>> CacheByTableOwner;
+ TMutex Mutex;
+
+ std::shared_ptr<TSchemaObjectsCache> GetCacheImpl(const ui64 ownerPathId) {
+ if (!ownerPathId) {
+ return std::make_shared<TSchemaObjectsCache>();
+ }
+ TGuard lock(Mutex);
+ auto findCache = CacheByTableOwner.FindPtr(ownerPathId);
+ if (findCache) {
+ return *findCache;
+ }
+ return CacheByTableOwner.emplace(ownerPathId, std::make_shared<TSchemaObjectsCache>()).first->second;
+ }
+
+public:
+ static std::shared_ptr<TSchemaObjectsCache> GetCache(const ui64 ownerPathId) {
+ return Singleton<TSchemaCachesManager>()->GetCacheImpl(ownerPathId);
+ }
};
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp
index 05277b7b89..451145bee0 100644
--- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp
@@ -1,32 +1,32 @@
#include "snapshot_scheme.h"
+#include <ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h>
namespace NKikimr::NOlap {
-TSnapshotSchema::TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot)
+TSnapshotSchema::TSnapshotSchema(TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard&& indexInfo, const TSnapshot& snapshot)
: IndexInfo(std::move(indexInfo))
- , Schema(IndexInfo.ArrowSchemaWithSpecials())
- , Snapshot(snapshot)
-{
+ , Schema(IndexInfo->ArrowSchemaWithSpecials())
+ , Snapshot(snapshot) {
}
TColumnSaver TSnapshotSchema::GetColumnSaver(const ui32 columnId) const {
- return IndexInfo.GetColumnSaver(columnId);
+ return IndexInfo->GetColumnSaver(columnId);
}
std::shared_ptr<TColumnLoader> TSnapshotSchema::GetColumnLoaderOptional(const ui32 columnId) const {
- return IndexInfo.GetColumnLoaderOptional(columnId);
+ return IndexInfo->GetColumnLoaderOptional(columnId);
}
std::optional<ui32> TSnapshotSchema::GetColumnIdOptional(const std::string& columnName) const {
- return IndexInfo.GetColumnIdOptional(columnName);
+ return IndexInfo->GetColumnIdOptional(columnName);
}
ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const {
- return IndexInfo.GetColumnIdVerified(columnName);
+ return IndexInfo->GetColumnIdVerified(columnName);
}
int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const {
- return IndexInfo.GetColumnIndexOptional(columnId).value_or(-1);
+ return IndexInfo->GetColumnIndexOptional(columnId).value_or(-1);
}
const std::shared_ptr<NArrow::TSchemaLite>& TSnapshotSchema::GetSchema() const {
@@ -34,7 +34,7 @@ const std::shared_ptr<NArrow::TSchemaLite>& TSnapshotSchema::GetSchema() const {
}
const TIndexInfo& TSnapshotSchema::GetIndexInfo() const {
- return IndexInfo;
+ return *IndexInfo;
}
const TSnapshot& TSnapshotSchema::GetSnapshot() const {
@@ -46,7 +46,7 @@ ui32 TSnapshotSchema::GetColumnsCount() const {
}
ui64 TSnapshotSchema::GetVersion() const {
- return IndexInfo.GetVersion();
+ return IndexInfo->GetVersion();
}
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h
index 5246d39267..0cf6aa147d 100644
--- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h
@@ -4,11 +4,13 @@
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h>
+
namespace NKikimr::NOlap {
class TSnapshotSchema: public ISnapshotSchema {
private:
- TIndexInfo IndexInfo;
+ TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard IndexInfo;
std::shared_ptr<NArrow::TSchemaLite> Schema;
TSnapshot Snapshot;
protected:
@@ -16,15 +18,15 @@ protected:
return TStringBuilder() << "("
"schema=" << Schema->ToString() << ";" <<
"snapshot=" << Snapshot.DebugString() << ";" <<
- "index_info=" << IndexInfo.DebugString() << ";" <<
+ "index_info=" << IndexInfo->DebugString() << ";" <<
")"
;
}
public:
- TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot);
+ TSnapshotSchema(TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard&& indexInfo, const TSnapshot& snapshot);
virtual TColumnIdsView GetColumnIds() const override {
- return IndexInfo.GetColumnIds();
+ return IndexInfo->GetColumnIds();
}
TColumnSaver GetColumnSaver(const ui32 columnId) const override;
diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp
index d9a858f349..1642449df5 100644
--- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp
@@ -6,15 +6,15 @@
namespace NKikimr::NOlap {
-const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
+const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard&& indexInfo) {
if (Snapshots.empty()) {
- PrimaryKey = indexInfo.GetPrimaryKey();
+ PrimaryKey = indexInfo->GetPrimaryKey();
} else {
- Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo.GetPrimaryKey()));
+ Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo->GetPrimaryKey()));
}
- const bool needActualization = indexInfo.GetSchemeNeedActualization();
- auto newVersion = indexInfo.GetVersion();
+ const bool needActualization = indexInfo->GetSchemeNeedActualization();
+ auto newVersion = indexInfo->GetVersion();
auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(std::move(indexInfo), snapshot));
if (!itVersion.second) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion);
diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h
index e4e22071f1..81a57cd65e 100644
--- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h
+++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h
@@ -1,5 +1,8 @@
#pragma once
#include "abstract_scheme.h"
+
+#include <ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h>
+#include <ydb/core/tx/columnshard/engines/scheme/common/cache.h>
#include <ydb/core/tx/sharding/sharding.h>
namespace NKikimr::NOlap {
@@ -123,7 +126,7 @@ public:
return PrimaryKey;
}
- const TIndexInfo* AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo);
+ const TIndexInfo* AddIndex(const TSnapshot& snapshot, TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard&& indexInfo);
bool LoadShardingInfo(IDbWrapper& db);
};
diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/ya.make b/ydb/core/tx/columnshard/engines/scheme/versions/ya.make
index 63dc44a748..5b9cc7eff7 100644
--- a/ydb/core/tx/columnshard/engines/scheme/versions/ya.make
+++ b/ydb/core/tx/columnshard/engines/scheme/versions/ya.make
@@ -9,6 +9,7 @@ SRCS(
PEERDIR(
ydb/core/tx/columnshard/engines/scheme/abstract
+ ydb/core/tx/columnshard/engines/scheme/common
)
END()
diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make
index a8b2572ac5..295da3556b 100644
--- a/ydb/core/tx/columnshard/engines/scheme/ya.make
+++ b/ydb/core/tx/columnshard/engines/scheme/ya.make
@@ -22,6 +22,7 @@ PEERDIR(
ydb/core/tx/columnshard/engines/scheme/versions
ydb/core/tx/columnshard/engines/scheme/tiering
ydb/core/tx/columnshard/engines/scheme/column
+ ydb/core/tx/columnshard/engines/scheme/common
ydb/core/tx/columnshard/engines/scheme/defaults
ydb/core/formats/arrow/accessor
ydb/core/tx/columnshard/blobs_action/abstract
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
index 6d23f3de94..9635c24b33 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
@@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// load
TSnapshot indexSnapshot(1, 1);
TColumnEngineForLogs engine(
- 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo));
+ 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo));
for (auto&& i : paths) {
engine.RegisterTable(i);
}
@@ -609,7 +609,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TSnapshot indexSnapshot(1, 1);
TColumnEngineForLogs engine(
- 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo));
+ 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo));
engine.RegisterTable(pathId);
engine.TestingLoad(db);
@@ -710,7 +710,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TSnapshot indexSnapshot(1, 1);
TColumnEngineForLogs engine(
- 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo));
+ 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo));
engine.RegisterTable(pathId);
engine.TestingLoad(db);
@@ -736,7 +736,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // check it's overloaded after reload
TColumnEngineForLogs tmpEngine(
- 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo));
+ 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), 0, TIndexInfo(tableInfo));
tmpEngine.RegisterTable(pathId);
tmpEngine.TestingLoad(db);
}
@@ -768,7 +768,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // check it's not overloaded after reload
TColumnEngineForLogs tmpEngine(
- 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo));
+ 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), 0, TIndexInfo(tableInfo));
tmpEngine.RegisterTable(pathId);
tmpEngine.TestingLoad(db);
}
@@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TSnapshot indexSnapshot(1, 1);
{
TColumnEngineForLogs engine(
- 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo));
+ 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo));
engine.RegisterTable(pathId);
engine.TestingLoad(db);
@@ -868,7 +868,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{
// load
TColumnEngineForLogs engine(
- 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo));
+ 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo));
engine.RegisterTable(pathId);
engine.TestingLoad(db);
diff --git a/ydb/core/tx/columnshard/loading/stages.cpp b/ydb/core/tx/columnshard/loading/stages.cpp
index ac4ab092d4..a0db85a87a 100644
--- a/ydb/core/tx/columnshard/loading/stages.cpp
+++ b/ydb/core/tx/columnshard/loading/stages.cpp
@@ -194,7 +194,8 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon
bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
NIceDb::TNiceDb db(txc.DB);
- TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), Self->TabletID());
+ TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(),
+ NOlap::TSchemaCachesManager::GetCache(Self->OwnerPathId), Self->TabletID());
{
TMemoryProfileGuard g("TTxInit/TTablesManager");
if (!tablesManagerLocal.InitFromDB(db)) {
diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
index 21a184f7fd..e96a18d5e8 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
@@ -140,7 +140,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TChunksNormalizer::DoInit(
return tasks;
}
- TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), 0);
+ TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr),
+ std::make_shared<TSchemaObjectsCache>(), 0);
if (!tablesManager.InitFromDB(db)) {
ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager");
return TConclusionStatus::Fail("Can't load index");
diff --git a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp
index 811cae0957..7f4745f7bb 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp
@@ -181,8 +181,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TLeakedBlobsNormalizer::DoInit(
return TConclusionStatus::Fail("Not ready");
}
- NColumnShard::TTablesManager tablesManager(
- controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), TabletId);
+ NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr),
+ std::make_shared<TSchemaObjectsCache>(), TabletId);
if (!tablesManager.InitFromDB(db)) {
ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager");
diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
index bf4a6f5e89..f1ce6882b7 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
@@ -25,7 +25,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizerBase::DoInit(
return TConclusionStatus::Fail("Not ready");
}
- NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), 0);
+ NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr),
+ std::make_shared<TSchemaObjectsCache>(), 0);
if (!tablesManager.InitFromDB(db)) {
ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager");
return TConclusionStatus::Fail("Can't load index");
diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp
index a2c382e5dd..b668125097 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp
@@ -72,7 +72,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
return TConclusionStatus::Fail("Not ready");
}
- TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), 0);
+ TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr),
+ std::make_shared<TSchemaObjectsCache>(), 0);
if (!tablesManager.InitFromDB(db)) {
ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager");
return TConclusionStatus::Fail("Can't load index");
diff --git a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp
index 829f1cbe36..ceb87eb027 100644
--- a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp
+++ b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp
@@ -48,14 +48,14 @@ NKikimr::TConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSo
AFL_VERIFY(Schema->GetIndexInfo().GetColumnIds(false).size() == exists.GetData().GetColumns().size())
("index", Schema->GetIndexInfo().GetColumnIds(false).size())("exists", exists.GetData().GetColumns().size());
for (i32 columnIdx = 0; columnIdx < Schema->GetIndexInfo().ArrowSchema().num_fields(); ++columnIdx) {
- const std::optional<ui32>& incomingColumnIdx = IncomingColumnRemap[columnIdx];
- if (incomingColumnIdx && HasIncomingDataFlags[*incomingColumnIdx]->GetView(incoming.GetPosition())) {
- const ui32 idxChunk = incoming.GetData().GetPositionInChunk(*incomingColumnIdx, incoming.GetPosition());
- rGuard.Add(*incoming.GetData().GetPositionAddress(*incomingColumnIdx).GetArray(), idxChunk);
- } else {
- const ui32 idxChunk = exists.GetData().GetPositionInChunk(columnIdx, exists.GetPosition());
- rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk);
- }
+ const std::optional<ui32>& incomingColumnIdx = IncomingColumnRemap[columnIdx];
+ if (incomingColumnIdx && HasIncomingDataFlags[*incomingColumnIdx]->GetView(incoming.GetPosition())) {
+ const ui32 idxChunk = incoming.GetData().GetPositionInChunk(*incomingColumnIdx, incoming.GetPosition());
+ rGuard.Add(*incoming.GetData().GetPositionAddress(*incomingColumnIdx).GetArray(), idxChunk);
+ } else {
+ const ui32 idxChunk = exists.GetData().GetPositionInChunk(columnIdx, exists.GetPosition());
+ rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk);
+ }
}
return TConclusionStatus::Success();
}
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index a1aee2a6a4..63a74e12fb 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -178,13 +178,13 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
"version", info.GetSchema().GetVersion());
NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info);
if (!PrimaryIndex) {
- PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, DataAccessorsManager, StoragesManager,
- version, schemaInitializationData);
+ PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(
+ TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, version, preset->Id, schemaInitializationData);
} else if (PrimaryIndex->GetVersionedIndex().IsEmpty() ||
info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) {
- PrimaryIndex->RegisterSchemaVersion(version, schemaInitializationData);
+ PrimaryIndex->RegisterSchemaVersion(version, preset->Id, schemaInitializationData);
} else {
- PrimaryIndex->RegisterOldSchemaVersion(version, schemaInitializationData);
+ PrimaryIndex->RegisterOldSchemaVersion(version, preset->Id, schemaInitializationData);
}
if (!rowset.Next()) {
@@ -290,14 +290,14 @@ void TTablesManager::AddSchemaVersion(
Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo);
if (!PrimaryIndex) {
- PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(
- TabletId, DataAccessorsManager, StoragesManager, version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo));
+ PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager,
+ version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo));
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
}
PrimaryIndex->OnTieringModified(Ttl);
} else {
- PrimaryIndex->RegisterSchemaVersion(version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo));
+ PrimaryIndex->RegisterSchemaVersion(version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo));
}
}
@@ -352,11 +352,14 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot&
}
TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager,
- const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, const ui64 tabletId)
+ const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
+ const std::shared_ptr<NOlap::TSchemaObjectsCache>& schemaCache, const ui64 tabletId)
: StoragesManager(storagesManager)
, DataAccessorsManager(dataAccessorsManager)
, LoadTimeCounters(std::make_unique<TTableLoadTimeCounters>())
+ , SchemaObjectsCache(schemaCache)
, TabletId(tabletId) {
+ AFL_VERIFY(SchemaObjectsCache);
}
bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const {
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index 05f1872c92..2f8d418328 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -145,13 +145,15 @@ private:
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;
std::unique_ptr<TTableLoadTimeCounters> LoadTimeCounters;
+ std::shared_ptr<NOlap::TSchemaObjectsCache> SchemaObjectsCache;
ui64 TabletId = 0;
public:
friend class TTxInit;
TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager,
- const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, const ui64 tabletId);
+ const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
+ const std::shared_ptr<NOlap::TSchemaObjectsCache>& schemaCache, const ui64 tabletId);
const std::unique_ptr<TTableLoadTimeCounters>& GetLoadTimeCounters() const {
return LoadTimeCounters;