aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-12-03 17:18:24 +0300
committerGitHub <noreply@github.com>2024-12-03 17:18:24 +0300
commitd080a30dcb410e9ea21d82ce059dd69881b4dec3 (patch)
treef3886c738bc02e6dc55c3acf31476f2c4e70778f
parent7e246c61a1a38d0253cd4bea0562191d4560499a (diff)
downloadydb-d080a30dcb410e9ea21d82ce059dd69881b4dec3.tar.gz
writing enabled flag for ev write (#12250)
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp41
1 files changed, 16 insertions, 25 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 598d321aa0..abc5b19061 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -487,13 +487,13 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}
+ const auto sendError = [&](const TString& message, const NKikimrDataEvents::TEvWriteResult::EStatus status) {
+ Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
+ ctx.Send(source, result.release(), 0, cookie);
+ };
if (behaviour == EOperationBehaviour::CommitWriteLock) {
auto commitOperation = std::make_shared<TCommitOperation>(TabletID());
- const auto sendError = [&](const TString& message, const NKikimrDataEvents::TEvWriteResult::EStatus status) {
- Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
- ctx.Send(source, result.release(), 0, cookie);
- };
auto conclusionParse = commitOperation->Parse(*ev->Get());
if (conclusionParse.IsFail()) {
sendError(conclusionParse.GetErrorMessage(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
@@ -536,37 +536,26 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const std::optional<NEvWrite::EModificationType> mType =
TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto(operation.GetType());
if (!mType) {
- Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST,
- "operation " + NKikimrDataEvents::TEvWrite::TOperation::EOperationType_Name(operation.GetType()) + " is not supported");
- ctx.Send(source, result.release(), 0, cookie);
+ sendError("operation " + NKikimrDataEvents::TEvWrite::TOperation::EOperationType_Name(operation.GetType()) + " is not supported",
+ NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
return;
}
if (!operation.GetTableId().HasSchemaVersion()) {
- Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
- TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema version not set");
- ctx.Send(source, result.release(), 0, cookie);
+ sendError("schema version not set", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
return;
}
auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaOptional(operation.GetTableId().GetSchemaVersion());
if (!schema) {
- Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
- TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "unknown schema version");
- ctx.Send(source, result.release(), 0, cookie);
+ sendError("unknown schema version", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
return;
}
const auto pathId = operation.GetTableId().GetTableId();
if (!TablesManager.IsReadyForWrite(pathId)) {
- Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
- TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "table not writable");
- ctx.Send(source, result.release(), 0, cookie);
+ sendError("table not writable", NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR);
return;
}
@@ -574,10 +563,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
auto arrowData = std::make_shared<TArrowData>(schema);
if (!arrowData->Parse(operation, NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
- Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
- TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
- ctx.Send(source, result.release(), 0, cookie);
+ sendError("parsing data error", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
return;
}
@@ -603,6 +589,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
lockId = record.GetLockTxId();
}
+ if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
+ sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
+ return;
+ }
+
OperationsManager->RegisterLock(lockId, Generation());
auto writeOperation = OperationsManager->RegisterOperation(
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());