diff options
author | azevaykin <azevaykin@ydb.tech> | 2023-12-14 15:24:59 +0300 |
---|---|---|
committer | azevaykin <azevaykin@ydb.tech> | 2023-12-14 16:32:05 +0300 |
commit | b87f9725966f7ba60f51c1efb78408b14f622b4e (patch) | |
tree | df5202c2ebb7b3373a92dd34c9ae6f92cd1cd96a | |
parent | 7604a0f075df258b85700a9f3e51dd94e24b843f (diff) | |
download | ydb-b87f9725966f7ba60f51c1efb78408b14f622b4e.tar.gz |
TEvWriteResult Origin
5 files changed, 18 insertions, 13 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index dac5556582..16f5105e67 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -73,7 +73,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); Y_UNUSED(txInfo); NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId()); - Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(operation->GetTxId(), tInfo); + Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo); } else { Y_ABORT_UNLESS(writeIds.size() == 1); Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)writeIds.front(), NKikimrTxColumnShard::EResultStatus::SUCCESS); diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index d17301acd4..522f6a6301 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -31,7 +31,7 @@ private: std::unique_ptr<IEventBase> MakeEvent(ui64 tabletId) const { if (TxInfo.TxKind == NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE) { - auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(TxInfo.TxId); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(tabletId, TxInfo.TxId); return result; } else { auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>( diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 8da097b232..73b89494a7 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -89,7 +89,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo } else { auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); Y_ABORT_UNLESS(operation); - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails"); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails"); ctx.Send(writeMeta.GetSource(), result.release()); } CSCounters.OnFailedWriteResponse(); @@ -193,7 +193,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor if (record.GetOperations().size() != 1) { IncCounter(COUNTER_WRITE_FAIL); - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only single operation is supported"); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only single operation is supported"); ctx.Send(source, result.release()); return; } @@ -202,14 +202,14 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor if (operation.GetType() != NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE) { IncCounter(COUNTER_WRITE_FAIL); - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only REPLACE operation is supported"); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only REPLACE operation is supported"); ctx.Send(source, result.release()); return; } if (!operation.GetTableId().HasSchemaVersion()) { IncCounter(COUNTER_WRITE_FAIL); - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema version not set"); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema version not set"); ctx.Send(source, result.release()); return; } @@ -217,7 +217,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchema(operation.GetTableId().GetSchemaVersion()); if (!schema) { IncCounter(COUNTER_WRITE_FAIL); - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "unknown schema version"); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "unknown schema version"); ctx.Send(source, result.release()); return; } @@ -226,7 +226,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor if (!TablesManager.IsReadyForWrite(tableId)) { IncCounter(COUNTER_WRITE_FAIL); - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "table not writable"); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "table not writable"); ctx.Send(source, result.release()); return; } @@ -234,14 +234,14 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor auto arrowData = std::make_shared<TArrowData>(schema); if (!arrowData->Parse(operation, NEvWrite::TPayloadHelper<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) { IncCounter(COUNTER_WRITE_FAIL); - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error"); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error"); ctx.Send(source, result.release()); } auto overloadStatus = CheckOverloaded(tableId); if (overloadStatus != EOverloadStatus::None) { NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source), arrowData); - std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); + std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); return; } diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 55fa1e342e..d2ae54d73f 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -1591,6 +1591,8 @@ Y_UNIT_TEST_SUITE(EvWrite) { TAutoPtr<NActors::IEventHandle> handle; auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle); UNIT_ASSERT(event); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), txId); UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema); diff --git a/ydb/core/tx/data_events/events.h b/ydb/core/tx/data_events/events.h index 402d442001..28679f9b26 100644 --- a/ydb/core/tx/data_events/events.h +++ b/ydb/core/tx/data_events/events.h @@ -81,9 +81,10 @@ struct TDataEvents { public: TEvWriteResult() = default; - static std::unique_ptr<TEvWriteResult> BuildError(const ui64 txId, const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) { + static std::unique_ptr<TEvWriteResult> BuildError(const ui64 origin, const ui64 txId, const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) { auto result = std::make_unique<TEvWriteResult>(); ACFL_ERROR("event", "ev_write_error")("status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(status))("details", errorMsg)("tx_id", txId); + result->Record.SetOrigin(origin); result->Record.SetTxId(txId); result->Record.SetStatus(status); auto issue = result->Record.AddIssues(); @@ -91,15 +92,17 @@ struct TDataEvents { return result; } - static std::unique_ptr<TEvWriteResult> BuildCommited(const ui64 txId) { + static std::unique_ptr<TEvWriteResult> BuildCommited(const ui64 origin, const ui64 txId) { auto result = std::make_unique<TEvWriteResult>(); + result->Record.SetOrigin(origin); result->Record.SetTxId(txId); result->Record.SetStatus(NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); return result; } - static std::unique_ptr<TEvWriteResult> BuildPrepared(const ui64 txId, const TCoordinatorInfo& transactionInfo) { + static std::unique_ptr<TEvWriteResult> BuildPrepared(const ui64 origin, const ui64 txId, const TCoordinatorInfo& transactionInfo) { auto result = std::make_unique<TEvWriteResult>(); + result->Record.SetOrigin(origin); result->Record.SetTxId(txId); result->Record.SetStatus(NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); |