diff options
author | Valery Mironov <mbkkt@ydb.tech> | 2024-07-09 11:07:00 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-09 11:07:00 +0300 |
commit | 3e87d00cd647028bc8b4064ff33ecf6000f8bb19 (patch) | |
tree | 48c39ebd81f2a171f092e1e9cfd39b340e7fa671 | |
parent | 8c6ee313319e101151d5d233811a266a805bdb03 (diff) | |
download | ydb-3e87d00cd647028bc8b4064ff33ecf6000f8bb19.tar.gz |
Refactoring: DataShard handling indexes (#6425)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
-rw-r--r-- | ydb/core/tx/datashard/build_index.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/build_index.h | 28 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_collector_async_index.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record_cdc_serializer.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_user_table.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/drop_table_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/finalize_build_index_unit.cpp | 7 |
10 files changed, 40 insertions, 30 deletions
diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp index c9eaf91699b..e8eb0aa6e11 100644 --- a/ydb/core/tx/datashard/build_index.cpp +++ b/ydb/core/tx/datashard/build_index.cpp @@ -696,15 +696,15 @@ void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, TUserTable::TCPtr userTable = GetUserTables().at(tableId.PathId.LocalPathId); - if (BuildIndexManager.Contains(buildIndexId)) { - TBuildIndexRecord recCard = BuildIndexManager.Get(buildIndexId); - if (recCard.SeqNo == seqNo) { + if (const auto* recCard = BuildIndexManager.Get(buildIndexId)) { + if (recCard->SeqNo == seqNo) { // do no start one more scan ctx.Send(ev->Sender, std::move(response)); return; } - CancelScan(userTable->LocalTid, recCard.ScanId); + CancelScan(userTable->LocalTid, recCard->ScanId); + BuildIndexManager.Drop(buildIndexId); } diff --git a/ydb/core/tx/datashard/build_index.h b/ydb/core/tx/datashard/build_index.h index 61c85a14693..46dc88353ae 100644 --- a/ydb/core/tx/datashard/build_index.h +++ b/ydb/core/tx/datashard/build_index.h @@ -33,26 +33,32 @@ struct TBuildIndexRecord { class TBuildIndexManager { public: - bool Contains(ui64 id) { - return Records.contains(id); + const TBuildIndexRecord* Get(ui64 id) const { + Y_ABORT_UNLESS(id != 0); + if (BuildIndexId == id) { + return &Record; + } + Y_ABORT_UNLESS(BuildIndexId == 0); + return nullptr; } void Set(ui64 id, TBuildIndexRecord record) { - Records.emplace(id, record); - } - - TBuildIndexRecord Get(ui64 id) const { - return Records.at(id); + Y_ABORT_UNLESS(id != 0); + Y_ABORT_UNLESS(BuildIndexId == 0); + BuildIndexId = id; + Record = record; } void Drop(ui64 id) { - Records.erase(id); + Y_ABORT_UNLESS(Get(id) == &Record); + BuildIndexId = 0; + Record = {}; } private: - using TBuildIndexIdToScanIdMap = TMap<ui64, TBuildIndexRecord>; - - TBuildIndexIdToScanIdMap Records; + // Only single shard scan for build index possible now + ui64 BuildIndexId = 0; + TBuildIndexRecord Record; }; } diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp index 0559adc0b72..9e1c13d712b 100644 --- a/ydb/core/tx/datashard/change_collector_async_index.cpp +++ b/ydb/core/tx/datashard/change_collector_async_index.cpp @@ -105,7 +105,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, const auto tagToPos = MakeTagToPos(tagsToSelect, [](const auto tag) { return tag; }); const auto updatedTagToPos = MakeTagToPos(updates, [](const TUpdateOp& op) { return op.Tag; }); - userTable->ForEachAsyncIndex([&] (const auto& pathId, const auto& index) { + userTable->ForEachAsyncIndex([&](const auto& pathId, const TUserTable::TTableIndex& index) { if (generateDeletions) { bool needDeletion = rop == ERowOp::Erase || rop == ERowOp::Reset; @@ -198,7 +198,7 @@ auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const { TCachedTagsBuilder builder; - userTable->ForEachAsyncIndex([&] (const auto&, const auto& index) { + userTable->ForEachAsyncIndex([&](const auto&, const TUserTable::TTableIndex& index) { builder.AddIndexTags(index.KeyColumnIds); builder.AddDataTags(index.DataColumnIds); }); diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp index 6e6eb098d28..abec4cbeed1 100644 --- a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp +++ b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp @@ -370,6 +370,9 @@ class TDynamoDBStreamsJsonSerializer: public TJsonSerializer { } else if (name.StartsWith("__Hash_")) { bool indexed = false; for (const auto& [_, index] : schema->Indexes) { + if (index.Type != TUserTable::TTableIndex::EType::EIndexTypeGlobalAsync) { + continue; + } Y_ABORT_UNLESS(index.KeyColumnIds.size() >= 1); if (index.KeyColumnIds.at(0) == tag) { indexed = true; diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp index 5a5bef53b84..d1c5e8c4033 100644 --- a/ydb/core/tx/datashard/change_sender.cpp +++ b/ydb/core/tx/datashard/change_sender.cpp @@ -275,7 +275,7 @@ public: for (const auto& [tableId, tableInfo] : self->GetUserTables()) { const auto fullTableId = TTableId(self->GetPathOwnerId(), tableId); - tableInfo->ForEachAsyncIndex([&] (const auto& indexPathId, const auto&) { + tableInfo->ForEachAsyncIndex([&](const auto& indexPathId, const auto&) { AddChangeSender(indexPathId, fullTableId, ESenderType::AsyncIndex); }); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index ba07749ec28..eee988cf37c 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1825,9 +1825,7 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD node.key() = remapNewId; auto it = newTableInfo->Indexes.insert(std::move(node)).position; - Y_ABORT_UNLESS(move.GetReMapIndex().HasDstName()); - indexDesc.SetName(dstIndexName); - it->second.Name = dstIndexName; + it->second.Rename(indexDesc, dstIndexName); } newTableInfo->SetSchema(schema); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b770044dedb..5af64380650 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1958,7 +1958,6 @@ public: } TBuildIndexManager& GetBuildIndexManager() { return BuildIndexManager; } - const TBuildIndexManager& GetBuildIndexManager() const { return BuildIndexManager; } // Returns true when datashard is working in mvcc mode bool IsMvccEnabled() const; diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index af7c9ab2a6d..9be76b4cac7 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -254,7 +254,6 @@ struct TUserTable : public TThrRefBase { using EType = NKikimrSchemeOp::EIndexType; using EState = NKikimrSchemeOp::EIndexState; - TString Name; EType Type; EState State; TVector<ui32> KeyColumnIds; @@ -263,10 +262,12 @@ struct TUserTable : public TThrRefBase { TTableIndex() = default; TTableIndex(const NKikimrSchemeOp::TIndexDescription& indexDesc, const TMap<ui32, TUserColumn>& columns) - : Name(indexDesc.GetName()) - , Type(indexDesc.GetType()) + : Type(indexDesc.GetType()) , State(indexDesc.GetState()) { + if (Type != EType::EIndexTypeGlobalAsync) { + return; + } THashMap<TStringBuf, ui32> nameToId; for (const auto& [id, column] : columns) { Y_DEBUG_ABORT_UNLESS(!nameToId.contains(column.Name)); @@ -285,6 +286,10 @@ struct TUserTable : public TThrRefBase { fillColumnIds(indexDesc.GetKeyColumnNames(), KeyColumnIds); fillColumnIds(indexDesc.GetDataColumnNames(), DataColumnIds); } + + static void Rename(NKikimrSchemeOp::TIndexDescription& indexDesc, const TString& newName) { + indexDesc.SetName(newName); + } }; struct TCdcStream { diff --git a/ydb/core/tx/datashard/drop_table_unit.cpp b/ydb/core/tx/datashard/drop_table_unit.cpp index d1b640f444d..552c4574bad 100644 --- a/ydb/core/tx/datashard/drop_table_unit.cpp +++ b/ydb/core/tx/datashard/drop_table_unit.cpp @@ -80,7 +80,7 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op, auto it = DataShard.GetUserTables().find(tableId); Y_ABORT_UNLESS(it != DataShard.GetUserTables().end()); { - it->second->ForEachAsyncIndex([&] (const auto& indexPathId, const auto&) { + it->second->ForEachAsyncIndex([&](const auto& indexPathId, const auto&) { RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(indexPathId)); }); for (const auto& [streamPathId, _] : it->second->CdcStreams) { diff --git a/ydb/core/tx/datashard/finalize_build_index_unit.cpp b/ydb/core/tx/datashard/finalize_build_index_unit.cpp index 315a2c81f01..383a2303073 100644 --- a/ydb/core/tx/datashard/finalize_build_index_unit.cpp +++ b/ydb/core/tx/datashard/finalize_build_index_unit.cpp @@ -46,7 +46,7 @@ public: const auto& userTables = DataShard.GetUserTables(); Y_ABORT_UNLESS(userTables.contains(pathId.LocalPathId)); - userTables.at(pathId.LocalPathId)->ForAsyncIndex(indexPathId, [&] (const auto&) { + userTables.at(pathId.LocalPathId)->ForAsyncIndex(indexPathId, [&](const auto&) { RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId)); }); @@ -66,9 +66,8 @@ public: ui64 txId = params.GetSnapshotTxId(); Y_ABORT_UNLESS(step != 0); - if (DataShard.GetBuildIndexManager().Contains(params.GetBuildIndexId())) { - auto record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId()); - DataShard.CancelScan(tableInfo->LocalTid, record.ScanId); + if (const auto* record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId())) { + DataShard.CancelScan(tableInfo->LocalTid, record->ScanId); DataShard.GetBuildIndexManager().Drop(params.GetBuildIndexId()); } |