diff options
author | azevaykin <145343289+azevaykin@users.noreply.github.com> | 2024-01-22 15:55:29 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-22 15:55:29 +0300 |
commit | c76a19bbb5498b6ce481075ff9cd8c5f69bdd864 (patch) | |
tree | 003c45ef32e9a2d8523363b851c647d4b3ee8390 | |
parent | 4a40fe6b958430b860964653a4f50c8d9c45e93e (diff) | |
download | ydb-c76a19bbb5498b6ce481075ff9cd8c5f69bdd864.tar.gz |
Simplify error handling (#1181)
-rw-r--r-- | ydb/core/tx/datashard/check_write_unit.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_write_operation.cpp | 36 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_write_operation.h | 27 | ||||
-rw-r--r-- | ydb/core/tx/datashard/write_unit.cpp | 2 |
5 files changed, 42 insertions, 31 deletions
diff --git a/ydb/core/tx/datashard/check_write_unit.cpp b/ydb/core/tx/datashard/check_write_unit.cpp index 295da3c72c..cf5abac1f5 100644 --- a/ydb/core/tx/datashard/check_write_unit.cpp +++ b/ydb/core/tx/datashard/check_write_unit.cpp @@ -64,7 +64,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID()); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); @@ -88,7 +88,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID()); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); @@ -105,7 +105,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, if (!Pipeline.AssignPlanInterval(op)) { TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID(); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID()); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err); diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 20e69bf2c7..980abdedb8 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1584,7 +1584,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& Y_ABORT_UNLESS(writeTx); auto badRequest = [&](const TString& error) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID(), Self->TabletID()); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID()); LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error); }; diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index fc459ae97e..671240c8dd 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -20,6 +20,8 @@ namespace NDataShard { TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev) : Ev(ev) , EngineBay(self, txc, ctx, stepTxId.ToPair()) + , TabletId(self->TabletID()) + , Ctx(ctx) , StepTxId(stepTxId) , ReceivedAt(receivedAt) , TxSize(0) @@ -35,16 +37,14 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, if (Immediate()) EngineBay.SetIsImmediateTx(); - auto& typeRegistry = *AppData()->TypeRegistry; - NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << self->TabletID() << ", record: " << GetRecord().ShortDebugString()); + LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << TabletId << ", record: " << GetRecord().ShortDebugString()); if (!ParseRecord(self->TableInfos)) return; - SetTxKeys(RecordOperation().GetColumnIds(), typeRegistry, self->TabletID(), ctx); + SetTxKeys(RecordOperation().GetColumnIds()); KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay); EngineBay.MarkTxLoaded(); @@ -176,15 +176,15 @@ TVector<TKeyValidator::TColumnWriteMeta> GetColumnWrites(const ::google::protobu return writeColumns; } -void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx) +void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags) { TVector<TCell> keyCells; for (ui32 rowIdx = 0; rowIdx <Matrix.GetRowCount(); ++rowIdx) { Matrix.GetSubmatrix(rowIdx, rowIdx, 0, TableInfo->KeyColumnIds.size() - 1, keyCells); - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << tabletId << ", " - << "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, typeRegistry)); + LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << TabletId << ", " + << "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, *AppData()->TypeRegistry)); TTableRange tableRange(keyCells); EngineBay.GetKeyValidator().AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false); } @@ -254,6 +254,8 @@ TWriteOperation* TWriteOperation::CastWriteOperation(TOperation::TPtr op) TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) : TOperation(op) , Ev(ev) + , TabletId(self->TabletID()) + , Ctx(ctx) , ArtifactFlags(0) , TxCacheUsage(0) , ReleasedTxDataSize(0) @@ -350,11 +352,11 @@ void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data"); } -void TWriteOperation::DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx) +void TWriteOperation::DbStoreLocksAccessLog(NTable::TDatabase& txcDb) { using Schema = TDataShard::Schema; - NIceDb::TNiceDb db(txc.DB); + NIceDb::TNiceDb db(txcDb); using TLocksVector = TVector<TSysTables::TLocksTable::TPersistentLock>; TLocksVector vec; @@ -368,17 +370,17 @@ void TWriteOperation::DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext& TStringBuf vecData(vecDataStart, vecDataSize); db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update(NIceDb::TUpdate<Schema::TxArtifacts::Locks>(vecData)); - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << tabletId); + LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << TabletId); } -void TWriteOperation::DbStoreArtifactFlags(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx) +void TWriteOperation::DbStoreArtifactFlags(NTable::TDatabase& txcDb) { using Schema = TDataShard::Schema; - NIceDb::TNiceDb db(txc.DB); + NIceDb::TNiceDb db(txcDb); db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update<Schema::TxArtifacts::Flags>(ArtifactFlags); - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << tabletId); + LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << TabletId); } ui64 TWriteOperation::GetMemoryConsumption() const { @@ -418,7 +420,7 @@ ERestoreDataStatus TWriteOperation::RestoreTxData( TVector<TSysTables::TLocksTable::TLock> locks; if (!IsImmediate() && !HasVolatilePrepareFlag()) { - NIceDb::TNiceDb db(txc.DB);ExtractKeys + NIceDb::TNiceDb db(txc.DB); bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, Ev, locks, ArtifactFlags); if (!ok) { Ev.Reset(); @@ -437,7 +439,7 @@ ERestoreDataStatus TWriteOperation::RestoreTxData( bool extractKeys = WriteTx->IsTxInfoLoaded(); WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev); if (WriteTx->Ready() && extractKeys) { - WriteTx->ExtractKeys(true); + WriteTx->ExtractKeys(); } if (!WriteTx->Ready()) { @@ -521,9 +523,9 @@ void TWriteOperation::UntrackMemory() const { NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Sub(GetRecord().SpaceUsed()); } -void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId) { +void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) { SetAbortedFlag(); - WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(tabletId, GetTxId(), status, errorMsg); + WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletId, GetTxId(), status, errorMsg); } void TWriteOperation::SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult) { diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h index b62c8a7124..9cf371f5a8 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.h +++ b/ydb/core/tx/datashard/datashard_write_operation.h @@ -74,11 +74,6 @@ public: return TxInfo().DynKeysCount != 0; } - // TODO: It's an expensive operation (Precharge() inside). We need avoid it. - TEngineBay::TSizes CalcReadSizes(bool needsTotalKeysSize) const { - return EngineBay.CalcSizes(needsTotalKeysSize); - } - ui64 GetMemoryAllocated() const { return EngineBay.GetEngine() ? EngineBay.GetEngine()->GetMemoryAllocated() : 0; } @@ -92,6 +87,14 @@ public: void DestroyEngine() { EngineBay.DestroyEngine(); } + + TKeyValidator& GetKeyValidator() { + return EngineBay.GetKeyValidator(); + } + const TKeyValidator& GetKeyValidator() const { + return EngineBay.GetKeyValidator(); + } + const NMiniKQL::TEngineHostCounters& GetCounters() { return EngineBay.GetCounters(); } @@ -145,7 +148,7 @@ public: } bool ParseRecord(const TDataShard::TTableInfos& tableInfos); - void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx); + void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds); ui32 ExtractKeys(bool allowErrors); bool ReValidateKeys(); @@ -175,6 +178,9 @@ private: const NEvents::TDataEvents::TEvWrite::TPtr& Ev; TEngineBay EngineBay; + const ui64 TabletId; + const TActorContext& Ctx; + YDB_ACCESSOR_DEF(TActorId, Source); YDB_READONLY(TStepOrder, StepTxId, TStepOrder(0, 0)); @@ -262,8 +268,8 @@ public: return ArtifactFlags & LOCKS_STORED; } - void DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx); - void DbStoreArtifactFlags(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx); + void DbStoreLocksAccessLog(NTable::TDatabase& txcDb); + void DbStoreArtifactFlags(NTable::TDatabase& txcDb); ui64 GetMemoryConsumption() const; @@ -335,7 +341,7 @@ public: return std::move(WriteResult); } - void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId); + void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg); void SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult); private: @@ -347,6 +353,9 @@ private: TValidatedWriteTx::TPtr WriteTx; std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> WriteResult; + const ui64 TabletId; + const TActorContext& Ctx; + YDB_READONLY_DEF(ui64, ArtifactFlags); YDB_ACCESSOR_DEF(ui64, TxCacheUsage); YDB_ACCESSOR_DEF(ui64, ReleasedTxDataSize); diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp index 2d195a42a9..4c70820b28 100644 --- a/ydb/core/tx/datashard/write_unit.cpp +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -44,7 +44,7 @@ public: const TTableId fullTableId(self->GetPathOwnerId(), tableId); const ui64 localTableId = self->GetLocalTableId(fullTableId); if (localTableId == 0) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId, self->TabletID()); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId); return; } const ui64 shadowTableId = self->GetShadowTableId(fullTableId); |