aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <145343289+azevaykin@users.noreply.github.com>2024-01-22 15:55:29 +0300
committerGitHub <noreply@github.com>2024-01-22 15:55:29 +0300
commitc76a19bbb5498b6ce481075ff9cd8c5f69bdd864 (patch)
tree003c45ef32e9a2d8523363b851c647d4b3ee8390
parent4a40fe6b958430b860964653a4f50c8d9c45e93e (diff)
downloadydb-c76a19bbb5498b6ce481075ff9cd8c5f69bdd864.tar.gz
Simplify error handling (#1181)
-rw-r--r--ydb/core/tx/datashard/check_write_unit.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.cpp36
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.h27
-rw-r--r--ydb/core/tx/datashard/write_unit.cpp2
5 files changed, 42 insertions, 31 deletions
diff --git a/ydb/core/tx/datashard/check_write_unit.cpp b/ydb/core/tx/datashard/check_write_unit.cpp
index 295da3c72c..cf5abac1f5 100644
--- a/ydb/core/tx/datashard/check_write_unit.cpp
+++ b/ydb/core/tx/datashard/check_write_unit.cpp
@@ -64,7 +64,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);
- writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
@@ -88,7 +88,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);
- writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
@@ -105,7 +105,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
if (!Pipeline.AssignPlanInterval(op)) {
TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID();
- writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 20e69bf2c7..980abdedb8 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1584,7 +1584,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
Y_ABORT_UNLESS(writeTx);
auto badRequest = [&](const TString& error) {
- writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID(), Self->TabletID());
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID());
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
};
diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp
index fc459ae97e..671240c8dd 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.cpp
+++ b/ydb/core/tx/datashard/datashard_write_operation.cpp
@@ -20,6 +20,8 @@ namespace NDataShard {
TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev)
: Ev(ev)
, EngineBay(self, txc, ctx, stepTxId.ToPair())
+ , TabletId(self->TabletID())
+ , Ctx(ctx)
, StepTxId(stepTxId)
, ReceivedAt(receivedAt)
, TxSize(0)
@@ -35,16 +37,14 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc,
if (Immediate())
EngineBay.SetIsImmediateTx();
- auto& typeRegistry = *AppData()->TypeRegistry;
-
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
- LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << self->TabletID() << ", record: " << GetRecord().ShortDebugString());
+ LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << TabletId << ", record: " << GetRecord().ShortDebugString());
if (!ParseRecord(self->TableInfos))
return;
- SetTxKeys(RecordOperation().GetColumnIds(), typeRegistry, self->TabletID(), ctx);
+ SetTxKeys(RecordOperation().GetColumnIds());
KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay);
EngineBay.MarkTxLoaded();
@@ -176,15 +176,15 @@ TVector<TKeyValidator::TColumnWriteMeta> GetColumnWrites(const ::google::protobu
return writeColumns;
}
-void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx)
+void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags)
{
TVector<TCell> keyCells;
for (ui32 rowIdx = 0; rowIdx <Matrix.GetRowCount(); ++rowIdx)
{
Matrix.GetSubmatrix(rowIdx, rowIdx, 0, TableInfo->KeyColumnIds.size() - 1, keyCells);
- LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << tabletId << ", "
- << "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, typeRegistry));
+ LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << TabletId << ", "
+ << "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, *AppData()->TypeRegistry));
TTableRange tableRange(keyCells);
EngineBay.GetKeyValidator().AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false);
}
@@ -254,6 +254,8 @@ TWriteOperation* TWriteOperation::CastWriteOperation(TOperation::TPtr op)
TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
: TOperation(op)
, Ev(ev)
+ , TabletId(self->TabletID())
+ , Ctx(ctx)
, ArtifactFlags(0)
, TxCacheUsage(0)
, ReleasedTxDataSize(0)
@@ -350,11 +352,11 @@ void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase&
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data");
}
-void TWriteOperation::DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx)
+void TWriteOperation::DbStoreLocksAccessLog(NTable::TDatabase& txcDb)
{
using Schema = TDataShard::Schema;
- NIceDb::TNiceDb db(txc.DB);
+ NIceDb::TNiceDb db(txcDb);
using TLocksVector = TVector<TSysTables::TLocksTable::TPersistentLock>;
TLocksVector vec;
@@ -368,17 +370,17 @@ void TWriteOperation::DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext&
TStringBuf vecData(vecDataStart, vecDataSize);
db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update(NIceDb::TUpdate<Schema::TxArtifacts::Locks>(vecData));
- LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << tabletId);
+ LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << TabletId);
}
-void TWriteOperation::DbStoreArtifactFlags(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx)
+void TWriteOperation::DbStoreArtifactFlags(NTable::TDatabase& txcDb)
{
using Schema = TDataShard::Schema;
- NIceDb::TNiceDb db(txc.DB);
+ NIceDb::TNiceDb db(txcDb);
db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update<Schema::TxArtifacts::Flags>(ArtifactFlags);
- LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << tabletId);
+ LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << TabletId);
}
ui64 TWriteOperation::GetMemoryConsumption() const {
@@ -418,7 +420,7 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(
TVector<TSysTables::TLocksTable::TLock> locks;
if (!IsImmediate() && !HasVolatilePrepareFlag()) {
- NIceDb::TNiceDb db(txc.DB);ExtractKeys
+ NIceDb::TNiceDb db(txc.DB);
bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, Ev, locks, ArtifactFlags);
if (!ok) {
Ev.Reset();
@@ -437,7 +439,7 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(
bool extractKeys = WriteTx->IsTxInfoLoaded();
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev);
if (WriteTx->Ready() && extractKeys) {
- WriteTx->ExtractKeys(true);
+ WriteTx->ExtractKeys();
}
if (!WriteTx->Ready()) {
@@ -521,9 +523,9 @@ void TWriteOperation::UntrackMemory() const {
NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Sub(GetRecord().SpaceUsed());
}
-void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId) {
+void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) {
SetAbortedFlag();
- WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(tabletId, GetTxId(), status, errorMsg);
+ WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletId, GetTxId(), status, errorMsg);
}
void TWriteOperation::SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult) {
diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h
index b62c8a7124..9cf371f5a8 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.h
+++ b/ydb/core/tx/datashard/datashard_write_operation.h
@@ -74,11 +74,6 @@ public:
return TxInfo().DynKeysCount != 0;
}
- // TODO: It's an expensive operation (Precharge() inside). We need avoid it.
- TEngineBay::TSizes CalcReadSizes(bool needsTotalKeysSize) const {
- return EngineBay.CalcSizes(needsTotalKeysSize);
- }
-
ui64 GetMemoryAllocated() const {
return EngineBay.GetEngine() ? EngineBay.GetEngine()->GetMemoryAllocated() : 0;
}
@@ -92,6 +87,14 @@ public:
void DestroyEngine() {
EngineBay.DestroyEngine();
}
+
+ TKeyValidator& GetKeyValidator() {
+ return EngineBay.GetKeyValidator();
+ }
+ const TKeyValidator& GetKeyValidator() const {
+ return EngineBay.GetKeyValidator();
+ }
+
const NMiniKQL::TEngineHostCounters& GetCounters() {
return EngineBay.GetCounters();
}
@@ -145,7 +148,7 @@ public:
}
bool ParseRecord(const TDataShard::TTableInfos& tableInfos);
- void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx);
+ void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds);
ui32 ExtractKeys(bool allowErrors);
bool ReValidateKeys();
@@ -175,6 +178,9 @@ private:
const NEvents::TDataEvents::TEvWrite::TPtr& Ev;
TEngineBay EngineBay;
+ const ui64 TabletId;
+ const TActorContext& Ctx;
+
YDB_ACCESSOR_DEF(TActorId, Source);
YDB_READONLY(TStepOrder, StepTxId, TStepOrder(0, 0));
@@ -262,8 +268,8 @@ public:
return ArtifactFlags & LOCKS_STORED;
}
- void DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx);
- void DbStoreArtifactFlags(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx);
+ void DbStoreLocksAccessLog(NTable::TDatabase& txcDb);
+ void DbStoreArtifactFlags(NTable::TDatabase& txcDb);
ui64 GetMemoryConsumption() const;
@@ -335,7 +341,7 @@ public:
return std::move(WriteResult);
}
- void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId);
+ void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg);
void SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult);
private:
@@ -347,6 +353,9 @@ private:
TValidatedWriteTx::TPtr WriteTx;
std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> WriteResult;
+ const ui64 TabletId;
+ const TActorContext& Ctx;
+
YDB_READONLY_DEF(ui64, ArtifactFlags);
YDB_ACCESSOR_DEF(ui64, TxCacheUsage);
YDB_ACCESSOR_DEF(ui64, ReleasedTxDataSize);
diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp
index 2d195a42a9..4c70820b28 100644
--- a/ydb/core/tx/datashard/write_unit.cpp
+++ b/ydb/core/tx/datashard/write_unit.cpp
@@ -44,7 +44,7 @@ public:
const TTableId fullTableId(self->GetPathOwnerId(), tableId);
const ui64 localTableId = self->GetLocalTableId(fullTableId);
if (localTableId == 0) {
- writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId, self->TabletID());
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId);
return;
}
const ui64 shadowTableId = self->GetShadowTableId(fullTableId);