aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-04-19 12:32:45 +0300
committerilnaz <ilnaz@ydb.tech>2023-04-19 12:32:45 +0300
commitd78a7b261977a1969b10e9e6c5c5c2557200e81c (patch)
treec03a13fee245d804e0118408a18395c0042ba9b7
parent16e830bd9935305badd89ad5d618150f16e3eeee (diff)
downloadydb-d78a7b261977a1969b10e9e6c5c5c2557200e81c.tar.gz
Forbid write transactions at replicated tables
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard__op_rows.cpp46
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h8
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_replication.cpp20
-rw-r--r--ydb/core/tx/datashard/datashard_ut_upload_rows.cpp18
-rw-r--r--ydb/core/tx/datashard/execution_unit.cpp17
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp4
11 files changed, 118 insertions, 11 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 8df24eb161..c6f78e015f 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -202,7 +202,7 @@ public:
TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now)
: TEngineHost(db, counters,
TEngineHostSettings(self->TabletID(),
- (self->State == TShardState::Readonly || self->State == TShardState::Frozen),
+ (self->State == TShardState::Readonly || self->State == TShardState::Frozen || self->IsReplicated()),
self->ByKeyFilterDisabled(),
self->GetKeyAccessSampler()))
, Self(self)
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp
index e1475e3856..de8ac3599f 100644
--- a/ydb/core/tx/datashard/datashard__op_rows.cpp
+++ b/ydb/core/tx/datashard/datashard__op_rows.cpp
@@ -120,6 +120,10 @@ static void WrongShardState(NKikimrTxDataShard::TEvUploadRowsResponse& response)
response.SetStatus(NKikimrTxDataShard::TError::WRONG_SHARD_STATE);
}
+static void ReadOnly(NKikimrTxDataShard::TEvUploadRowsResponse& response) {
+ response.SetStatus(NKikimrTxDataShard::TError::READONLY);
+}
+
static void OutOfSpace(NKikimrTxDataShard::TEvEraseRowsResponse& response) {
// NOTE: this function is never called, because erase is allowed when out of space
response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::WRONG_SHARD_STATE);
@@ -129,6 +133,28 @@ static void WrongShardState(NKikimrTxDataShard::TEvEraseRowsResponse& response)
response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::WRONG_SHARD_STATE);
}
+static void ExecError(NKikimrTxDataShard::TEvEraseRowsResponse& response) {
+ response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::EXEC_ERROR);
+}
+
+template <typename TEvResponse>
+using TSetStatusFunc = void(*)(typename TEvResponse::ProtoRecordType&);
+
+template <typename TEvResponse, typename TEvRequest>
+static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc, const TString& reason,
+ TSetStatusFunc<TEvResponse> setStatusFunc, const TActorContext& ctx)
+{
+ LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard"
+ << ": tablet# " << self->TabletID()
+ << ", error# " << reason);
+
+ auto response = MakeHolder<TEvResponse>();
+ setStatusFunc(response->Record);
+ response->Record.SetTabletID(self->TabletID());
+ response->Record.SetErrorDescription(reason);
+ ctx.Send(ev->Sender, std::move(response));
+}
+
template <typename TEvResponse, typename TEvRequest>
static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& ctx, const TString& txDesc, bool isWrite) {
NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
@@ -154,19 +180,11 @@ static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& c
return false;
}
- LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard"
- << ": tablet# " << self->TabletID()
- << ", error# " << rejectReason);
-
- auto response = MakeHolder<TEvResponse>();
- response->Record.SetTabletID(self->TabletID());
if (outOfSpace) {
- OutOfSpace(response->Record);
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &OutOfSpace, ctx);
} else {
- WrongShardState(response->Record);
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &WrongShardState, ctx);
}
- response->Record.SetErrorDescription(rejectReason);
- ctx.Send(ev->Sender, std::move(response));
return true;
}
@@ -177,6 +195,10 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
UpdateProposeQueueSize();
return;
}
+ if (IsReplicated()) {
+ return Reject<TEvDataShard::TEvUploadRowsResponse>(this, ev, "bulk upsert",
+ "Can't execute bulk upsert at replicated table", &ReadOnly, ctx);
+ }
if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) {
Executor()->Execute(new TTxUploadRows(this, ev), ctx);
} else {
@@ -190,6 +212,10 @@ void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActo
UpdateProposeQueueSize();
return;
}
+ if (IsReplicated()) {
+ return Reject<TEvDataShard::TEvEraseRowsResponse>(this, ev, "erase",
+ "Can't execute erase at replicated table", &ExecError, ctx);
+ }
if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) {
Executor()->Execute(new TTxEraseRows(this, ev), ctx);
} else {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 65c3d6a147..2b3d32402b 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1416,6 +1416,14 @@ public:
return State == TShardState::Frozen;
}
+ bool IsReplicated() const {
+ for (const auto& [_, info] : TableInfos) {
+ if (info->IsReplicated) {
+ return true;
+ }
+ }
+ return false;
+ }
ui32 Generation() const { return Executor()->Generation(); }
bool IsFollower() const { return Executor()->GetStats().IsFollower; }
diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp
index 32e7ba1899..9b1f2f3318 100644
--- a/ydb/core/tx/datashard/datashard_user_table.cpp
+++ b/ydb/core/tx/datashard/datashard_user_table.cpp
@@ -268,6 +268,14 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr)
TableSchemaVersion = descr.GetTableSchemaVersion();
IsBackup = descr.GetIsBackup();
+ switch (descr.GetReplicationConfig().GetMode()) {
+ case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE:
+ break;
+ default:
+ IsReplicated = true;
+ break;
+ }
+
CheckSpecialColumns();
for (const auto& indexDesc : descr.GetTableIndexes()) {
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index f8a35d9f80..7f22dbf06a 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -359,6 +359,7 @@ struct TUserTable : public TThrRefBase {
TVector<ui32> KeyColumnIds;
TSerializedTableRange Range;
bool IsBackup = false;
+ bool IsReplicated = false;
TMap<TPathId, TTableIndex> Indexes;
TMap<TPathId, TCdcStream> CdcStreams;
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index ebdfe970ac..8a0a890dd0 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1175,6 +1175,10 @@ void CreateShardedTable(
desc->MutablePartitionConfig()->SetExecutorCacheSize(*opts.ExecutorCacheSize_);
}
+ if (opts.Replicated_) {
+ desc->MutableReplicationConfig()->SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
+ }
+
WaitTxNotification(server, sender, RunSchemeTx(*server->GetRuntime(), std::move(request), sender));
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 404745ba2f..a6430b2f33 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -451,6 +451,7 @@ struct TShardedTableOptions {
TABLE_OPTION(bool, FollowerPromotion, false);
TABLE_OPTION(bool, ExternalStorage, false);
TABLE_OPTION(std::optional<ui64>, ExecutorCacheSize, std::nullopt);
+ TABLE_OPTION(bool, Replicated, false);
#undef TABLE_OPTION
#undef TABLE_OPTION_IMPL
diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp
index ef98ed6619..7906420e0c 100644
--- a/ydb/core/tx/datashard/datashard_ut_replication.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp
@@ -214,6 +214,26 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
DoSplitMergeChanges(true);
}
+ Y_UNIT_TEST(ReplicatedTable) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+ CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions().Replicated(true));
+
+ ExecSQL(server, sender, "SELECT * FROM `/Root/table-1`");
+ ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true,
+ Ydb::StatusIds::GENERIC_ERROR);
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
index ce54103122..d826ff4e91 100644
--- a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
@@ -726,6 +726,24 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
"key = 10, value = (empty maybe), extra = (empty maybe)\n");
}
+ Y_UNIT_TEST(UploadRowsToReplicatedTable) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+ CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions().Replicated(true));
+
+ DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::GENERIC_ERROR);
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp
index 5e8c32a658..22c9294fa9 100644
--- a/ydb/core/tx/datashard/execution_unit.cpp
+++ b/ydb/core/tx/datashard/execution_unit.cpp
@@ -216,6 +216,19 @@ bool TExecutionUnit::CheckRejectDataTx(TOperation::TPtr op, const TActorContext&
return true;
}
+ if (!op->IsReadOnly() && DataShard.IsReplicated()) {
+ TString err = TStringBuilder()
+ << "Can't execute write tx at replicated table:"
+ << " tablet id: " << DataShard.TabletID();
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR)
+ ->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, err);
+
+ LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ op->Abort();
+ return true;
+ }
+
return false;
}
@@ -248,6 +261,10 @@ bool TExecutionUnit::WillRejectDataTx(TOperation::TPtr op) const {
return true;
}
+ if (!op->IsReadOnly() && DataShard.IsReplicated()) {
+ return true;
+ }
+
return false;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 5ef47bb957..8787e0d877 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -6198,6 +6198,10 @@ void TSchemeShard::FillTableDescriptionForShardIdx(
tableDescr->SetIsBackup(true);
}
+ if (tinfo->HasReplicationConfig()) {
+ tableDescr->MutableReplicationConfig()->CopyFrom(tinfo->ReplicationConfig());
+ }
+
// Fill indexes & cdc streams (if any)
for (const auto& child : pinfo->GetChildren()) {
const auto& childName = child.first;