diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-04-19 12:32:45 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-04-19 12:32:45 +0300 |
commit | d78a7b261977a1969b10e9e6c5c5c2557200e81c (patch) | |
tree | c03a13fee245d804e0118408a18395c0042ba9b7 | |
parent | 16e830bd9935305badd89ad5d618150f16e3eeee (diff) | |
download | ydb-d78a7b261977a1969b10e9e6c5c5c2557200e81c.tar.gz |
Forbid write transactions at replicated tables
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__op_rows.cpp | 46 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_user_table.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_user_table.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_replication.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_upload_rows.cpp | 18 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execution_unit.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 4 |
11 files changed, 118 insertions, 11 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 8df24eb161..c6f78e015f 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -202,7 +202,7 @@ public: TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now) : TEngineHost(db, counters, TEngineHostSettings(self->TabletID(), - (self->State == TShardState::Readonly || self->State == TShardState::Frozen), + (self->State == TShardState::Readonly || self->State == TShardState::Frozen || self->IsReplicated()), self->ByKeyFilterDisabled(), self->GetKeyAccessSampler())) , Self(self) diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp index e1475e3856..de8ac3599f 100644 --- a/ydb/core/tx/datashard/datashard__op_rows.cpp +++ b/ydb/core/tx/datashard/datashard__op_rows.cpp @@ -120,6 +120,10 @@ static void WrongShardState(NKikimrTxDataShard::TEvUploadRowsResponse& response) response.SetStatus(NKikimrTxDataShard::TError::WRONG_SHARD_STATE); } +static void ReadOnly(NKikimrTxDataShard::TEvUploadRowsResponse& response) { + response.SetStatus(NKikimrTxDataShard::TError::READONLY); +} + static void OutOfSpace(NKikimrTxDataShard::TEvEraseRowsResponse& response) { // NOTE: this function is never called, because erase is allowed when out of space response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::WRONG_SHARD_STATE); @@ -129,6 +133,28 @@ static void WrongShardState(NKikimrTxDataShard::TEvEraseRowsResponse& response) response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::WRONG_SHARD_STATE); } +static void ExecError(NKikimrTxDataShard::TEvEraseRowsResponse& response) { + response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::EXEC_ERROR); +} + +template <typename TEvResponse> +using TSetStatusFunc = void(*)(typename TEvResponse::ProtoRecordType&); + +template <typename TEvResponse, typename TEvRequest> +static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc, const TString& reason, + TSetStatusFunc<TEvResponse> setStatusFunc, const TActorContext& ctx) +{ + LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard" + << ": tablet# " << self->TabletID() + << ", error# " << reason); + + auto response = MakeHolder<TEvResponse>(); + setStatusFunc(response->Record); + response->Record.SetTabletID(self->TabletID()); + response->Record.SetErrorDescription(reason); + ctx.Send(ev->Sender, std::move(response)); +} + template <typename TEvResponse, typename TEvRequest> static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& ctx, const TString& txDesc, bool isWrite) { NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus; @@ -154,19 +180,11 @@ static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& c return false; } - LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard" - << ": tablet# " << self->TabletID() - << ", error# " << rejectReason); - - auto response = MakeHolder<TEvResponse>(); - response->Record.SetTabletID(self->TabletID()); if (outOfSpace) { - OutOfSpace(response->Record); + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &OutOfSpace, ctx); } else { - WrongShardState(response->Record); + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &WrongShardState, ctx); } - response->Record.SetErrorDescription(rejectReason); - ctx.Send(ev->Sender, std::move(response)); return true; } @@ -177,6 +195,10 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct UpdateProposeQueueSize(); return; } + if (IsReplicated()) { + return Reject<TEvDataShard::TEvUploadRowsResponse>(this, ev, "bulk upsert", + "Can't execute bulk upsert at replicated table", &ReadOnly, ctx); + } if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) { Executor()->Execute(new TTxUploadRows(this, ev), ctx); } else { @@ -190,6 +212,10 @@ void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActo UpdateProposeQueueSize(); return; } + if (IsReplicated()) { + return Reject<TEvDataShard::TEvEraseRowsResponse>(this, ev, "erase", + "Can't execute erase at replicated table", &ExecError, ctx); + } if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) { Executor()->Execute(new TTxEraseRows(this, ev), ctx); } else { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 65c3d6a147..2b3d32402b 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1416,6 +1416,14 @@ public: return State == TShardState::Frozen; } + bool IsReplicated() const { + for (const auto& [_, info] : TableInfos) { + if (info->IsReplicated) { + return true; + } + } + return false; + } ui32 Generation() const { return Executor()->Generation(); } bool IsFollower() const { return Executor()->GetStats().IsFollower; } diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index 32e7ba1899..9b1f2f3318 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp +++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -268,6 +268,14 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr) TableSchemaVersion = descr.GetTableSchemaVersion(); IsBackup = descr.GetIsBackup(); + switch (descr.GetReplicationConfig().GetMode()) { + case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE: + break; + default: + IsReplicated = true; + break; + } + CheckSpecialColumns(); for (const auto& indexDesc : descr.GetTableIndexes()) { diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index f8a35d9f80..7f22dbf06a 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -359,6 +359,7 @@ struct TUserTable : public TThrRefBase { TVector<ui32> KeyColumnIds; TSerializedTableRange Range; bool IsBackup = false; + bool IsReplicated = false; TMap<TPathId, TTableIndex> Indexes; TMap<TPathId, TCdcStream> CdcStreams; diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index ebdfe970ac..8a0a890dd0 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1175,6 +1175,10 @@ void CreateShardedTable( desc->MutablePartitionConfig()->SetExecutorCacheSize(*opts.ExecutorCacheSize_); } + if (opts.Replicated_) { + desc->MutableReplicationConfig()->SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); + } + WaitTxNotification(server, sender, RunSchemeTx(*server->GetRuntime(), std::move(request), sender)); } diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 404745ba2f..a6430b2f33 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -451,6 +451,7 @@ struct TShardedTableOptions { TABLE_OPTION(bool, FollowerPromotion, false); TABLE_OPTION(bool, ExternalStorage, false); TABLE_OPTION(std::optional<ui64>, ExecutorCacheSize, std::nullopt); + TABLE_OPTION(bool, Replicated, false); #undef TABLE_OPTION #undef TABLE_OPTION_IMPL diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp index ef98ed6619..7906420e0c 100644 --- a/ydb/core/tx/datashard/datashard_ut_replication.cpp +++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp @@ -214,6 +214,26 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { DoSplitMergeChanges(true); } + Y_UNIT_TEST(ReplicatedTable) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions().Replicated(true)); + + ExecSQL(server, sender, "SELECT * FROM `/Root/table-1`"); + ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true, + Ydb::StatusIds::GENERIC_ERROR); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp index ce54103122..d826ff4e91 100644 --- a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp +++ b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp @@ -726,6 +726,24 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) { "key = 10, value = (empty maybe), extra = (empty maybe)\n"); } + Y_UNIT_TEST(UploadRowsToReplicatedTable) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + + InitRoot(server, sender); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions().Replicated(true)); + + DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::GENERIC_ERROR); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index 5e8c32a658..22c9294fa9 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ b/ydb/core/tx/datashard/execution_unit.cpp @@ -216,6 +216,19 @@ bool TExecutionUnit::CheckRejectDataTx(TOperation::TPtr op, const TActorContext& return true; } + if (!op->IsReadOnly() && DataShard.IsReplicated()) { + TString err = TStringBuilder() + << "Can't execute write tx at replicated table:" + << " tablet id: " << DataShard.TabletID(); + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR) + ->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, err); + + LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err); + + op->Abort(); + return true; + } + return false; } @@ -248,6 +261,10 @@ bool TExecutionUnit::WillRejectDataTx(TOperation::TPtr op) const { return true; } + if (!op->IsReadOnly() && DataShard.IsReplicated()) { + return true; + } + return false; } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 5ef47bb957..8787e0d877 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6198,6 +6198,10 @@ void TSchemeShard::FillTableDescriptionForShardIdx( tableDescr->SetIsBackup(true); } + if (tinfo->HasReplicationConfig()) { + tableDescr->MutableReplicationConfig()->CopyFrom(tinfo->ReplicationConfig()); + } + // Fill indexes & cdc streams (if any) for (const auto& child : pinfo->GetChildren()) { const auto& childName = child.first; |