aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorValery Mironov <mbkkt@ydb.tech>2024-07-09 11:07:00 +0300
committerGitHub <noreply@github.com>2024-07-09 11:07:00 +0300
commit3e87d00cd647028bc8b4064ff33ecf6000f8bb19 (patch)
tree48c39ebd81f2a171f092e1e9cfd39b340e7fa671
parent8c6ee313319e101151d5d233811a266a805bdb03 (diff)
downloadydb-3e87d00cd647028bc8b4064ff33ecf6000f8bb19.tar.gz
Refactoring: DataShard handling indexes (#6425)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
-rw-r--r--ydb/core/tx/datashard/build_index.cpp8
-rw-r--r--ydb/core/tx/datashard/build_index.h28
-rw-r--r--ydb/core/tx/datashard/change_collector_async_index.cpp4
-rw-r--r--ydb/core/tx/datashard/change_record_cdc_serializer.cpp3
-rw-r--r--ydb/core/tx/datashard/change_sender.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h1
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h11
-rw-r--r--ydb/core/tx/datashard/drop_table_unit.cpp2
-rw-r--r--ydb/core/tx/datashard/finalize_build_index_unit.cpp7
10 files changed, 40 insertions, 30 deletions
diff --git a/ydb/core/tx/datashard/build_index.cpp b/ydb/core/tx/datashard/build_index.cpp
index c9eaf91699b..e8eb0aa6e11 100644
--- a/ydb/core/tx/datashard/build_index.cpp
+++ b/ydb/core/tx/datashard/build_index.cpp
@@ -696,15 +696,15 @@ void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev,
TUserTable::TCPtr userTable = GetUserTables().at(tableId.PathId.LocalPathId);
- if (BuildIndexManager.Contains(buildIndexId)) {
- TBuildIndexRecord recCard = BuildIndexManager.Get(buildIndexId);
- if (recCard.SeqNo == seqNo) {
+ if (const auto* recCard = BuildIndexManager.Get(buildIndexId)) {
+ if (recCard->SeqNo == seqNo) {
// do no start one more scan
ctx.Send(ev->Sender, std::move(response));
return;
}
- CancelScan(userTable->LocalTid, recCard.ScanId);
+ CancelScan(userTable->LocalTid, recCard->ScanId);
+ BuildIndexManager.Drop(buildIndexId);
}
diff --git a/ydb/core/tx/datashard/build_index.h b/ydb/core/tx/datashard/build_index.h
index 61c85a14693..46dc88353ae 100644
--- a/ydb/core/tx/datashard/build_index.h
+++ b/ydb/core/tx/datashard/build_index.h
@@ -33,26 +33,32 @@ struct TBuildIndexRecord {
class TBuildIndexManager {
public:
- bool Contains(ui64 id) {
- return Records.contains(id);
+ const TBuildIndexRecord* Get(ui64 id) const {
+ Y_ABORT_UNLESS(id != 0);
+ if (BuildIndexId == id) {
+ return &Record;
+ }
+ Y_ABORT_UNLESS(BuildIndexId == 0);
+ return nullptr;
}
void Set(ui64 id, TBuildIndexRecord record) {
- Records.emplace(id, record);
- }
-
- TBuildIndexRecord Get(ui64 id) const {
- return Records.at(id);
+ Y_ABORT_UNLESS(id != 0);
+ Y_ABORT_UNLESS(BuildIndexId == 0);
+ BuildIndexId = id;
+ Record = record;
}
void Drop(ui64 id) {
- Records.erase(id);
+ Y_ABORT_UNLESS(Get(id) == &Record);
+ BuildIndexId = 0;
+ Record = {};
}
private:
- using TBuildIndexIdToScanIdMap = TMap<ui64, TBuildIndexRecord>;
-
- TBuildIndexIdToScanIdMap Records;
+ // Only single shard scan for build index possible now
+ ui64 BuildIndexId = 0;
+ TBuildIndexRecord Record;
};
}
diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp
index 0559adc0b72..9e1c13d712b 100644
--- a/ydb/core/tx/datashard/change_collector_async_index.cpp
+++ b/ydb/core/tx/datashard/change_collector_async_index.cpp
@@ -105,7 +105,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
const auto tagToPos = MakeTagToPos(tagsToSelect, [](const auto tag) { return tag; });
const auto updatedTagToPos = MakeTagToPos(updates, [](const TUpdateOp& op) { return op.Tag; });
- userTable->ForEachAsyncIndex([&] (const auto& pathId, const auto& index) {
+ userTable->ForEachAsyncIndex([&](const auto& pathId, const TUserTable::TTableIndex& index) {
if (generateDeletions) {
bool needDeletion = rop == ERowOp::Erase || rop == ERowOp::Reset;
@@ -198,7 +198,7 @@ auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const {
TCachedTagsBuilder builder;
- userTable->ForEachAsyncIndex([&] (const auto&, const auto& index) {
+ userTable->ForEachAsyncIndex([&](const auto&, const TUserTable::TTableIndex& index) {
builder.AddIndexTags(index.KeyColumnIds);
builder.AddDataTags(index.DataColumnIds);
});
diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
index 6e6eb098d28..abec4cbeed1 100644
--- a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
+++ b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
@@ -370,6 +370,9 @@ class TDynamoDBStreamsJsonSerializer: public TJsonSerializer {
} else if (name.StartsWith("__Hash_")) {
bool indexed = false;
for (const auto& [_, index] : schema->Indexes) {
+ if (index.Type != TUserTable::TTableIndex::EType::EIndexTypeGlobalAsync) {
+ continue;
+ }
Y_ABORT_UNLESS(index.KeyColumnIds.size() >= 1);
if (index.KeyColumnIds.at(0) == tag) {
indexed = true;
diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp
index 5a5bef53b84..d1c5e8c4033 100644
--- a/ydb/core/tx/datashard/change_sender.cpp
+++ b/ydb/core/tx/datashard/change_sender.cpp
@@ -275,7 +275,7 @@ public:
for (const auto& [tableId, tableInfo] : self->GetUserTables()) {
const auto fullTableId = TTableId(self->GetPathOwnerId(), tableId);
- tableInfo->ForEachAsyncIndex([&] (const auto& indexPathId, const auto&) {
+ tableInfo->ForEachAsyncIndex([&](const auto& indexPathId, const auto&) {
AddChangeSender(indexPathId, fullTableId, ESenderType::AsyncIndex);
});
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index ba07749ec28..eee988cf37c 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1825,9 +1825,7 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD
node.key() = remapNewId;
auto it = newTableInfo->Indexes.insert(std::move(node)).position;
- Y_ABORT_UNLESS(move.GetReMapIndex().HasDstName());
- indexDesc.SetName(dstIndexName);
- it->second.Name = dstIndexName;
+ it->second.Rename(indexDesc, dstIndexName);
}
newTableInfo->SetSchema(schema);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index b770044dedb..5af64380650 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1958,7 +1958,6 @@ public:
}
TBuildIndexManager& GetBuildIndexManager() { return BuildIndexManager; }
- const TBuildIndexManager& GetBuildIndexManager() const { return BuildIndexManager; }
// Returns true when datashard is working in mvcc mode
bool IsMvccEnabled() const;
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index af7c9ab2a6d..9be76b4cac7 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -254,7 +254,6 @@ struct TUserTable : public TThrRefBase {
using EType = NKikimrSchemeOp::EIndexType;
using EState = NKikimrSchemeOp::EIndexState;
- TString Name;
EType Type;
EState State;
TVector<ui32> KeyColumnIds;
@@ -263,10 +262,12 @@ struct TUserTable : public TThrRefBase {
TTableIndex() = default;
TTableIndex(const NKikimrSchemeOp::TIndexDescription& indexDesc, const TMap<ui32, TUserColumn>& columns)
- : Name(indexDesc.GetName())
- , Type(indexDesc.GetType())
+ : Type(indexDesc.GetType())
, State(indexDesc.GetState())
{
+ if (Type != EType::EIndexTypeGlobalAsync) {
+ return;
+ }
THashMap<TStringBuf, ui32> nameToId;
for (const auto& [id, column] : columns) {
Y_DEBUG_ABORT_UNLESS(!nameToId.contains(column.Name));
@@ -285,6 +286,10 @@ struct TUserTable : public TThrRefBase {
fillColumnIds(indexDesc.GetKeyColumnNames(), KeyColumnIds);
fillColumnIds(indexDesc.GetDataColumnNames(), DataColumnIds);
}
+
+ static void Rename(NKikimrSchemeOp::TIndexDescription& indexDesc, const TString& newName) {
+ indexDesc.SetName(newName);
+ }
};
struct TCdcStream {
diff --git a/ydb/core/tx/datashard/drop_table_unit.cpp b/ydb/core/tx/datashard/drop_table_unit.cpp
index d1b640f444d..552c4574bad 100644
--- a/ydb/core/tx/datashard/drop_table_unit.cpp
+++ b/ydb/core/tx/datashard/drop_table_unit.cpp
@@ -80,7 +80,7 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op,
auto it = DataShard.GetUserTables().find(tableId);
Y_ABORT_UNLESS(it != DataShard.GetUserTables().end());
{
- it->second->ForEachAsyncIndex([&] (const auto& indexPathId, const auto&) {
+ it->second->ForEachAsyncIndex([&](const auto& indexPathId, const auto&) {
RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(indexPathId));
});
for (const auto& [streamPathId, _] : it->second->CdcStreams) {
diff --git a/ydb/core/tx/datashard/finalize_build_index_unit.cpp b/ydb/core/tx/datashard/finalize_build_index_unit.cpp
index 315a2c81f01..383a2303073 100644
--- a/ydb/core/tx/datashard/finalize_build_index_unit.cpp
+++ b/ydb/core/tx/datashard/finalize_build_index_unit.cpp
@@ -46,7 +46,7 @@ public:
const auto& userTables = DataShard.GetUserTables();
Y_ABORT_UNLESS(userTables.contains(pathId.LocalPathId));
- userTables.at(pathId.LocalPathId)->ForAsyncIndex(indexPathId, [&] (const auto&) {
+ userTables.at(pathId.LocalPathId)->ForAsyncIndex(indexPathId, [&](const auto&) {
RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId));
});
@@ -66,9 +66,8 @@ public:
ui64 txId = params.GetSnapshotTxId();
Y_ABORT_UNLESS(step != 0);
- if (DataShard.GetBuildIndexManager().Contains(params.GetBuildIndexId())) {
- auto record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId());
- DataShard.CancelScan(tableInfo->LocalTid, record.ScanId);
+ if (const auto* record = DataShard.GetBuildIndexManager().Get(params.GetBuildIndexId())) {
+ DataShard.CancelScan(tableInfo->LocalTid, record->ScanId);
DataShard.GetBuildIndexManager().Drop(params.GetBuildIndexId());
}