aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@ydb.tech>2023-12-14 15:24:59 +0300
committerazevaykin <azevaykin@ydb.tech>2023-12-14 16:32:05 +0300
commitb87f9725966f7ba60f51c1efb78408b14f622b4e (patch)
treedf5202c2ebb7b3373a92dd34c9ae6f92cd1cd96a
parent7604a0f075df258b85700a9f3e51dd94e24b843f (diff)
downloadydb-b87f9725966f7ba60f51c1efb78408b14f622b4e.tar.gz
TEvWriteResult Origin
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp16
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp2
-rw-r--r--ydb/core/tx/data_events/events.h9
5 files changed, 18 insertions, 13 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
index dac5556582..16f5105e67 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
@@ -73,7 +73,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
Y_UNUSED(txInfo);
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId());
- Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(operation->GetTxId(), tInfo);
+ Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo);
} else {
Y_ABORT_UNLESS(writeIds.size() == 1);
Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)writeIds.front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index d17301acd4..522f6a6301 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -31,7 +31,7 @@ private:
std::unique_ptr<IEventBase> MakeEvent(ui64 tabletId) const {
if (TxInfo.TxKind == NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE) {
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(TxInfo.TxId);
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(tabletId, TxInfo.TxId);
return result;
} else {
auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 8da097b232..73b89494a7 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -89,7 +89,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
} else {
auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails");
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails");
ctx.Send(writeMeta.GetSource(), result.release());
}
CSCounters.OnFailedWriteResponse();
@@ -193,7 +193,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (record.GetOperations().size() != 1) {
IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only single operation is supported");
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only single operation is supported");
ctx.Send(source, result.release());
return;
}
@@ -202,14 +202,14 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (operation.GetType() != NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE) {
IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only REPLACE operation is supported");
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only REPLACE operation is supported");
ctx.Send(source, result.release());
return;
}
if (!operation.GetTableId().HasSchemaVersion()) {
IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema version not set");
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema version not set");
ctx.Send(source, result.release());
return;
}
@@ -217,7 +217,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchema(operation.GetTableId().GetSchemaVersion());
if (!schema) {
IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "unknown schema version");
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "unknown schema version");
ctx.Send(source, result.release());
return;
}
@@ -226,7 +226,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (!TablesManager.IsReadyForWrite(tableId)) {
IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "table not writable");
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "table not writable");
ctx.Send(source, result.release());
return;
}
@@ -234,14 +234,14 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
auto arrowData = std::make_shared<TArrowData>(schema);
if (!arrowData->Parse(operation, NEvWrite::TPayloadHelper<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
IncCounter(COUNTER_WRITE_FAIL);
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
ctx.Send(source, result.release());
}
auto overloadStatus = CheckOverloaded(tableId);
if (overloadStatus != EOverloadStatus::None) {
NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source), arrowData);
- std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
+ std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
return;
}
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 55fa1e342e..d2ae54d73f 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1591,6 +1591,8 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TAutoPtr<NActors::IEventHandle> handle;
auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
UNIT_ASSERT(event);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), txId);
UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema);
diff --git a/ydb/core/tx/data_events/events.h b/ydb/core/tx/data_events/events.h
index 402d442001..28679f9b26 100644
--- a/ydb/core/tx/data_events/events.h
+++ b/ydb/core/tx/data_events/events.h
@@ -81,9 +81,10 @@ struct TDataEvents {
public:
TEvWriteResult() = default;
- static std::unique_ptr<TEvWriteResult> BuildError(const ui64 txId, const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) {
+ static std::unique_ptr<TEvWriteResult> BuildError(const ui64 origin, const ui64 txId, const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) {
auto result = std::make_unique<TEvWriteResult>();
ACFL_ERROR("event", "ev_write_error")("status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(status))("details", errorMsg)("tx_id", txId);
+ result->Record.SetOrigin(origin);
result->Record.SetTxId(txId);
result->Record.SetStatus(status);
auto issue = result->Record.AddIssues();
@@ -91,15 +92,17 @@ struct TDataEvents {
return result;
}
- static std::unique_ptr<TEvWriteResult> BuildCommited(const ui64 txId) {
+ static std::unique_ptr<TEvWriteResult> BuildCommited(const ui64 origin, const ui64 txId) {
auto result = std::make_unique<TEvWriteResult>();
+ result->Record.SetOrigin(origin);
result->Record.SetTxId(txId);
result->Record.SetStatus(NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
return result;
}
- static std::unique_ptr<TEvWriteResult> BuildPrepared(const ui64 txId, const TCoordinatorInfo& transactionInfo) {
+ static std::unique_ptr<TEvWriteResult> BuildPrepared(const ui64 origin, const ui64 txId, const TCoordinatorInfo& transactionInfo) {
auto result = std::make_unique<TEvWriteResult>();
+ result->Record.SetOrigin(origin);
result->Record.SetTxId(txId);
result->Record.SetStatus(NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);