aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-04-26 16:44:19 +0300
committerilnaz <ilnaz@ydb.tech>2023-04-26 16:44:19 +0300
commit6d73358874922366b2611962ff0179b01fb838a0 (patch)
tree8abda74daf439ebf3d2b55a8d9443cee62d08a56
parent50b1a3669d6924e89fde02dc52208d793bba1fbd (diff)
downloadydb-6d73358874922366b2611962ff0179b01fb838a0.tar.gz
Apply changes
-rw-r--r--ydb/core/protos/flat_scheme_op.proto7
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h2
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.h2
-rw-r--r--ydb/core/tx/datashard/datashard_repl_apply.cpp84
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.cpp18
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h29
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h6
-rw-r--r--ydb/core/tx/datashard/datashard_ut_compaction.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_ut_replication.cpp55
-rw-r--r--ydb/core/tx/datashard/execution_unit.cpp4
12 files changed, 190 insertions, 39 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 87455da8b2..895d6040db 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -331,7 +331,14 @@ message TTableReplicationConfig {
REPLICATION_MODE_READ_ONLY = 1;
}
+ enum EConsistency {
+ CONSISTENCY_UNKNOWN = 0;
+ CONSISTENCY_STRONG = 1;
+ CONSISTENCY_WEAK = 2;
+ }
+
optional EReplicationMode Mode = 1;
+ optional EConsistency Consistency = 2;
}
message TTableDescription {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 2b3d32402b..3e57671368 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1418,7 +1418,7 @@ public:
bool IsReplicated() const {
for (const auto& [_, info] : TableInfos) {
- if (info->IsReplicated) {
+ if (info->IsReplicated()) {
return true;
}
}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index a9447a96f8..7d8dd3a1d4 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1898,6 +1898,12 @@ TOperation::TPtr TPipeline::FindCompletingOp(ui64 txId) const {
return nullptr;
}
+void TPipeline::AddCommittingOp(const TRowVersion& version) {
+ if (!Self->IsMvccEnabled())
+ return;
+ CommittingOps.Add(version);
+}
+
void TPipeline::AddCommittingOp(const TOperation::TPtr& op) {
if (!Self->IsMvccEnabled() || op->IsReadOnly())
return;
@@ -1909,6 +1915,12 @@ void TPipeline::AddCommittingOp(const TOperation::TPtr& op) {
CommittingOps.Add(version);
}
+void TPipeline::RemoveCommittingOp(const TRowVersion& version) {
+ if (!Self->IsMvccEnabled())
+ return;
+ CommittingOps.Remove(version);
+}
+
void TPipeline::RemoveCommittingOp(const TOperation::TPtr& op) {
if (!Self->IsMvccEnabled() || op->IsReadOnly())
return;
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index a57aaf6bde..16d3e332a9 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -352,7 +352,9 @@ public:
void RemoveCompletingOp(const TOperation::TPtr& op);
TOperation::TPtr FindCompletingOp(ui64 txId) const;
+ void AddCommittingOp(const TRowVersion& version);
void AddCommittingOp(const TOperation::TPtr& op);
+ void RemoveCommittingOp(const TRowVersion& version);
void RemoveCommittingOp(const TOperation::TPtr& op);
bool WaitCompletion(const TOperation::TPtr& op) const;
bool HasCommittingOpsBelow(TRowVersion upperBound) const;
diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp
index 169fd0203d..205c5528a1 100644
--- a/ydb/core/tx/datashard/datashard_repl_apply.cpp
+++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp
@@ -9,8 +9,10 @@ using namespace NTabletFlatExecutor;
class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShard> {
public:
- explicit TTxApplyReplicationChanges(TDataShard* self, TEvDataShard::TEvApplyReplicationChanges::TPtr&& ev)
+ explicit TTxApplyReplicationChanges(TDataShard* self, TPipeline& pipeline,
+ TEvDataShard::TEvApplyReplicationChanges::TPtr&& ev)
: TTransactionBase(self)
+ , Pipeline(pipeline)
, Ev(std::move(ev))
{
}
@@ -22,8 +24,7 @@ public:
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
Y_UNUSED(ctx);
- // TODO: check this is a replicated shard
- if (Self->State != TShardState::Ready) {
+ if (Self->State != TShardState::Ready && !Self->IsReplicated()) {
Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_WRONG_STATE);
@@ -33,11 +34,11 @@ public:
const auto& msg = Ev->Get()->Record;
const auto& tableId = msg.GetTableId();
- TPathId pathId(tableId.GetOwnerId(), tableId.GetTableId());
+ const TTableId fullTableId(tableId.GetOwnerId(), tableId.GetTableId());
const auto& userTables = Self->GetUserTables();
- auto it = userTables.find(pathId.LocalPathId);
- if (pathId.OwnerId != Self->GetPathOwnerId() || it == userTables.end()) {
+ auto it = userTables.find(fullTableId.PathId.LocalPathId);
+ if (fullTableId.PathId.OwnerId != Self->GetPathOwnerId() || it == userTables.end()) {
TString error = TStringBuilder()
<< "DataShard " << Self->TabletID() << " does not have a table "
<< tableId.GetOwnerId() << ":" << tableId.GetTableId();
@@ -62,17 +63,24 @@ public:
return true;
}
- auto& source = EnsureSource(txc, pathId, msg.GetSource());
+ auto& source = EnsureSource(txc, fullTableId.PathId, msg.GetSource());
for (const auto& change : msg.GetChanges()) {
- if (!ApplyChange(txc, userTable, source, change)) {
+ if (!ApplyChange(txc, fullTableId, userTable, source, change)) {
Y_VERIFY(Result);
- return true;
+ break;
}
}
- Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
- NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK);
+ if (MvccReadWriteVersion) {
+ Pipeline.AddCommittingOp(*MvccReadWriteVersion);
+ }
+
+ if (!Result) {
+ Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
+ NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK);
+ }
+
return true;
}
@@ -84,19 +92,31 @@ public:
}
bool ApplyChange(
- TTransactionContext& txc, const TUserTable& userTable, TReplicationSourceState& source,
- const NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& change)
+ TTransactionContext& txc, const TTableId& tableId, const TUserTable& userTable,
+ TReplicationSourceState& source, const NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& change)
{
+ Y_VERIFY(userTable.IsReplicated());
+
// TODO: check source and offset, persist new values
i64 sourceOffset = change.GetSourceOffset();
ui64 writeTxId = change.GetWriteTxId();
- if (Y_UNLIKELY(writeTxId == 0)) {
- Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
- NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
- NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST,
- "Every change must specify a non-zero WriteTxId");
- return false;
+ if (userTable.ReplicationConfig.HasWeakConsistency()) {
+ if (writeTxId) {
+ Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
+ NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
+ NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST,
+ "WriteTxId cannot be specified for weak consistency");
+ return false;
+ }
+ } else {
+ if (writeTxId == 0) {
+ Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
+ NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
+ NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST,
+ "Non-zero WriteTxId must be specified for strong consistency");
+ return false;
+ }
}
TSerializedCellVec keyCellVec;
@@ -153,7 +173,19 @@ public:
}
}
- txc.DB.UpdateTx(userTable.LocalTid, rop, key, update, writeTxId);
+ if (writeTxId) {
+ txc.DB.UpdateTx(userTable.LocalTid, rop, key, update, writeTxId);
+ } else {
+ if (!MvccReadWriteVersion) {
+ auto [readVersion, writeVersion] = Self->GetReadWriteVersions();
+ Y_VERIFY_DEBUG(readVersion == writeVersion);
+ MvccReadWriteVersion = writeVersion;
+ }
+
+ Self->SysLocksTable().BreakLocks(tableId, keyCellVec.GetCells());
+ txc.DB.Update(userTable.LocalTid, rop, key, update, *MvccReadWriteVersion);
+ }
+
return true;
}
@@ -201,16 +233,24 @@ public:
void Complete(const TActorContext& ctx) override {
Y_VERIFY(Ev);
Y_VERIFY(Result);
- ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie);
+
+ if (MvccReadWriteVersion) {
+ Pipeline.RemoveCommittingOp(*MvccReadWriteVersion);
+ Self->SendImmediateWriteResult(*MvccReadWriteVersion, Ev->Sender, Result.Release(), Ev->Cookie);
+ } else {
+ ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie);
+ }
}
private:
+ TPipeline& Pipeline;
TEvDataShard::TEvApplyReplicationChanges::TPtr Ev;
THolder<TEvDataShard::TEvApplyReplicationChangesResult> Result;
+ std::optional<TRowVersion> MvccReadWriteVersion;
}; // TTxApplyReplicationChanges
void TDataShard::Handle(TEvDataShard::TEvApplyReplicationChanges::TPtr& ev, const TActorContext& ctx) {
- Execute(new TTxApplyReplicationChanges(this, std::move(ev)), ctx);
+ Execute(new TTxApplyReplicationChanges(this, Pipeline, std::move(ev)), ctx);
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp
index 9b1f2f3318..217f079987 100644
--- a/ydb/core/tx/datashard/datashard_user_table.cpp
+++ b/ydb/core/tx/datashard/datashard_user_table.cpp
@@ -188,6 +188,15 @@ bool TUserTable::NeedSchemaSnapshots() const {
return JsonCdcStreamCount > 0;
}
+bool TUserTable::IsReplicated() const {
+ switch (ReplicationConfig.Mode) {
+ case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE:
+ return false;
+ default:
+ return true;
+ }
+}
+
void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr)
{
// We expect schemeshard to send us full list of storage rooms
@@ -267,14 +276,7 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr)
TableSchemaVersion = descr.GetTableSchemaVersion();
IsBackup = descr.GetIsBackup();
-
- switch (descr.GetReplicationConfig().GetMode()) {
- case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE:
- break;
- default:
- IsReplicated = true;
- break;
- }
+ ReplicationConfig = TReplicationConfig(descr.GetReplicationConfig());
CheckSpecialColumns();
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index 7f22dbf06a..c1b3120752 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -308,6 +308,31 @@ struct TUserTable : public TThrRefBase {
}
};
+ struct TReplicationConfig {
+ NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode Mode;
+ NKikimrSchemeOp::TTableReplicationConfig::EConsistency Consistency;
+
+ TReplicationConfig()
+ : Mode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE)
+ , Consistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_UNKNOWN)
+ {
+ }
+
+ TReplicationConfig(const NKikimrSchemeOp::TTableReplicationConfig& config)
+ : Mode(config.GetMode())
+ , Consistency(config.GetConsistency())
+ {
+ }
+
+ bool HasWeakConsistency() const {
+ return Consistency == NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK;
+ }
+
+ bool HasStrongConsistency() const {
+ return Consistency == NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_STRONG;
+ }
+ };
+
struct TStats {
NTable::TStats DataStats;
ui64 IndexSize = 0;
@@ -358,8 +383,8 @@ struct TUserTable : public TThrRefBase {
TVector<NScheme::TTypeInfo> KeyColumnTypes;
TVector<ui32> KeyColumnIds;
TSerializedTableRange Range;
+ TReplicationConfig ReplicationConfig;
bool IsBackup = false;
- bool IsReplicated = false;
TMap<TPathId, TTableIndex> Indexes;
TMap<TPathId, TCdcStream> CdcStreams;
@@ -421,6 +446,8 @@ struct TUserTable : public TThrRefBase {
bool HasCdcStreams() const;
bool NeedSchemaSnapshots() const;
+ bool IsReplicated() const;
+
private:
void DoApplyCreate(NTabletFlatExecutor::TTransactionContext& txc, const TString& tableName, bool shadow,
const NKikimrSchemeOp::TPartitionConfig& partConfig) const;
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 8a0a890dd0..1ede140537 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1179,6 +1179,11 @@ void CreateShardedTable(
desc->MutableReplicationConfig()->SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
}
+ if (opts.ReplicationConsistency_) {
+ desc->MutableReplicationConfig()->SetConsistency(
+ static_cast<NKikimrSchemeOp::TTableReplicationConfig::EConsistency>(*opts.ReplicationConsistency_));
+ }
+
WaitTxNotification(server, sender, RunSchemeTx(*server->GetRuntime(), std::move(request), sender));
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index a6430b2f33..2839c518a6 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -395,6 +395,11 @@ enum class EShadowDataMode {
Enabled,
};
+enum class EReplicationConsistency: int {
+ Strong = 1,
+ Weak = 2,
+};
+
struct TShardedTableOptions {
using TSelf = TShardedTableOptions;
@@ -452,6 +457,7 @@ struct TShardedTableOptions {
TABLE_OPTION(bool, ExternalStorage, false);
TABLE_OPTION(std::optional<ui64>, ExecutorCacheSize, std::nullopt);
TABLE_OPTION(bool, Replicated, false);
+ TABLE_OPTION(std::optional<EReplicationConsistency>, ReplicationConsistency, std::nullopt);
#undef TABLE_OPTION
#undef TABLE_OPTION_IMPL
diff --git a/ydb/core/tx/datashard/datashard_ut_compaction.cpp b/ydb/core/tx/datashard/datashard_ut_compaction.cpp
index 29bf2f966e..947728d89d 100644
--- a/ydb/core/tx/datashard/datashard_ut_compaction.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_compaction.cpp
@@ -142,7 +142,10 @@ Y_UNIT_TEST_SUITE(DataShardCompaction) {
InitRoot(server, sender);
- CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
+ .Replicated(true)
+ .ReplicationConsistency(EReplicationConsistency::Strong)
+ );
auto shards1 = GetTableShards(server, sender, "/Root/table-1");
UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp
index 7906420e0c..00b78a2789 100644
--- a/ydb/core/tx/datashard/datashard_ut_replication.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp
@@ -26,8 +26,14 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
InitRoot(server, sender);
- CreateShardedTable(server, sender, "/Root", "table-1", 1);
- CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
+ .Replicated(true)
+ .ReplicationConsistency(EReplicationConsistency::Strong)
+ );
+ CreateShardedTable(server, sender, "/Root", "table-2", TShardedTableOptions()
+ .Replicated(true)
+ .ReplicationConsistency(EReplicationConsistency::Strong)
+ );
auto shards1 = GetTableShards(server, sender, "/Root/table-1");
auto shards2 = GetTableShards(server, sender, "/Root/table-2");
@@ -96,8 +102,14 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
InitRoot(server, sender);
- CreateShardedTable(server, sender, "/Root", "table-1", 1);
- CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
+ .Replicated(true)
+ .ReplicationConsistency(EReplicationConsistency::Strong)
+ );
+ CreateShardedTable(server, sender, "/Root", "table-2", TShardedTableOptions()
+ .Replicated(true)
+ .ReplicationConsistency(EReplicationConsistency::Strong)
+ );
auto shards1 = GetTableShards(server, sender, "/Root/table-1");
auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
@@ -234,6 +246,41 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
Ydb::StatusIds::GENERIC_ERROR);
}
+ Y_UNIT_TEST(ApplyChangesToReplicatedTable) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+ CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
+ .Replicated(true)
+ .ReplicationConsistency(EReplicationConsistency::Weak)
+ );
+
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+ auto tableId = ResolveTableId(server, sender, "/Root/table-1");
+
+ ApplyChanges(server, shards.at(0), tableId, "my-source", {
+ TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 },
+ TChange{ .Offset = 1, .WriteTxId = 0, .Key = 2, .Value = 22 },
+ TChange{ .Offset = 2, .WriteTxId = 0, .Key = 3, .Value = 33 },
+ });
+
+ auto result = ReadShardedTable(server, "/Root/table-1");
+ UNIT_ASSERT_VALUES_EQUAL(result,
+ "key = 1, value = 11\n"
+ "key = 2, value = 22\n"
+ "key = 3, value = 33\n"
+ );
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp
index 22c9294fa9..404bfd25ea 100644
--- a/ydb/core/tx/datashard/execution_unit.cpp
+++ b/ydb/core/tx/datashard/execution_unit.cpp
@@ -216,7 +216,7 @@ bool TExecutionUnit::CheckRejectDataTx(TOperation::TPtr op, const TActorContext&
return true;
}
- if (!op->IsReadOnly() && DataShard.IsReplicated()) {
+ if (DataShard.IsReplicated() && !(op->IsReadOnly() || op->IsCommitWritesTx())) {
TString err = TStringBuilder()
<< "Can't execute write tx at replicated table:"
<< " tablet id: " << DataShard.TabletID();
@@ -261,7 +261,7 @@ bool TExecutionUnit::WillRejectDataTx(TOperation::TPtr op) const {
return true;
}
- if (!op->IsReadOnly() && DataShard.IsReplicated()) {
+ if (DataShard.IsReplicated() && !(op->IsReadOnly() || op->IsCommitWritesTx())) {
return true;
}