diff options
author | Semyon <yentsovsemyon@ydb.tech> | 2024-12-20 11:21:29 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-20 08:21:29 +0000 |
commit | 888c474b76c8282662165ba3a6ccf43a20ae702b (patch) | |
tree | be29612c512dd8480e15bd7c4d961a7ab0b3c27f | |
parent | e804b29410a6ae318228d897175850414e0d16da (diff) | |
download | ydb-888c474b76c8282662165ba3a6ccf43a20ae702b.tar.gz |
share schemas between CS on same node (#12673)
30 files changed, 276 insertions, 82 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index dd74874db4..65e179c8c3 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -83,7 +83,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod()) , StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval()) , InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters()) - , TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), info->TabletID) + , TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), + std::make_shared<NOlap::TSchemaObjectsCache>(), info->TabletID) , Subscribers(std::make_shared<NSubscriber::TManager>(*this)) , PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig())) , InsertTable(std::make_unique<NOlap::TInsertTable>()) diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp index d8f33c2906..28b13a7eeb 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp @@ -15,7 +15,7 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, auto& index = Self->TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>(); for (auto& info : SchemeHistory) { - index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetSchema()); + index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetProto().GetId(), info.GetSchema()); } TDbWrapper dbWrapper(txc.DB, nullptr); diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index fe62107aa0..3d3d5fb8c6 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -108,7 +108,7 @@ public: if (!Schemas.contains(data.GetSchemaVersion())) { Schemas.emplace(data.GetSchemaVersion(), blobSchema); } - auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false); + TColumnIdsView columnIds = blobSchema->GetIndexInfo().GetColumnIds(false); std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end()); if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) { filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG); @@ -247,7 +247,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont { const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange); - auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema(); + NArrow::TSchemaLiteView blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema(); auto batchSchema = std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end())); batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema)); diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 13d5c95492..b3beefc25a 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -354,9 +354,9 @@ public: const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept = 0; virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0; virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0; - virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0; - virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0; - virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0; + virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) = 0; + virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0; + virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0; virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0; virtual const TColumnEngineStats& GetTotalStats() = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index d25abc4941..aa588965e9 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -26,30 +26,32 @@ namespace NKikimr::NOlap { -TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, +TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, - const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema) + const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) : GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) { ActualizationController = std::make_shared<NActualizer::TController>(); - RegisterSchemaVersion(snapshot, schema); + RegisterSchemaVersion(snapshot, presetId, schema); } -TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, +TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, - const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema) + const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema) : GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) { ActualizationController = std::make_shared<NActualizer::TController>(); - RegisterSchemaVersion(snapshot, std::move(schema)); + RegisterSchemaVersion(snapshot, presetId, std::move(schema)); } const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& TColumnEngineForLogs::GetStats() const { @@ -138,7 +140,7 @@ void TColumnEngineForLogs::UpdatePortionStats( } } -void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { +void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& indexInfo) { AFL_VERIFY(DataAccessorsManager); bool switchOptimizer = false; bool switchAccessorsManager = false; @@ -150,7 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd } const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization(); - auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo)); + auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(indexInfo))); if (isCriticalScheme) { StartActualization({}); for (auto&& i : GranulesStorage->GetTables()) { @@ -170,7 +172,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd } } -void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) { +void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) { AFL_VERIFY(VersionedIndex.IsEmpty() || schema.GetVersion() >= VersionedIndex.GetLastSchema()->GetVersion())("empty", VersionedIndex.IsEmpty())("current", schema.GetVersion())( "last", VersionedIndex.GetLastSchema()->GetVersion()); @@ -184,10 +186,10 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema.GetSchemaVerified(), StoragesManager, SchemaObjectsCache); } AFL_VERIFY(indexInfoOptional); - RegisterSchemaVersion(snapshot, std::move(*indexInfoOptional)); + RegisterSchemaVersion(snapshot, presetId, std::move(*indexInfoOptional)); } -void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) { +void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) { AFL_VERIFY(!VersionedIndex.IsEmpty()); ui64 version = schema.GetVersion(); @@ -215,7 +217,7 @@ void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, c } AFL_VERIFY(indexInfoOptional); - VersionedIndex.AddIndex(snapshot, std::move(*indexInfoOptional)); + VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(*indexInfoOptional))); } std::shared_ptr<ITxReader> TColumnEngineForLogs::BuildLoader(const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector) { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 70ec8a852b..b62ce870dc 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -61,7 +61,7 @@ private: std::shared_ptr<IStoragesManager> StoragesManager; std::shared_ptr<NActualizer::TController> ActualizationController; - std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>(); + std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache; TVersionedIndex VersionedIndex; std::shared_ptr<TVersionedIndex> VersionedIndexCopy; @@ -98,10 +98,13 @@ public: ADD, }; - TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, - const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema); - TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, - const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema); + TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache, + const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, + const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, + const TSchemaInitializationData& schema); + TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache, + const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, + const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema); void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) override; void OnTieringModified(const THashMap<ui64, NOlap::TTiering>& ttl) override; @@ -157,9 +160,9 @@ public: virtual bool ApplyChangesOnExecute( IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override; - void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override; - void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override; - void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override; + void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) override; + void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override; + void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override; std::shared_ptr<TSelectInfo> Select( ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp new file mode 100644 index 0000000000..8f630cd397 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp @@ -0,0 +1,3 @@ +#include "schema_version.h" + +namespace NKikimr::NOlap {} diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h new file mode 100644 index 0000000000..2930b6d8d1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h @@ -0,0 +1,32 @@ +#pragma once + +#include <ydb/library/accessor/accessor.h> + +#include <util/digest/numeric.h> + +namespace NKikimr::NOlap { + +class TSchemaVersionId { +private: + YDB_READONLY_DEF(ui64, PresetId); + YDB_READONLY_DEF(ui64, Version); + +public: + bool operator==(const TSchemaVersionId& other) const { + return std::tie(PresetId, Version) == std::tie(other.PresetId, other.Version); + } + + TSchemaVersionId(const ui64 presetId, const ui64 version) + : PresetId(presetId) + , Version(version) { + } +}; + +} + +template <> +struct THash<NKikimr::NOlap::TSchemaVersionId> { + inline size_t operator()(const NKikimr::NOlap::TSchemaVersionId& key) const { + return CombineHashes(key.GetPresetId(), key.GetVersion()); + } +}; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make index 709793c4e3..bf3aac5302 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( index_info.cpp column_ids.cpp + schema_version.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp b/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp new file mode 100644 index 0000000000..9be4dd9584 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp @@ -0,0 +1,3 @@ +#include "cache.h" + +namespace NKikimr::NOlap {} diff --git a/ydb/core/tx/columnshard/engines/scheme/common/cache.h b/ydb/core/tx/columnshard/engines/scheme/common/cache.h new file mode 100644 index 0000000000..8ccdfdc034 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/common/cache.h @@ -0,0 +1,72 @@ +#pragma once + +#include <util/generic/hash.h> +#include <util/system/guard.h> +#include <util/system/mutex.h> + +#include <memory> + +namespace NKikimr::NOlap { + +template <typename TKey, typename TObject> +class TObjectCache : std::enable_shared_from_this<TObjectCache<TKey, TObject>> { +private: + THashMap<TKey, std::weak_ptr<const TObject>> Objects; + mutable TMutex Mutex; + +public: + class TEntryGuard { + private: + TKey Key; + std::shared_ptr<const TObject> Object; + std::weak_ptr<TObjectCache> Cache; + + public: + TEntryGuard(TKey key, const std::shared_ptr<const TObject> object, TObjectCache* cache) + : Key(key) + , Object(object) + , Cache(cache->weak_from_this()) { + } + + const TObject* operator->() const { + return Object.get(); + } + const TObject& operator*() const { + return *Object; + } + + ~TEntryGuard() { + Object.reset(); + if (auto cache = Cache.lock()) { + cache->TryFree(Key); + } + } + }; + +public: + TEntryGuard Upsert(TKey key, TObject&& object) { + TGuard lock(Mutex); + auto* findSchema = Objects.FindPtr(key); + std::shared_ptr<const TObject> cachedObject; + if (findSchema) { + cachedObject = findSchema->lock(); + } + if (!cachedObject) { + cachedObject = std::make_shared<const TObject>(std::move(object)); + Objects[key] = cachedObject; + } + return TEntryGuard(std::move(key), cachedObject, this); + } + + void TryFree(const TKey& key) { + TGuard lock(Mutex); + auto findObject = Objects.FindPtr(key); + if (findObject) { + if (findObject->expired()) { + Objects.erase(key); + } + } + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/common/ya.make b/ydb/core/tx/columnshard/engines/scheme/common/ya.make new file mode 100644 index 0000000000..6d84704af4 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/common/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + cache.cpp +) + +PEERDIR( + ydb/library/actors/core +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 9974e027bf..29d40e0032 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -76,7 +76,7 @@ std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) co } std::vector<std::string> TIndexInfo::GetColumnSTLNames(const bool withSpecial) const { - const auto ids = GetColumnIds(withSpecial); + const TColumnIdsView ids = GetColumnIds(withSpecial); std::vector<std::string> out; out.reserve(ids.size()); for (ui32 id : ids) { @@ -457,7 +457,7 @@ std::shared_ptr<NIndexes::NCountMinSketch::TIndexMeta> TIndexInfo::GetIndexMetaC } std::vector<ui32> TIndexInfo::GetEntityIds() const { - const auto columnIds = GetColumnIds(true); + const TColumnIdsView columnIds = GetColumnIds(true); std::vector<ui32> result(columnIds.begin(), columnIds.end()); for (auto&& i : Indexes) { result.emplace_back(i.first); diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp index 9b3da88611..8b8d2f44b0 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp @@ -1,5 +1,12 @@ #include "objects_cache.h" +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> + namespace NKikimr::NOlap { +TSchemaObjectsCache::TSchemasCache::TEntryGuard TSchemaObjectsCache::UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo) { + const TSchemaVersionId versionId(presetId, indexInfo.GetVersion()); + return SchemasByVersion.Upsert(versionId, std::move(indexInfo)); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h index a203623b60..fabd908943 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h @@ -1,6 +1,9 @@ #pragma once #include "column_features.h" +#include <ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h> +#include <ydb/core/tx/columnshard/engines/scheme/common/cache.h> + #include <contrib/libs/apache/arrow/cpp/src/arrow/type.h> #include <library/cpp/string_utils/quote/quote.h> #include <util/generic/hash.h> @@ -10,13 +13,22 @@ namespace NKikimr::NOlap { class TSchemaObjectsCache { private: THashMap<TString, std::shared_ptr<arrow::Field>> Fields; - THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures; - THashSet<TString> StringsCache; mutable ui64 AcceptionFieldsCount = 0; + mutable TMutex FieldsMutex; + + THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures; mutable ui64 AcceptionFeaturesCount = 0; + mutable TMutex FeaturesMutex; + + using TSchemasCache = TObjectCache<TSchemaVersionId, TIndexInfo>; + TSchemasCache SchemasByVersion; + + THashSet<TString> StringsCache; + mutable TMutex StringsMutex; public: const TString& GetStringCache(const TString& original) { + TGuard lock(StringsMutex); auto it = StringsCache.find(original); if (it == StringsCache.end()) { it = StringsCache.emplace(original).first; @@ -26,13 +38,16 @@ public: void RegisterField(const TString& fingerprint, const std::shared_ptr<arrow::Field>& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString()); + TGuard lock(FieldsMutex); AFL_VERIFY(Fields.emplace(fingerprint, f).second); } void RegisterColumnFeatures(const TString& fingerprint, const std::shared_ptr<TColumnFeatures>& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_column_features")("fp", fingerprint)("info", f->DebugString()); + TGuard lock(FeaturesMutex); AFL_VERIFY(ColumnFeatures.emplace(fingerprint, f).second); } std::shared_ptr<arrow::Field> GetField(const TString& fingerprint) const { + TGuard lock(FieldsMutex); auto it = Fields.find(fingerprint); if (it == Fields.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_field_miss")("fp", fingerprint)("count", Fields.size())( @@ -47,6 +62,7 @@ public: } template <class TConstructor> TConclusion<std::shared_ptr<TColumnFeatures>> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) { + TGuard lock(FeaturesMutex); auto it = ColumnFeatures.find(fingerprint); if (it == ColumnFeatures.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))( @@ -65,6 +81,31 @@ public: } return it->second; } + + TSchemasCache::TEntryGuard UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo); +}; + +class TSchemaCachesManager { +private: + THashMap<ui64, std::shared_ptr<TSchemaObjectsCache>> CacheByTableOwner; + TMutex Mutex; + + std::shared_ptr<TSchemaObjectsCache> GetCacheImpl(const ui64 ownerPathId) { + if (!ownerPathId) { + return std::make_shared<TSchemaObjectsCache>(); + } + TGuard lock(Mutex); + auto findCache = CacheByTableOwner.FindPtr(ownerPathId); + if (findCache) { + return *findCache; + } + return CacheByTableOwner.emplace(ownerPathId, std::make_shared<TSchemaObjectsCache>()).first->second; + } + +public: + static std::shared_ptr<TSchemaObjectsCache> GetCache(const ui64 ownerPathId) { + return Singleton<TSchemaCachesManager>()->GetCacheImpl(ownerPathId); + } }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp index 05277b7b89..451145bee0 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp @@ -1,32 +1,32 @@ #include "snapshot_scheme.h" +#include <ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h> namespace NKikimr::NOlap { -TSnapshotSchema::TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot) +TSnapshotSchema::TSnapshotSchema(TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard&& indexInfo, const TSnapshot& snapshot) : IndexInfo(std::move(indexInfo)) - , Schema(IndexInfo.ArrowSchemaWithSpecials()) - , Snapshot(snapshot) -{ + , Schema(IndexInfo->ArrowSchemaWithSpecials()) + , Snapshot(snapshot) { } TColumnSaver TSnapshotSchema::GetColumnSaver(const ui32 columnId) const { - return IndexInfo.GetColumnSaver(columnId); + return IndexInfo->GetColumnSaver(columnId); } std::shared_ptr<TColumnLoader> TSnapshotSchema::GetColumnLoaderOptional(const ui32 columnId) const { - return IndexInfo.GetColumnLoaderOptional(columnId); + return IndexInfo->GetColumnLoaderOptional(columnId); } std::optional<ui32> TSnapshotSchema::GetColumnIdOptional(const std::string& columnName) const { - return IndexInfo.GetColumnIdOptional(columnName); + return IndexInfo->GetColumnIdOptional(columnName); } ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const { - return IndexInfo.GetColumnIdVerified(columnName); + return IndexInfo->GetColumnIdVerified(columnName); } int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const { - return IndexInfo.GetColumnIndexOptional(columnId).value_or(-1); + return IndexInfo->GetColumnIndexOptional(columnId).value_or(-1); } const std::shared_ptr<NArrow::TSchemaLite>& TSnapshotSchema::GetSchema() const { @@ -34,7 +34,7 @@ const std::shared_ptr<NArrow::TSchemaLite>& TSnapshotSchema::GetSchema() const { } const TIndexInfo& TSnapshotSchema::GetIndexInfo() const { - return IndexInfo; + return *IndexInfo; } const TSnapshot& TSnapshotSchema::GetSnapshot() const { @@ -46,7 +46,7 @@ ui32 TSnapshotSchema::GetColumnsCount() const { } ui64 TSnapshotSchema::GetVersion() const { - return IndexInfo.GetVersion(); + return IndexInfo->GetVersion(); } } diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h index 5246d39267..0cf6aa147d 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h @@ -4,11 +4,13 @@ #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h> + namespace NKikimr::NOlap { class TSnapshotSchema: public ISnapshotSchema { private: - TIndexInfo IndexInfo; + TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard IndexInfo; std::shared_ptr<NArrow::TSchemaLite> Schema; TSnapshot Snapshot; protected: @@ -16,15 +18,15 @@ protected: return TStringBuilder() << "(" "schema=" << Schema->ToString() << ";" << "snapshot=" << Snapshot.DebugString() << ";" << - "index_info=" << IndexInfo.DebugString() << ";" << + "index_info=" << IndexInfo->DebugString() << ";" << ")" ; } public: - TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot); + TSnapshotSchema(TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard&& indexInfo, const TSnapshot& snapshot); virtual TColumnIdsView GetColumnIds() const override { - return IndexInfo.GetColumnIds(); + return IndexInfo->GetColumnIds(); } TColumnSaver GetColumnSaver(const ui32 columnId) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp index d9a858f349..1642449df5 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp @@ -6,15 +6,15 @@ namespace NKikimr::NOlap { -const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { +const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard&& indexInfo) { if (Snapshots.empty()) { - PrimaryKey = indexInfo.GetPrimaryKey(); + PrimaryKey = indexInfo->GetPrimaryKey(); } else { - Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo.GetPrimaryKey())); + Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo->GetPrimaryKey())); } - const bool needActualization = indexInfo.GetSchemeNeedActualization(); - auto newVersion = indexInfo.GetVersion(); + const bool needActualization = indexInfo->GetSchemeNeedActualization(); + auto newVersion = indexInfo->GetVersion(); auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(std::move(indexInfo), snapshot)); if (!itVersion.second) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion); diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h index e4e22071f1..81a57cd65e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h @@ -1,5 +1,8 @@ #pragma once #include "abstract_scheme.h" + +#include <ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h> +#include <ydb/core/tx/columnshard/engines/scheme/common/cache.h> #include <ydb/core/tx/sharding/sharding.h> namespace NKikimr::NOlap { @@ -123,7 +126,7 @@ public: return PrimaryKey; } - const TIndexInfo* AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo); + const TIndexInfo* AddIndex(const TSnapshot& snapshot, TObjectCache<TSchemaVersionId, TIndexInfo>::TEntryGuard&& indexInfo); bool LoadShardingInfo(IDbWrapper& db); }; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/ya.make b/ydb/core/tx/columnshard/engines/scheme/versions/ya.make index 63dc44a748..5b9cc7eff7 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/versions/ya.make @@ -9,6 +9,7 @@ SRCS( PEERDIR( ydb/core/tx/columnshard/engines/scheme/abstract + ydb/core/tx/columnshard/engines/scheme/common ) END() diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make index a8b2572ac5..295da3556b 100644 --- a/ydb/core/tx/columnshard/engines/scheme/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/ya.make @@ -22,6 +22,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/scheme/versions ydb/core/tx/columnshard/engines/scheme/tiering ydb/core/tx/columnshard/engines/scheme/column + ydb/core/tx/columnshard/engines/scheme/common ydb/core/tx/columnshard/engines/scheme/defaults ydb/core/formats/arrow/accessor ydb/core/tx/columnshard/blobs_action/abstract diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 6d23f3de94..9635c24b33 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); for (auto&& i : paths) { engine.RegisterTable(i); } @@ -609,7 +609,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -710,7 +710,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -736,7 +736,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's overloaded after reload TColumnEngineForLogs tmpEngine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); + 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), 0, TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.TestingLoad(db); } @@ -768,7 +768,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's not overloaded after reload TColumnEngineForLogs tmpEngine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); + 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), 0, TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.TestingLoad(db); } @@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); { TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -868,7 +868,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // load TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared<TSchemaObjectsCache>(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); diff --git a/ydb/core/tx/columnshard/loading/stages.cpp b/ydb/core/tx/columnshard/loading/stages.cpp index ac4ab092d4..a0db85a87a 100644 --- a/ydb/core/tx/columnshard/loading/stages.cpp +++ b/ydb/core/tx/columnshard/loading/stages.cpp @@ -194,7 +194,8 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { NIceDb::TNiceDb db(txc.DB); - TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), Self->TabletID()); + TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), + NOlap::TSchemaCachesManager::GetCache(Self->OwnerPathId), Self->TabletID()); { TMemoryProfileGuard g("TTxInit/TTablesManager"); if (!tablesManagerLocal.InitFromDB(db)) { diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp index 21a184f7fd..e96a18d5e8 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp @@ -140,7 +140,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TChunksNormalizer::DoInit( return tasks; } - TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), 0); + TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), + std::make_shared<TSchemaObjectsCache>(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp index 811cae0957..7f4745f7bb 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp @@ -181,8 +181,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TLeakedBlobsNormalizer::DoInit( return TConclusionStatus::Fail("Not ready"); } - NColumnShard::TTablesManager tablesManager( - controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), TabletId); + NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), + std::make_shared<TSchemaObjectsCache>(), TabletId); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index bf4a6f5e89..f1ce6882b7 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -25,7 +25,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizerBase::DoInit( return TConclusionStatus::Fail("Not ready"); } - NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), 0); + NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), + std::make_shared<TSchemaObjectsCache>(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp index a2c382e5dd..b668125097 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp @@ -72,7 +72,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit( return TConclusionStatus::Fail("Not ready"); } - TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), 0); + TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), + std::make_shared<TSchemaObjectsCache>(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp index 829f1cbe36..ceb87eb027 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp @@ -48,14 +48,14 @@ NKikimr::TConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSo AFL_VERIFY(Schema->GetIndexInfo().GetColumnIds(false).size() == exists.GetData().GetColumns().size()) ("index", Schema->GetIndexInfo().GetColumnIds(false).size())("exists", exists.GetData().GetColumns().size()); for (i32 columnIdx = 0; columnIdx < Schema->GetIndexInfo().ArrowSchema().num_fields(); ++columnIdx) { - const std::optional<ui32>& incomingColumnIdx = IncomingColumnRemap[columnIdx]; - if (incomingColumnIdx && HasIncomingDataFlags[*incomingColumnIdx]->GetView(incoming.GetPosition())) { - const ui32 idxChunk = incoming.GetData().GetPositionInChunk(*incomingColumnIdx, incoming.GetPosition()); - rGuard.Add(*incoming.GetData().GetPositionAddress(*incomingColumnIdx).GetArray(), idxChunk); - } else { - const ui32 idxChunk = exists.GetData().GetPositionInChunk(columnIdx, exists.GetPosition()); - rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk); - } + const std::optional<ui32>& incomingColumnIdx = IncomingColumnRemap[columnIdx]; + if (incomingColumnIdx && HasIncomingDataFlags[*incomingColumnIdx]->GetView(incoming.GetPosition())) { + const ui32 idxChunk = incoming.GetData().GetPositionInChunk(*incomingColumnIdx, incoming.GetPosition()); + rGuard.Add(*incoming.GetData().GetPositionAddress(*incomingColumnIdx).GetArray(), idxChunk); + } else { + const ui32 idxChunk = exists.GetData().GetPositionInChunk(columnIdx, exists.GetPosition()); + rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk); + } } return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index a1aee2a6a4..63a74e12fb 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -178,13 +178,13 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { "version", info.GetSchema().GetVersion()); NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, DataAccessorsManager, StoragesManager, - version, schemaInitializationData); + PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>( + TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, version, preset->Id, schemaInitializationData); } else if (PrimaryIndex->GetVersionedIndex().IsEmpty() || info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) { - PrimaryIndex->RegisterSchemaVersion(version, schemaInitializationData); + PrimaryIndex->RegisterSchemaVersion(version, preset->Id, schemaInitializationData); } else { - PrimaryIndex->RegisterOldSchemaVersion(version, schemaInitializationData); + PrimaryIndex->RegisterOldSchemaVersion(version, preset->Id, schemaInitializationData); } if (!rowset.Next()) { @@ -290,14 +290,14 @@ void TTablesManager::AddSchemaVersion( Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>( - TabletId, DataAccessorsManager, StoragesManager, version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); + PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, + version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); for (auto&& i : Tables) { PrimaryIndex->RegisterTable(i.first); } PrimaryIndex->OnTieringModified(Ttl); } else { - PrimaryIndex->RegisterSchemaVersion(version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); + PrimaryIndex->RegisterSchemaVersion(version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); } } @@ -352,11 +352,14 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& } TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, - const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, const ui64 tabletId) + const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, + const std::shared_ptr<NOlap::TSchemaObjectsCache>& schemaCache, const ui64 tabletId) : StoragesManager(storagesManager) , DataAccessorsManager(dataAccessorsManager) , LoadTimeCounters(std::make_unique<TTableLoadTimeCounters>()) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) { + AFL_VERIFY(SchemaObjectsCache); } bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const { diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 05f1872c92..2f8d418328 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -145,13 +145,15 @@ private: std::shared_ptr<NOlap::IStoragesManager> StoragesManager; std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager; std::unique_ptr<TTableLoadTimeCounters> LoadTimeCounters; + std::shared_ptr<NOlap::TSchemaObjectsCache> SchemaObjectsCache; ui64 TabletId = 0; public: friend class TTxInit; TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, - const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, const ui64 tabletId); + const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, + const std::shared_ptr<NOlap::TSchemaObjectsCache>& schemaCache, const ui64 tabletId); const std::unique_ptr<TTableLoadTimeCounters>& GetLoadTimeCounters() const { return LoadTimeCounters; |