diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-04-26 16:44:19 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-04-26 16:44:19 +0300 |
commit | 6d73358874922366b2611962ff0179b01fb838a0 (patch) | |
tree | 8abda74daf439ebf3d2b55a8d9443cee62d08a56 | |
parent | 50b1a3669d6924e89fde02dc52208d793bba1fbd (diff) | |
download | ydb-6d73358874922366b2611962ff0179b01fb838a0.tar.gz |
Apply changes
-rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_repl_apply.cpp | 84 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_user_table.cpp | 18 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_user_table.h | 29 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_compaction.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_replication.cpp | 55 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execution_unit.cpp | 4 |
12 files changed, 190 insertions, 39 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 87455da8b2..895d6040db 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -331,7 +331,14 @@ message TTableReplicationConfig { REPLICATION_MODE_READ_ONLY = 1; } + enum EConsistency { + CONSISTENCY_UNKNOWN = 0; + CONSISTENCY_STRONG = 1; + CONSISTENCY_WEAK = 2; + } + optional EReplicationMode Mode = 1; + optional EConsistency Consistency = 2; } message TTableDescription { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 2b3d32402b..3e57671368 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1418,7 +1418,7 @@ public: bool IsReplicated() const { for (const auto& [_, info] : TableInfos) { - if (info->IsReplicated) { + if (info->IsReplicated()) { return true; } } diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index a9447a96f8..7d8dd3a1d4 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1898,6 +1898,12 @@ TOperation::TPtr TPipeline::FindCompletingOp(ui64 txId) const { return nullptr; } +void TPipeline::AddCommittingOp(const TRowVersion& version) { + if (!Self->IsMvccEnabled()) + return; + CommittingOps.Add(version); +} + void TPipeline::AddCommittingOp(const TOperation::TPtr& op) { if (!Self->IsMvccEnabled() || op->IsReadOnly()) return; @@ -1909,6 +1915,12 @@ void TPipeline::AddCommittingOp(const TOperation::TPtr& op) { CommittingOps.Add(version); } +void TPipeline::RemoveCommittingOp(const TRowVersion& version) { + if (!Self->IsMvccEnabled()) + return; + CommittingOps.Remove(version); +} + void TPipeline::RemoveCommittingOp(const TOperation::TPtr& op) { if (!Self->IsMvccEnabled() || op->IsReadOnly()) return; diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index a57aaf6bde..16d3e332a9 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -352,7 +352,9 @@ public: void RemoveCompletingOp(const TOperation::TPtr& op); TOperation::TPtr FindCompletingOp(ui64 txId) const; + void AddCommittingOp(const TRowVersion& version); void AddCommittingOp(const TOperation::TPtr& op); + void RemoveCommittingOp(const TRowVersion& version); void RemoveCommittingOp(const TOperation::TPtr& op); bool WaitCompletion(const TOperation::TPtr& op) const; bool HasCommittingOpsBelow(TRowVersion upperBound) const; diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index 169fd0203d..205c5528a1 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -9,8 +9,10 @@ using namespace NTabletFlatExecutor; class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShard> { public: - explicit TTxApplyReplicationChanges(TDataShard* self, TEvDataShard::TEvApplyReplicationChanges::TPtr&& ev) + explicit TTxApplyReplicationChanges(TDataShard* self, TPipeline& pipeline, + TEvDataShard::TEvApplyReplicationChanges::TPtr&& ev) : TTransactionBase(self) + , Pipeline(pipeline) , Ev(std::move(ev)) { } @@ -22,8 +24,7 @@ public: bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { Y_UNUSED(ctx); - // TODO: check this is a replicated shard - if (Self->State != TShardState::Ready) { + if (Self->State != TShardState::Ready && !Self->IsReplicated()) { Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_WRONG_STATE); @@ -33,11 +34,11 @@ public: const auto& msg = Ev->Get()->Record; const auto& tableId = msg.GetTableId(); - TPathId pathId(tableId.GetOwnerId(), tableId.GetTableId()); + const TTableId fullTableId(tableId.GetOwnerId(), tableId.GetTableId()); const auto& userTables = Self->GetUserTables(); - auto it = userTables.find(pathId.LocalPathId); - if (pathId.OwnerId != Self->GetPathOwnerId() || it == userTables.end()) { + auto it = userTables.find(fullTableId.PathId.LocalPathId); + if (fullTableId.PathId.OwnerId != Self->GetPathOwnerId() || it == userTables.end()) { TString error = TStringBuilder() << "DataShard " << Self->TabletID() << " does not have a table " << tableId.GetOwnerId() << ":" << tableId.GetTableId(); @@ -62,17 +63,24 @@ public: return true; } - auto& source = EnsureSource(txc, pathId, msg.GetSource()); + auto& source = EnsureSource(txc, fullTableId.PathId, msg.GetSource()); for (const auto& change : msg.GetChanges()) { - if (!ApplyChange(txc, userTable, source, change)) { + if (!ApplyChange(txc, fullTableId, userTable, source, change)) { Y_VERIFY(Result); - return true; + break; } } - Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>( - NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK); + if (MvccReadWriteVersion) { + Pipeline.AddCommittingOp(*MvccReadWriteVersion); + } + + if (!Result) { + Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>( + NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK); + } + return true; } @@ -84,19 +92,31 @@ public: } bool ApplyChange( - TTransactionContext& txc, const TUserTable& userTable, TReplicationSourceState& source, - const NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& change) + TTransactionContext& txc, const TTableId& tableId, const TUserTable& userTable, + TReplicationSourceState& source, const NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& change) { + Y_VERIFY(userTable.IsReplicated()); + // TODO: check source and offset, persist new values i64 sourceOffset = change.GetSourceOffset(); ui64 writeTxId = change.GetWriteTxId(); - if (Y_UNLIKELY(writeTxId == 0)) { - Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>( - NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, - "Every change must specify a non-zero WriteTxId"); - return false; + if (userTable.ReplicationConfig.HasWeakConsistency()) { + if (writeTxId) { + Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>( + NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, + "WriteTxId cannot be specified for weak consistency"); + return false; + } + } else { + if (writeTxId == 0) { + Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>( + NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, + "Non-zero WriteTxId must be specified for strong consistency"); + return false; + } } TSerializedCellVec keyCellVec; @@ -153,7 +173,19 @@ public: } } - txc.DB.UpdateTx(userTable.LocalTid, rop, key, update, writeTxId); + if (writeTxId) { + txc.DB.UpdateTx(userTable.LocalTid, rop, key, update, writeTxId); + } else { + if (!MvccReadWriteVersion) { + auto [readVersion, writeVersion] = Self->GetReadWriteVersions(); + Y_VERIFY_DEBUG(readVersion == writeVersion); + MvccReadWriteVersion = writeVersion; + } + + Self->SysLocksTable().BreakLocks(tableId, keyCellVec.GetCells()); + txc.DB.Update(userTable.LocalTid, rop, key, update, *MvccReadWriteVersion); + } + return true; } @@ -201,16 +233,24 @@ public: void Complete(const TActorContext& ctx) override { Y_VERIFY(Ev); Y_VERIFY(Result); - ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie); + + if (MvccReadWriteVersion) { + Pipeline.RemoveCommittingOp(*MvccReadWriteVersion); + Self->SendImmediateWriteResult(*MvccReadWriteVersion, Ev->Sender, Result.Release(), Ev->Cookie); + } else { + ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie); + } } private: + TPipeline& Pipeline; TEvDataShard::TEvApplyReplicationChanges::TPtr Ev; THolder<TEvDataShard::TEvApplyReplicationChangesResult> Result; + std::optional<TRowVersion> MvccReadWriteVersion; }; // TTxApplyReplicationChanges void TDataShard::Handle(TEvDataShard::TEvApplyReplicationChanges::TPtr& ev, const TActorContext& ctx) { - Execute(new TTxApplyReplicationChanges(this, std::move(ev)), ctx); + Execute(new TTxApplyReplicationChanges(this, Pipeline, std::move(ev)), ctx); } } // NDataShard diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index 9b1f2f3318..217f079987 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp +++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -188,6 +188,15 @@ bool TUserTable::NeedSchemaSnapshots() const { return JsonCdcStreamCount > 0; } +bool TUserTable::IsReplicated() const { + switch (ReplicationConfig.Mode) { + case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE: + return false; + default: + return true; + } +} + void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr) { // We expect schemeshard to send us full list of storage rooms @@ -267,14 +276,7 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr) TableSchemaVersion = descr.GetTableSchemaVersion(); IsBackup = descr.GetIsBackup(); - - switch (descr.GetReplicationConfig().GetMode()) { - case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE: - break; - default: - IsReplicated = true; - break; - } + ReplicationConfig = TReplicationConfig(descr.GetReplicationConfig()); CheckSpecialColumns(); diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 7f22dbf06a..c1b3120752 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -308,6 +308,31 @@ struct TUserTable : public TThrRefBase { } }; + struct TReplicationConfig { + NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode Mode; + NKikimrSchemeOp::TTableReplicationConfig::EConsistency Consistency; + + TReplicationConfig() + : Mode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE) + , Consistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_UNKNOWN) + { + } + + TReplicationConfig(const NKikimrSchemeOp::TTableReplicationConfig& config) + : Mode(config.GetMode()) + , Consistency(config.GetConsistency()) + { + } + + bool HasWeakConsistency() const { + return Consistency == NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK; + } + + bool HasStrongConsistency() const { + return Consistency == NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_STRONG; + } + }; + struct TStats { NTable::TStats DataStats; ui64 IndexSize = 0; @@ -358,8 +383,8 @@ struct TUserTable : public TThrRefBase { TVector<NScheme::TTypeInfo> KeyColumnTypes; TVector<ui32> KeyColumnIds; TSerializedTableRange Range; + TReplicationConfig ReplicationConfig; bool IsBackup = false; - bool IsReplicated = false; TMap<TPathId, TTableIndex> Indexes; TMap<TPathId, TCdcStream> CdcStreams; @@ -421,6 +446,8 @@ struct TUserTable : public TThrRefBase { bool HasCdcStreams() const; bool NeedSchemaSnapshots() const; + bool IsReplicated() const; + private: void DoApplyCreate(NTabletFlatExecutor::TTransactionContext& txc, const TString& tableName, bool shadow, const NKikimrSchemeOp::TPartitionConfig& partConfig) const; diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 8a0a890dd0..1ede140537 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1179,6 +1179,11 @@ void CreateShardedTable( desc->MutableReplicationConfig()->SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); } + if (opts.ReplicationConsistency_) { + desc->MutableReplicationConfig()->SetConsistency( + static_cast<NKikimrSchemeOp::TTableReplicationConfig::EConsistency>(*opts.ReplicationConsistency_)); + } + WaitTxNotification(server, sender, RunSchemeTx(*server->GetRuntime(), std::move(request), sender)); } diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index a6430b2f33..2839c518a6 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -395,6 +395,11 @@ enum class EShadowDataMode { Enabled, }; +enum class EReplicationConsistency: int { + Strong = 1, + Weak = 2, +}; + struct TShardedTableOptions { using TSelf = TShardedTableOptions; @@ -452,6 +457,7 @@ struct TShardedTableOptions { TABLE_OPTION(bool, ExternalStorage, false); TABLE_OPTION(std::optional<ui64>, ExecutorCacheSize, std::nullopt); TABLE_OPTION(bool, Replicated, false); + TABLE_OPTION(std::optional<EReplicationConsistency>, ReplicationConsistency, std::nullopt); #undef TABLE_OPTION #undef TABLE_OPTION_IMPL diff --git a/ydb/core/tx/datashard/datashard_ut_compaction.cpp b/ydb/core/tx/datashard/datashard_ut_compaction.cpp index 29bf2f966e..947728d89d 100644 --- a/ydb/core/tx/datashard/datashard_ut_compaction.cpp +++ b/ydb/core/tx/datashard/datashard_ut_compaction.cpp @@ -142,7 +142,10 @@ Y_UNIT_TEST_SUITE(DataShardCompaction) { InitRoot(server, sender); - CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions() + .Replicated(true) + .ReplicationConsistency(EReplicationConsistency::Strong) + ); auto shards1 = GetTableShards(server, sender, "/Root/table-1"); UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u); diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp index 7906420e0c..00b78a2789 100644 --- a/ydb/core/tx/datashard/datashard_ut_replication.cpp +++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp @@ -26,8 +26,14 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { InitRoot(server, sender); - CreateShardedTable(server, sender, "/Root", "table-1", 1); - CreateShardedTable(server, sender, "/Root", "table-2", 1); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions() + .Replicated(true) + .ReplicationConsistency(EReplicationConsistency::Strong) + ); + CreateShardedTable(server, sender, "/Root", "table-2", TShardedTableOptions() + .Replicated(true) + .ReplicationConsistency(EReplicationConsistency::Strong) + ); auto shards1 = GetTableShards(server, sender, "/Root/table-1"); auto shards2 = GetTableShards(server, sender, "/Root/table-2"); @@ -96,8 +102,14 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { InitRoot(server, sender); - CreateShardedTable(server, sender, "/Root", "table-1", 1); - CreateShardedTable(server, sender, "/Root", "table-2", 1); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions() + .Replicated(true) + .ReplicationConsistency(EReplicationConsistency::Strong) + ); + CreateShardedTable(server, sender, "/Root", "table-2", TShardedTableOptions() + .Replicated(true) + .ReplicationConsistency(EReplicationConsistency::Strong) + ); auto shards1 = GetTableShards(server, sender, "/Root/table-1"); auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); @@ -234,6 +246,41 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { Ydb::StatusIds::GENERIC_ERROR); } + Y_UNIT_TEST(ApplyChangesToReplicatedTable) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions() + .Replicated(true) + .ReplicationConsistency(EReplicationConsistency::Weak) + ); + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + auto tableId = ResolveTableId(server, sender, "/Root/table-1"); + + ApplyChanges(server, shards.at(0), tableId, "my-source", { + TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 }, + TChange{ .Offset = 1, .WriteTxId = 0, .Key = 2, .Value = 22 }, + TChange{ .Offset = 2, .WriteTxId = 0, .Key = 3, .Value = 33 }, + }); + + auto result = ReadShardedTable(server, "/Root/table-1"); + UNIT_ASSERT_VALUES_EQUAL(result, + "key = 1, value = 11\n" + "key = 2, value = 22\n" + "key = 3, value = 33\n" + ); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index 22c9294fa9..404bfd25ea 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ b/ydb/core/tx/datashard/execution_unit.cpp @@ -216,7 +216,7 @@ bool TExecutionUnit::CheckRejectDataTx(TOperation::TPtr op, const TActorContext& return true; } - if (!op->IsReadOnly() && DataShard.IsReplicated()) { + if (DataShard.IsReplicated() && !(op->IsReadOnly() || op->IsCommitWritesTx())) { TString err = TStringBuilder() << "Can't execute write tx at replicated table:" << " tablet id: " << DataShard.TabletID(); @@ -261,7 +261,7 @@ bool TExecutionUnit::WillRejectDataTx(TOperation::TPtr op) const { return true; } - if (!op->IsReadOnly() && DataShard.IsReplicated()) { + if (DataShard.IsReplicated() && !(op->IsReadOnly() || op->IsCommitWritesTx())) { return true; } |