aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@ydb.tech>2023-12-15 23:04:30 +0300
committerazevaykin <azevaykin@ydb.tech>2023-12-16 00:13:57 +0300
commitfc639245d659f8a4b177331d606fe0bda06b5dae (patch)
tree6c90f410bd5726a5e90993fa9905d3204e35e5f3
parent7004cc66a83f229898f1b621892a2b55bb225408 (diff)
downloadydb-fc639245d659f8a4b177331d606fe0bda06b5dae.tar.gz
DataShard EvWrite Immediate
-rw-r--r--.mapping.json6
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt5
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt5
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt5
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt5
-rw-r--r--ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt5
-rw-r--r--ydb/core/tx/datashard/check_write_unit.cpp337
-rw-r--r--ydb/core/tx/datashard/datashard.cpp125
-rw-r--r--ydb/core/tx/datashard/datashard.h1
-rw-r--r--ydb/core/tx/datashard/datashard__write.cpp261
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h34
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp69
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.h8
-rw-r--r--ydb/core/tx/datashard/datashard_txs.h19
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_table.h7
-rw-r--r--ydb/core/tx/datashard/datashard_ut_write.cpp81
-rw-r--r--ydb/core/tx/datashard/datashard_write.h17
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.cpp555
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.h440
-rw-r--r--ydb/core/tx/datashard/execution_unit.cpp6
-rw-r--r--ydb/core/tx/datashard/execution_unit_ctors.h7
-rw-r--r--ydb/core/tx/datashard/execution_unit_kind.h5
-rw-r--r--ydb/core/tx/datashard/probes.h13
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp82
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.h8
-rw-r--r--ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt86
-rw-r--r--ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt87
-rw-r--r--ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt90
-rw-r--r--ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt92
-rw-r--r--ydb/core/tx/datashard/ut_write/CMakeLists.txt19
-rw-r--r--ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt80
-rw-r--r--ydb/core/tx/datashard/ut_write/ya.make38
-rw-r--r--ydb/core/tx/datashard/write_unit.cpp207
-rw-r--r--ydb/core/tx/datashard/ya.make5
35 files changed, 2762 insertions, 49 deletions
diff --git a/.mapping.json b/.mapping.json
index babdeb70d5a..2af7e2909bf 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -6736,6 +6736,12 @@
"ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt":"",
"ydb/core/tx/datashard/ut_volatile/CMakeLists.txt":"",
"ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt":"",
+ "ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/datashard/ut_write/CMakeLists.txt":"",
+ "ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt":"",
"ydb/core/tx/long_tx_service/CMakeLists.darwin-arm64.txt":"",
"ydb/core/tx/long_tx_service/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/tx/long_tx_service/CMakeLists.linux-aarch64.txt":"",
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index 0d872d299ff..133b6cb1261 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -466,4 +466,5 @@ enum ETxTypes {
TXTYPE_CDC_STREAM_EMIT_HEARTBEATS = 79 [(TxTypeOpts) = {Name: "TTxCdcStreamEmitHeartbeats"}];
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
+ TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
}
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt
index ca3bfea616a..02077d6ae64 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt
@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
add_subdirectory(ut_stats)
add_subdirectory(ut_upload_rows)
add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -180,6 +181,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -216,6 +218,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -257,6 +260,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -313,6 +317,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
index ca3bfea616a..02077d6ae64 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
add_subdirectory(ut_stats)
add_subdirectory(ut_upload_rows)
add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -180,6 +181,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -216,6 +218,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -257,6 +260,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -313,6 +317,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 83f3d65feef..bfda437b594 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
add_subdirectory(ut_stats)
add_subdirectory(ut_upload_rows)
add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -181,6 +182,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -217,6 +219,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -258,6 +261,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -314,6 +318,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
index 83f3d65feef..bfda437b594 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
add_subdirectory(ut_stats)
add_subdirectory(ut_upload_rows)
add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -181,6 +182,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -217,6 +219,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -258,6 +261,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -314,6 +318,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
index 99d4810d8a5..f77a673de3d 100644
--- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
add_subdirectory(ut_stats)
add_subdirectory(ut_upload_rows)
add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -181,6 +182,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -217,6 +219,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -258,6 +261,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -314,6 +318,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
)
generate_enum_serilization(core-tx-datashard
diff --git a/ydb/core/tx/datashard/check_write_unit.cpp b/ydb/core/tx/datashard/check_write_unit.cpp
new file mode 100644
index 00000000000..32c9bd7fc1c
--- /dev/null
+++ b/ydb/core/tx/datashard/check_write_unit.cpp
@@ -0,0 +1,337 @@
+#include "datashard_impl.h"
+#include "datashard_pipeline.h"
+#include "execution_unit_ctors.h"
+
+#include <ydb/core/tablet/tablet_exception.h>
+
+namespace NKikimr {
+namespace NDataShard {
+
+class TCheckWriteUnit: public TExecutionUnit {
+public:
+ TCheckWriteUnit(TDataShard &dataShard, TPipeline &pipeline);
+ ~TCheckWriteUnit() override;
+
+ bool IsReadyToExecute(TOperation::TPtr op) const override;
+ EExecutionStatus Execute(TOperation::TPtr op,
+ TTransactionContext &txc,
+ const TActorContext &ctx) override;
+ void Complete(TOperation::TPtr op,
+ const TActorContext &ctx) override;
+
+private:
+};
+
+TCheckWriteUnit::TCheckWriteUnit(TDataShard &dataShard,
+ TPipeline &pipeline)
+ : TExecutionUnit(EExecutionUnitKind::CheckDataTx, false, dataShard, pipeline)
+{
+}
+
+TCheckWriteUnit::~TCheckWriteUnit()
+{
+}
+
+bool TCheckWriteUnit::IsReadyToExecute(TOperation::TPtr) const
+{
+ return true;
+}
+
+EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
+ TTransactionContext &,
+ const TActorContext &ctx)
+{
+ Y_ABORT_UNLESS(op->IsDataTx() || op->IsReadTable());
+ Y_ABORT_UNLESS(!op->IsAborted());
+
+ if (CheckRejectDataTx(op, ctx)) {
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ return EExecutionStatus::Executed;
+ }
+
+ //TODO: remove this return
+ return EExecutionStatus::Executed;
+
+ TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
+ Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
+ auto dataTx = tx->GetDataTx();
+ Y_ABORT_UNLESS(dataTx);
+ Y_ABORT_UNLESS(dataTx->Ready() || dataTx->RequirePrepare());
+
+ if (dataTx->Ready()) {
+ DataShard.IncCounter(COUNTER_MINIKQL_PROGRAM_SIZE, dataTx->ProgramSize());
+ } else {
+ Y_ABORT_UNLESS(dataTx->RequirePrepare());
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Require prepare Tx " << op->GetTxId() << " at " << DataShard.TabletID()
+ << ": " << dataTx->GetErrors());
+ }
+
+ // Check if we are out of space and tx wants to update user
+ // or system table.
+ if (DataShard.IsAnyChannelYellowStop()
+ && (dataTx->HasWrites() || !op->IsImmediate())) {
+ TString err = TStringBuilder()
+ << "Cannot perform transaction: out of disk space at tablet "
+ << DataShard.TabletID() << " txId " << op->GetTxId();
+
+ DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
+
+ BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ }
+
+ if (tx->IsMvccSnapshotRead()) {
+ auto snapshot = tx->GetMvccSnapshot();
+ if (DataShard.IsFollower()) {
+ TString err = TStringBuilder()
+ << "Operation " << *op << " cannot read from snapshot " << snapshot
+ << " using data tx on a follower " << DataShard.TabletID();
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+ ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ } else if (!DataShard.IsMvccEnabled()) {
+ TString err = TStringBuilder()
+ << "Operation " << *op << " reads from snapshot " << snapshot
+ << " with MVCC feature disabled at " << DataShard.TabletID();
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+ ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ } else if (snapshot < DataShard.GetSnapshotManager().GetLowWatermark()) {
+ TString err = TStringBuilder()
+ << "Operation " << *op << " reads from stale snapshot " << snapshot
+ << " at " << DataShard.TabletID();
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+ ->AddError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ ;
+ }
+ }
+
+ TEngineBay::TSizes txReads;
+
+ if (op->IsDataTx()) {
+ bool hasTotalKeysSizeLimit = !!dataTx->PerShardKeysSizeLimitBytes();
+ txReads = dataTx->CalcReadSizes(hasTotalKeysSizeLimit);
+
+ if (txReads.ReadSize > DataShard.GetTxReadSizeLimit()) {
+ TString err = TStringBuilder()
+ << "Transaction read size " << txReads.ReadSize << " exceeds limit "
+ << DataShard.GetTxReadSizeLimit() << " at tablet " << DataShard.TabletID()
+ << " txId " << op->GetTxId();
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+ ->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ }
+
+ if (hasTotalKeysSizeLimit
+ && txReads.TotalKeysSize > *dataTx->PerShardKeysSizeLimitBytes()) {
+ TString err = TStringBuilder()
+ << "Transaction total keys size " << txReads.TotalKeysSize
+ << " exceeds limit " << *dataTx->PerShardKeysSizeLimitBytes()
+ << " at tablet " << DataShard.TabletID() << " txId " << op->GetTxId();
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+ ->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ }
+
+ for (const auto& key : dataTx->TxInfo().Keys) {
+ if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) {
+ ui64 keySize = 0;
+ for (const auto& cell : key.Key->Range.From) {
+ keySize += cell.Size();
+ }
+ if (keySize > NLimits::MaxWriteKeySize) {
+ TString err = TStringBuilder()
+ << "Operation " << *op << " writes key of " << keySize
+ << " bytes which exceeds limit " << NLimits::MaxWriteKeySize
+ << " bytes at " << DataShard.TabletID();
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+ ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ }
+ for (const auto& col : key.Key->Columns) {
+ if (col.Operation == TKeyDesc::EColumnOperation::Set ||
+ col.Operation == TKeyDesc::EColumnOperation::InplaceUpdate)
+ {
+ if (col.ImmediateUpdateSize > NLimits::MaxWriteValueSize) {
+ TString err = TStringBuilder()
+ << "Transaction write column value of " << col.ImmediateUpdateSize
+ << " bytes is larger than the allowed threshold";
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR)->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ }
+ }
+ }
+
+ if (DataShard.IsSubDomainOutOfSpace()) {
+ switch (key.Key->RowOperation) {
+ case TKeyDesc::ERowOperation::Read:
+ case TKeyDesc::ERowOperation::Erase:
+ // Read and erase are allowed even when we're out of disk space
+ break;
+
+ default: {
+ // Updates are not allowed when database is out of space
+ TString err = "Cannot perform writes: database is out of disk space";
+
+ DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
+
+ BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if (op->IsReadTable()) {
+ const auto& record = dataTx->GetReadTableTransaction();
+ const auto& userTables = DataShard.GetUserTables();
+
+ TMaybe<TString> schemaChangedError;
+ if (auto it = userTables.find(record.GetTableId().GetTableId()); it != userTables.end()) {
+ const auto& tableInfo = *it->second;
+ for (const auto& columnRecord : record.GetColumns()) {
+ if (auto* columnInfo = tableInfo.Columns.FindPtr(columnRecord.GetId())) {
+ // TODO: column types don't change when bound by id, but we may want to check anyway
+ } else {
+ schemaChangedError = TStringBuilder() << "ReadTable cannot find column "
+ << columnRecord.GetName() << " (" << columnRecord.GetId() << ")";
+ break;
+ }
+ }
+ // TODO: validate key ranges?
+ } else {
+ schemaChangedError = TStringBuilder() << "ReadTable cannot find table "
+ << record.GetTableId().GetOwnerId() << ":" << record.GetTableId().GetTableId();
+ }
+
+ if (schemaChangedError) {
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR)
+ ->AddError(NKikimrTxDataShard::TError::SCHEME_CHANGED, *schemaChangedError);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+ return EExecutionStatus::Executed;
+ }
+
+ if (record.HasSnapshotStep() && record.HasSnapshotTxId()) {
+ if (!op->IsImmediate()) {
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
+ NKikimrTxDataShard::TError::BAD_ARGUMENT,
+ "ReadTable from snapshot must be an immediate transaction");
+ op->Abort(EExecutionUnitKind::FinishPropose);
+ return EExecutionStatus::Executed;
+ }
+
+ const TSnapshotKey key(
+ record.GetTableId().GetOwnerId(),
+ record.GetTableId().GetTableId(),
+ record.GetSnapshotStep(),
+ record.GetSnapshotTxId());
+
+ if (!DataShard.GetSnapshotManager().AcquireReference(key)) {
+ // TODO: try upgrading to mvcc snapshot when available
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
+ NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
+ TStringBuilder()
+ << "Shard " << DataShard.TabletID()
+ << " has no snapshot " << key);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+ return EExecutionStatus::Executed;
+ }
+
+ op->SetAcquiredSnapshotKey(key);
+ op->SetUsingSnapshotFlag();
+ }
+ }
+
+ if (!op->IsImmediate()) {
+ if (!Pipeline.AssignPlanInterval(op)) {
+ TString err = TStringBuilder()
+ << "Can't propose tx " << op->GetTxId() << " at blocked shard "
+ << DataShard.TabletID();
+ BuildResult(op)->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err);
+ op->Abort(EExecutionUnitKind::FinishPropose);
+
+ LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+ return EExecutionStatus::Executed;
+ }
+
+ auto &res = BuildResult(op);
+ res->SetPrepared(op->GetMinStep(), op->GetMaxStep(), op->GetReceivedAt());
+
+ if (op->IsDataTx()) {
+ res->Record.SetReadSize(txReads.ReadSize);
+ res->Record.SetReplySize(txReads.ReplySize);
+
+ for (const auto& rs : txReads.OutReadSetSize) {
+ auto entry = res->Record.AddOutgoingReadSetInfo();
+ entry->SetShardId(rs.first);
+ entry->SetSize(rs.second);
+ }
+ }
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Prepared " << op->GetKind() << " transaction txId " << op->GetTxId()
+ << " at tablet " << DataShard.TabletID());
+ }
+
+ return EExecutionStatus::Executed;
+}
+
+void TCheckWriteUnit::Complete(TOperation::TPtr, const TActorContext &)
+{
+}
+
+THolder<TExecutionUnit> CreateCheckWriteUnit(TDataShard &dataShard, TPipeline &pipeline)
+{
+ return THolder(new TCheckWriteUnit(dataShard, pipeline));
+}
+
+} // namespace NDataShard
+} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 90553f12983..cf3d9bd9d81 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2634,6 +2634,41 @@ bool TDataShard::CheckDataTxRejectAndReply(const TEvDataShard::TEvProposeTransac
return false;
}
+bool TDataShard::CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx)
+{
+ auto* msg = ev->Get();
+ TString txDescr = TStringBuilder() << "data TxId " << msg->GetTxId();
+
+ NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
+ ERejectReasons rejectReasons;
+ TString rejectDescription;
+ bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReasons, rejectDescription);
+
+ if (reject) {
+ LWTRACK(ProposeTransactionReject, msg->GetOrbit());
+ NKikimrDataEvents::TEvWriteResult::EStatus status;
+ switch (rejectStatus) {
+ case NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED:
+ status = NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED;
+ break;
+ case NKikimrTxDataShard::TEvProposeTransactionResult::ERROR:
+ status = NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR;
+ break;
+ default:
+ Y_FAIL_S("Unexpected rejectStatus " << rejectStatus);
+ }
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), msg->GetTxId(), status, rejectDescription);
+
+ LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription);
+
+ ctx.Send(ev->Sender, result.release());
+ IncCounter(COUNTER_PREPARE_OVERLOADED);
+ IncCounter(COUNTER_PREPARE_COMPLETE);
+ return true;
+ }
+
+ return false;
+}
void TDataShard::UpdateProposeQueueSize() const {
SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs());
SetCounter(COUNTER_READ_ITERATORS_WAITING, Pipeline.WaitingReadIterators());
@@ -2771,26 +2806,38 @@ void TDataShard::CheckDelayedProposeQueue(const TActorContext &ctx) {
void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&ev, const TActorContext &ctx) {
auto* msg = ev->Get();
- bool mayRunImmediate = false;
- if ((msg->GetFlags() & TTxFlags::Immediate) &&
- !(msg->GetFlags() & TTxFlags::ForceOnline) &&
- msg->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA)
- {
- // This transaction may run in immediate mode
- mayRunImmediate = true;
- }
+ // This transaction may run in immediate mode
+ bool mayRunImmediate = (msg->GetFlags() & TTxFlags::Immediate) && !(msg->GetFlags() & TTxFlags::ForceOnline) &&
+ msg->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA;
if (mayRunImmediate) {
// Enqueue immediate transactions so they don't starve existing operations
LWTRACK(ProposeTransactionEnqueue, msg->Orbit);
- ProposeQueue.Enqueue(std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx);
+ ProposeQueue.Enqueue(IEventHandle::Upcast<TEvDataShard::TEvProposeTransaction>(std::move(ev)), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx);
UpdateProposeQueueSize();
} else {
// Prepare planned transactions as soon as possible
Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
}
}
+void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TActorContext& ctx) {
+ auto* msg = ev->Get();
+ const auto& record = msg->Record;
+
+ // This transaction may run in immediate mode
+ bool mayRunImmediate = record.txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE;
+
+ if (mayRunImmediate) {
+ // Enqueue immediate transactions so they don't starve existing operations
+ LWTRACK(ProposeTransactionEnqueue, msg->GetOrbit());
+ ProposeQueue.Enqueue(IEventHandle::Upcast<NEvents::TDataEvents::TEvWrite>(std::move(ev)), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx);
+ UpdateProposeQueueSize();
+ } else {
+ // Prepare planned transactions as soon as possible
+ Execute(new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
+ }
+}
void TDataShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr &ev, const TActorContext &ctx) {
ui64 srcMediatorId = ev->Get()->Record.GetMediatorID();
@@ -2849,18 +2896,47 @@ void TDataShard::Handle(TEvPrivate::TEvDelayedProposeTransaction::TPtr &ev, cons
if (!item.Cancelled) {
// N.B. we don't call ProposeQueue.Reset(), tx will Ack() on its first Execute()
- Execute(new TTxProposeTransactionBase(this, std::move(item.Event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
- return;
+
+ switch (item.Event->GetTypeRewrite()) {
+ case TEvDataShard::TEvProposeTransaction::EventType: {
+ auto event = IEventHandle::Downcast<TEvDataShard::TEvProposeTransaction>(std::move(item.Event));
+ Execute(new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
+ return;
+ }
+ case NEvents::TDataEvents::TEvWrite::EventType: {
+ auto event = IEventHandle::Downcast<NEvents::TDataEvents::TEvWrite>(std::move(item.Event));
+ Execute(new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
+ return;
+ }
+ default:
+ Y_FAIL_S("Unexpected event type " << item.Event->GetTypeRewrite());
+ }
}
TActorId target = item.Event->Sender;
ui64 cookie = item.Event->Cookie;
- auto kind = item.Event->Get()->GetTxKind();
- auto txId = item.Event->Get()->GetTxId();
- auto result = new TEvDataShard::TEvProposeTransactionResult(
- kind, TabletID(), txId,
- NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED);
- ctx.Send(target, result, 0, cookie);
+ switch (item.Event->GetTypeRewrite()) {
+ case TEvDataShard::TEvProposeTransaction::EventType: {
+ auto* msg = item.Event->Get<TEvDataShard::TEvProposeTransaction>();
+ auto kind = msg->GetTxKind();
+ auto txId = msg->GetTxId();
+ auto result = new TEvDataShard::TEvProposeTransactionResult(
+ kind, TabletID(), txId,
+ NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED);
+ ctx.Send(target, result, 0, cookie);
+ return;
+ }
+ case NEvents::TDataEvents::TEvWrite::EventType: {
+ auto* msg = item.Event->Get<NEvents::TDataEvents::TEvWrite>();
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), msg->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Canceled");
+ ctx.Send(target, result.release(), 0, cookie);
+ return;
+ }
+ default:
+ Y_FAIL_S("Unexpected event type " << item.Event->GetTypeRewrite());
+ }
+
+
}
// N.B. Ack directly since we didn't start any delayed transactions
@@ -3226,15 +3302,22 @@ void TDataShard::WaitPredictedPlanStep(ui64 step) {
}
}
-bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const {
- auto* msg = ev->Get();
-
+bool TDataShard::CheckTxNeedWait() const {
if (MvccSwitchState == TSwitchState::SWITCHING) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction needs to wait because of mvcc state switching");
return true;
}
- auto &rec = ev->Get()->Record;
+ return false;
+}
+
+bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const {
+ if (CheckTxNeedWait()) {
+ return true;
+ }
+
+ auto* msg = ev->Get();
+ auto& rec = msg->Record;
if (rec.HasMvccSnapshot()) {
TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId());
TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads());
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 0f2884dfdff..dba448cf6b4 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -4,6 +4,7 @@
#include "datashard_s3_upload.h"
#include <ydb/core/tx/tx.h>
+#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/message_seqno.h>
#include <ydb/core/base/domain.h>
#include <ydb/core/base/row_version.h>
diff --git a/ydb/core/tx/datashard/datashard__write.cpp b/ydb/core/tx/datashard/datashard__write.cpp
new file mode 100644
index 00000000000..905178057b3
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard__write.cpp
@@ -0,0 +1,261 @@
+#include "datashard_txs.h"
+#include "probes.h"
+#include "operation.h"
+#include "datashard_write_operation.h"
+
+#include <ydb/library/wilson_ids/wilson.h>
+
+LWTRACE_USING(DATASHARD_PROVIDER)
+
+namespace NKikimr::NDataShard {
+
+TDataShard::TTxWrite::TTxWrite(TDataShard* self, NEvents::TDataEvents::TEvWrite::TPtr ev, TInstant receivedAt, ui64 tieBreakerIndex, bool delayed)
+ : TBase(self)
+ , Ev(std::move(ev))
+ , ReceivedAt(receivedAt)
+ , TieBreakerIndex(tieBreakerIndex)
+ , TxId(Ev->Get()->GetTxId())
+ , Acked(!delayed)
+ , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END)
+{
+}
+
+bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext& ctx) {
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxWrite:: execute at tablet# " << Self->TabletID());
+ auto* request = Ev->Get();
+ const auto& record = request->Record;
+ Y_UNUSED(record);
+
+ LWTRACK(WriteExecute, request->GetOrbit());
+
+ if (!Acked) {
+ // Ack event on the first execute (this will schedule the next event if any)
+ Self->ProposeQueue.Ack(ctx);
+ Acked = true;
+ }
+
+ try {
+ // If tablet is in follower mode then we should sync scheme
+ // before we build and check operation.
+ if (Self->IsFollower()) {
+ NKikimrTxDataShard::TError::EKind status;
+ TString errMessage;
+
+ if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage))
+ return false;
+
+ if (status != NKikimrTxDataShard::TError::OK) {
+ LOG_LOG_S_THROTTLE(Self->GetLogThrottler(TDataShard::ELogThrottlerType::TxProposeTransactionBase_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD,
+ "TTxWrite:: errors while proposing transaction txid " << TxId << " at tablet " << Self->TabletID() << " status: " << status << " error: " << errMessage);
+
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), TxId, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, errMessage);
+
+ TActorId target = Op ? Op->GetTarget() : Ev->Sender;
+ ui64 cookie = Op ? Op->GetCookie() : Ev->Cookie;
+
+ if (ProposeTransactionSpan) {
+ ProposeTransactionSpan.EndOk();
+ }
+ ctx.Send(target, result.release(), 0, cookie);
+
+ return true;
+ }
+ }
+
+ if (Ev) {
+ Y_ABORT_UNLESS(!Op);
+
+ if (Self->CheckDataTxRejectAndReply(Ev, ctx)) {
+ Ev = nullptr;
+ return true;
+ }
+
+ TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx);
+
+ // Unsuccessful operation parse.
+ if (op->IsAborted()) {
+ LWTRACK(ProposeTransactionParsed, op->Orbit, false);
+ Y_ABORT_UNLESS(op->Result());
+
+ if (ProposeTransactionSpan) {
+ ProposeTransactionSpan.EndError("TTxWrite:: unsuccessful operation parse");
+ }
+ ctx.Send(op->GetTarget(), op->Result().Release());
+ return true;
+ }
+ LWTRACK(ProposeTransactionParsed, op->Orbit, true);
+
+ op->BuildExecutionPlan(false);
+ if (!op->IsExecutionPlanFinished())
+ Self->Pipeline.GetExecutionUnit(op->GetCurrentUnit()).AddOperation(op);
+
+ Op = op;
+ Ev = nullptr;
+ Op->IncrementInProgress();
+ }
+
+ Y_ABORT_UNLESS(Op && Op->IsInProgress() && !Op->GetExecutionPlan().empty());
+
+ auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx);
+
+ switch (status) {
+ case EExecutionStatus::Restart:
+ // Restart even if current CompleteList is not empty
+ // It will be extended in subsequent iterations
+ return false;
+
+ case EExecutionStatus::Reschedule:
+ // Reschedule transaction as soon as possible
+ if (!Op->IsExecutionPlanFinished()) {
+ Op->IncrementInProgress();
+ Self->ExecuteProgressTx(Op, ctx);
+ Rescheduled = true;
+ }
+ Op->DecrementInProgress();
+ break;
+
+ case EExecutionStatus::Executed:
+ case EExecutionStatus::Continue:
+ Op->DecrementInProgress();
+ break;
+
+ case EExecutionStatus::WaitComplete:
+ WaitComplete = true;
+ break;
+
+ default:
+ Y_FAIL_S("unexpected execution status " << status << " for operation " << *Op << " " << Op->GetKind() << " at " << Self->TabletID());
+ }
+
+ if (WaitComplete || !CompleteList.empty()) {
+ // Keep operation active until we run the complete list
+ CommitStart = AppData()->TimeProvider->Now();
+ } else {
+ // Release operation as it's no longer needed
+ Op = nullptr;
+ }
+
+ // Commit all side effects
+ return true;
+ } catch (const TNotReadyTabletException&) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "TX [" << 0 << " : " << TxId << "] can't prepare (tablet's not ready) at tablet " << Self->TabletID());
+ return false;
+ } catch (const TSchemeErrorTabletException& ex) {
+ Y_UNUSED(ex);
+ Y_ABORT();
+ } catch (const TMemoryLimitExceededException& ex) {
+ Y_ABORT("there must be no leaked exceptions: TMemoryLimitExceededException");
+ } catch (const std::exception& e) {
+ Y_ABORT("there must be no leaked exceptions: %s", e.what());
+ } catch (...) {
+ Y_ABORT("there must be no leaked exceptions");
+ }
+
+ return true;
+}
+
+void TDataShard::TTxWrite::Complete(const TActorContext& ctx) {
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxWrite complete: at tablet# " << Self->TabletID());
+
+ if (ProposeTransactionSpan) {
+ ProposeTransactionSpan.End();
+ }
+
+ if (Op) {
+ Y_ABORT_UNLESS(!Op->GetExecutionPlan().empty());
+ if (!CompleteList.empty()) {
+ auto commitTime = AppData()->TimeProvider->Now() - CommitStart;
+ Op->SetCommitTime(CompleteList.front(), commitTime);
+
+ if (!Op->IsExecutionPlanFinished() && (Op->GetCurrentUnit() != CompleteList.front()))
+ Op->SetDelayedCommitTime(commitTime);
+
+ Self->Pipeline.RunCompleteList(Op, CompleteList, ctx);
+ }
+
+ if (WaitComplete) {
+ Op->DecrementInProgress();
+
+ if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) {
+ Self->Pipeline.AddCandidateOp(Op);
+
+ if (Self->Pipeline.CanRunAnotherOp()) {
+ Self->PlanQueue.Progress(ctx);
+ }
+ }
+ }
+ }
+
+ Self->CheckSplitCanStart(ctx);
+ Self->CheckMvccStateChangeCanStart(ctx);
+}
+
+
+void TDataShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) {
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Handle TTxWrite: at tablet# " << TabletID());
+
+ auto* msg = ev->Get();
+ const auto& record = msg->Record;
+ Y_UNUSED(record);
+
+ LWTRACK(WriteRequest, msg->GetOrbit());
+
+ // Check if we need to delay an immediate transaction
+ if (MediatorStateWaiting && record.txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE)
+ {
+ // We cannot calculate correct version until we restore mediator state
+ LWTRACK(ProposeTransactionWaitMediatorState, msg->GetOrbit());
+ MediatorStateWaitingMsgs.emplace_back(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
+
+ if (Pipeline.HasProposeDelayers()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Handle TEvProposeTransaction delayed at " << TabletID() << " until dependency graph is restored");
+ LWTRACK(ProposeTransactionWaitDelayers, msg->GetOrbit());
+ DelayedProposeQueue.emplace_back().Reset(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
+
+ if (CheckTxNeedWait()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Handle TEvProposeTransaction delayed at " << TabletID() << " until interesting plan step will come");
+ if (Pipeline.AddWaitingTxOp(ev)) {
+ UpdateProposeQueueSize();
+ return;
+ }
+ }
+
+ IncCounter(COUNTER_PREPARE_REQUEST);
+
+ if (CheckDataTxRejectAndReply(ev, ctx)) {
+ return;
+ }
+
+ ProposeTransaction(std::move(ev), ctx);
+}
+
+ui64 EvWrite::Convertor::GetTxId(const TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvProposeTransaction::EventType:
+ return ev->Get<TEvDataShard::TEvProposeTransaction>()->GetTxId();
+ case NEvents::TDataEvents::TEvWrite::EventType:
+ return ev->Get<NEvents::TDataEvents::TEvWrite>()->GetTxId();
+ default:
+ Y_FAIL_S("Unexpected event type " << ev->GetTypeRewrite());
+ }
+}
+
+ui64 EvWrite::Convertor::GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode txMode) {
+ switch (txMode) {
+ case NKikimrDataEvents::TEvWrite::MODE_PREPARE:
+ return TTxFlags::Default;
+ case NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE:
+ return TTxFlags::VolatilePrepare;
+ case NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE:
+ return TTxFlags::Immediate;
+ default:
+ Y_FAIL_S("Unexpected tx mode " << txMode);
+ }
+}
+} \ No newline at end of file
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 712c6ebdc79..c4bc0b6b007 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -13,6 +13,7 @@
#include "datashard_repl_offsets.h"
#include "datashard_repl_offsets_client.h"
#include "datashard_repl_offsets_server.h"
+#include "datashard_write.h"
#include "build_index.h"
#include "cdc_stream_heartbeat.h"
#include "cdc_stream_scan.h"
@@ -175,6 +176,7 @@ class TDataShard
class TTxProposeSchemeTransaction;
class TTxCancelTransactionProposal;
class TTxProposeTransactionBase;
+ class TTxWrite;
class TTxReadSet;
class TTxSchemaChanged;
class TTxInitiateBorrowedPartsReturn;
@@ -272,7 +274,9 @@ class TDataShard
friend class TPipeline;
friend class TLocksDataShardAdapter<TDataShard>;
friend class TActiveTransaction;
+ friend class TWriteOperation;
friend class TValidatedDataTx;
+ friend class TValidatedWriteTx;
friend class TEngineBay;
friend class NMiniKQL::TKqpScanComputeContext;
friend class TSnapshotManager;
@@ -1205,7 +1209,9 @@ class TDataShard
void Handle(TEvDataShard::TEvProposeTransactionAttach::TPtr &ev, const TActorContext &ctx);
void HandleAsFollower(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx);
void ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&ev, const TActorContext &ctx);
- void Handle(TEvTxProcessing::TEvPlanStep::TPtr &ev, const TActorContext &ctx);
+ void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx);
+ void ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TActorContext& ctx);
+ void Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTxProcessing::TEvReadSet::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTxProcessing::TEvReadSetAck::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvProgressTransaction::TPtr &ev, const TActorContext &ctx);
@@ -1547,6 +1553,7 @@ public:
ERejectReasons& rejectReasons,
TString& rejectDescription);
bool CheckDataTxRejectAndReply(const TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx);
+ bool CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx);
TSysLocks& SysLocksTable() { return SysLocks; }
@@ -1967,6 +1974,7 @@ public:
bool WaitPlanStep(ui64 step);
bool CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const;
+ bool CheckTxNeedWait() const;
void WaitPredictedPlanStep(ui64 step);
void SchedulePlanPredictedTxs();
@@ -2399,7 +2407,7 @@ private:
class TProposeQueue : private TTxProgressIdempotentScalarQueue<TEvPrivate::TEvDelayedProposeTransaction> {
public:
struct TItem : public TMoveOnly {
- TItem(TEvDataShard::TEvProposeTransaction::TPtr&& event, TInstant receivedAt, ui64 tieBreakerIndex)
+ TItem(TAutoPtr<IEventHandle>&& event, TInstant receivedAt, ui64 tieBreakerIndex)
: Event(std::move(event))
, ReceivedAt(receivedAt)
, TieBreakerIndex(tieBreakerIndex)
@@ -2407,7 +2415,7 @@ private:
, Cancelled(false)
{ }
- TEvDataShard::TEvProposeTransaction::TPtr Event;
+ TAutoPtr<IEventHandle> Event;
TInstant ReceivedAt;
ui64 TieBreakerIndex;
TItem* Next;
@@ -2419,10 +2427,10 @@ private:
TItem* Last = nullptr;
};
- void Enqueue(TEvDataShard::TEvProposeTransaction::TPtr event, TInstant receivedAt, ui64 tieBreakerIndex, const TActorContext& ctx) {
+ void Enqueue(TAutoPtr<IEventHandle> event, TInstant receivedAt, ui64 tieBreakerIndex, const TActorContext& ctx) {
TItem* item = &Items.emplace_back(std::move(event), receivedAt, tieBreakerIndex);
- const ui64 txId = item->Event->Get()->GetTxId();
+ const ui64 txId = EvWrite::Convertor::GetTxId(item->Event);
auto& links = TxIds[txId];
if (Y_UNLIKELY(links.Last)) {
@@ -2437,7 +2445,7 @@ private:
TItem Dequeue() {
TItem* first = &Items.front();
- const ui64 txId = first->Event->Get()->GetTxId();
+ const ui64 txId = EvWrite::Convertor::GetTxId(first->Event);
auto it = TxIds.find(txId);
Y_ABORT_UNLESS(it != TxIds.end() && it->second.First == first,
@@ -2944,6 +2952,7 @@ protected:
HFunc(TEvDataShard::TEvReadAck, Handle);
HFunc(TEvDataShard::TEvReadCancel, Handle);
HFunc(TEvDataShard::TEvReadColumnsRequest, Handle);
+ HFunc(NEvents::TDataEvents::TEvWrite, Handle);
HFunc(TEvDataShard::TEvGetInfoRequest, Handle);
HFunc(TEvDataShard::TEvListOperationsRequest, Handle);
HFunc(TEvDataShard::TEvGetDataHistogramRequest, Handle);
@@ -2992,13 +3001,11 @@ protected:
HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle);
HFunc(TEvPrivate::TEvConfirmReadonlyLease, Handle);
HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle);
- default:
- if (!HandleDefaultEvents(ev, SelfId())) {
- ALOG_WARN(NKikimrServices::TX_DATASHARD,
- "TDataShard::StateWork unhandled event type: "<< ev->GetTypeRewrite()
- << " event: " << ev->ToString());
- }
- break;
+ default:
+ if (!HandleDefaultEvents(ev, SelfId())) {
+ ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString());
+ }
+ break;
}
}
@@ -3016,6 +3023,7 @@ protected:
HFuncTraced(TEvDataShard::TEvReadContinue, Handle);
HFuncTraced(TEvDataShard::TEvReadAck, Handle);
HFuncTraced(TEvDataShard::TEvReadCancel, Handle);
+ HFuncTraced(NEvents::TDataEvents::TEvWrite, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWorkAsFollower unhandled event type: " << ev->GetTypeRewrite()
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index f4f52a4f027..c0888f27203 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -3,6 +3,7 @@
#include "datashard_pipeline.h"
#include "datashard_impl.h"
#include "datashard_txs.h"
+#include "datashard_write_operation.h"
#include <ydb/core/base/compile_time_flags.h>
#include <ydb/core/base/cputime.h>
@@ -1555,6 +1556,52 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
return tx;
}
+TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& ev, TInstant receivedAt, ui64 tieBreakerIndex, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx)
+{
+ const auto& rec = ev->Get()->Record;
+ TBasicOpInfo info(rec.GetTxId(), EOperationKind::DataTx, EvWrite::Convertor::GetProposeFlags(rec.GetTxMode()), 0, receivedAt, tieBreakerIndex);
+ auto op = MakeIntrusive<TWriteOperation>(info, ev, Self, txc, ctx);
+
+ auto badRequest = [&](const TString& error) {
+ op->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error);
+ LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
+ };
+
+ if (!op->WriteTx()->Ready()) {
+ badRequest(TStringBuilder() << "Shard " << Self->TabletID() << " cannot parse tx " << op->GetTxId() << ": " << op->WriteTx()->GetError());
+ return op;
+ }
+
+ op->ExtractKeys();
+
+ switch (rec.txmode()) {
+ case NKikimrDataEvents::TEvWrite::MODE_PREPARE:
+ break;
+ case NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE:
+ op->SetVolatilePrepareFlag();
+ break;
+ case NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE:
+ op->SetImmediateFlag();
+ break;
+ default:
+ badRequest(TStringBuilder() << "Unknown txmode: " << rec.txmode());
+ return op;
+ }
+
+ // Make config checks for immediate op.
+ if (op->IsImmediate()) {
+ if (Config.NoImmediate() || (Config.ForceOnlineRW())) {
+ LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Shard " << Self->TabletID() << " force immediate op " << op->GetTxId() << " to online according to config");
+ op->SetForceOnlineFlag();
+ } else {
+ if (Config.DirtyImmediate())
+ op->SetForceDirtyFlag();
+ }
+ }
+
+ return op;
+}
+
void TPipeline::BuildDataTx(TActiveTransaction *tx, TTransactionContext &txc, const TActorContext &ctx)
{
auto dataTx = tx->BuildDataTx(Self, txc, ctx);
@@ -1826,7 +1873,7 @@ void TPipeline::MaybeActivateWaitingSchemeOps(const TActorContext& ctx) const {
}
}
-bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) {
+bool TPipeline::CheckInflightLimit() const {
// check in-flight limit
size_t totalInFly =
Self->ReadIteratorsInFly() + Self->TxInFly() + Self->ImmediateInFly()
@@ -1834,13 +1881,20 @@ bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, co
if (totalInFly > Self->GetMaxTxInFly())
return false; // let tx to be rejected
+ return true;
+}
+
+bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) {
+ if (!CheckInflightLimit())
+ return false;
+
if (Self->MvccSwitchState == TSwitchState::SWITCHING) {
- WaitingDataTxOps.emplace(TRowVersion::Min(), std::move(ev)); // postpone tx processing till mvcc state switch is finished
+ WaitingDataTxOps.emplace(TRowVersion::Min(), IEventHandle::Upcast<TEvDataShard::TEvProposeTransaction>(std::move(ev))); // postpone tx processing till mvcc state switch is finished
} else {
bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads();
Y_DEBUG_ABORT_UNLESS(ev->Get()->Record.HasMvccSnapshot());
TRowVersion snapshot(ev->Get()->Record.GetMvccSnapshot().GetStep(), ev->Get()->Record.GetMvccSnapshot().GetTxId());
- WaitingDataTxOps.emplace(snapshot, std::move(ev));
+ WaitingDataTxOps.emplace(snapshot, IEventHandle::Upcast<TEvDataShard::TEvProposeTransaction>(std::move(ev)));
const ui64 waitStep = prioritizedReads ? snapshot.Step : snapshot.Step + 1;
TRowVersion unreadableEdge;
if (!Self->WaitPlanStep(waitStep) && snapshot < (unreadableEdge = GetUnreadableEdge(prioritizedReads))) {
@@ -1851,6 +1905,15 @@ bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, co
return true;
}
+bool TPipeline::AddWaitingTxOp(NEvents::TDataEvents::TEvWrite::TPtr& ev) {
+ if (!CheckInflightLimit())
+ return false;
+
+ WaitingDataTxOps.emplace(TRowVersion::Min(), IEventHandle::Upcast<NEvents::TDataEvents::TEvWrite>(std::move(ev))); // postpone tx processing till mvcc state switch is finished
+
+ return true;
+}
+
void TPipeline::ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ActivateWaitingTxOps for version# " << edge
<< ", txOps: " << (WaitingDataTxOps.empty() ? "empty" : ToString(WaitingDataTxOps.begin()->first.Step))
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 85cf95c6b84..9e63590493c 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -266,6 +266,10 @@ public:
TInstant receivedAt, ui64 tieBreakerIndex,
NTabletFlatExecutor::TTransactionContext &txc,
const TActorContext &ctx);
+ TOperation::TPtr BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr &ev,
+ TInstant receivedAt, ui64 tieBreakerIndex,
+ NTabletFlatExecutor::TTransactionContext &txc,
+ const TActorContext &ctx);
void BuildDataTx(TActiveTransaction *tx,
TTransactionContext &txc,
const TActorContext &ctx);
@@ -344,7 +348,9 @@ public:
void MaybeActivateWaitingSchemeOps(const TActorContext& ctx) const;
ui64 WaitingTxs() const { return WaitingDataTxOps.size(); } // note that without iterators
+ bool CheckInflightLimit() const;
bool AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx);
+ bool AddWaitingTxOp(NEvents::TDataEvents::TEvWrite::TPtr& ev);
void ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx);
void ActivateWaitingTxOps(const TActorContext& ctx);
@@ -507,7 +513,7 @@ private:
TWaitingSchemeOpsOrder WaitingSchemeOpsOrder;
TWaitingSchemeOps WaitingSchemeOps;
- TMultiMap<TRowVersion, TEvDataShard::TEvProposeTransaction::TPtr> WaitingDataTxOps;
+ TMultiMap<TRowVersion, TAutoPtr<IEventHandle>> WaitingDataTxOps;
TCommittingDataTxOps CommittingOps;
THashMap<ui64, TOperation::TPtr> CompletingOps;
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index a0380f8aa0a..ab313b9843d 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -112,6 +112,25 @@ protected:
NWilson::TSpan ProposeTransactionSpan;
};
+class TDataShard::TTxWrite: public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+public:
+ TTxWrite(TDataShard* ds, NEvents::TDataEvents::TEvWrite::TPtr ev, TInstant receivedAt, ui64 tieBreakerIndex, bool delayed);
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+ void Complete(const TActorContext& ctx) override;
+protected:
+ TOperation::TPtr Op;
+ NEvents::TDataEvents::TEvWrite::TPtr Ev;
+ const TInstant ReceivedAt;
+ const ui64 TieBreakerIndex;
+ ui64 TxId;
+ TVector<EExecutionUnitKind> CompleteList;
+ TInstant CommitStart;
+ bool Acked;
+ bool Rescheduled = false;
+ bool WaitComplete = false;
+ NWilson::TSpan ProposeTransactionSpan;
+};
+
class TDataShard::TTxReadSet : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
public:
TTxReadSet(TDataShard *self, TEvTxProcessing::TEvReadSet::TPtr ev);
diff --git a/ydb/core/tx/datashard/datashard_ut_read_table.h b/ydb/core/tx/datashard/datashard_ut_read_table.h
index 919e1b7eb06..73823f683ad 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_table.h
+++ b/ydb/core/tx/datashard/datashard_ut_read_table.h
@@ -166,10 +166,15 @@ namespace NDataShardReadTableTest {
static void PrintPrimitive(TStringBuilder& out, const NYdb::TValueParser& parser) {
switch (parser.GetPrimitiveType()) {
+ case NYdb::EPrimitiveType::Uint64:
+ out << parser.GetUint64();
+ break;
case NYdb::EPrimitiveType::Uint32:
out << parser.GetUint32();
break;
-
+ case NYdb::EPrimitiveType::Utf8:
+ out << parser.GetUtf8();
+ break;
case NYdb::EPrimitiveType::Timestamp:
out << parser.GetTimestamp();
break;
diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp
new file mode 100644
index 00000000000..508dbc2062d
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_ut_write.cpp
@@ -0,0 +1,81 @@
+#include "datashard_active_transaction.h"
+#include "datashard_ut_read_table.h"
+#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
+
+namespace NKikimr {
+
+using namespace NKikimr::NDataShard;
+using namespace NSchemeShard;
+using namespace Tests;
+using namespace NDataShardReadTableTest;
+
+Y_UNIT_TEST_SUITE(DataShardWrite) {
+ std::tuple<Tests::TServer::TPtr, TActorId> TestCreateServer() {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root").SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.GetAppData().AllowReadTableImmediate = true;
+
+ InitRoot(server, sender);
+
+ return {server, sender};
+ }
+
+ Y_UNIT_TEST(WriteImmediateOnShard) {
+ auto [server, sender] = TestCreateServer();
+
+ auto opts = TShardedTableOptions().Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}});
+ auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
+
+ const ui32 rowCount = 3;
+ ui64 txId = 100;
+ Write(server, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+
+ auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
+
+ UNIT_ASSERT_VALUES_EQUAL(table1state, "key = 0, value = 1\n"
+ "key = 2, value = 3\n"
+ "key = 4, value = 5\n");
+ }
+
+ Y_UNIT_TEST(WriteImmediateOnShardManyColumns) {
+ auto [server, sender] = TestCreateServer();
+
+ auto opts = TShardedTableOptions().Columns({{"key64", "Uint64", true, false}, {"key32", "Uint32", true, false},
+ {"value64", "Uint64", false, false}, {"value32", "Uint32", false, false}, {"valueUtf8", "Utf8", false, false}});
+ auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
+
+ const ui32 rowCount = 3;
+ ui64 txId = 100;
+ Write(server, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+
+ auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
+
+ UNIT_ASSERT_VALUES_EQUAL(table1state, "key64 = 0, key32 = 1, value64 = 2, value32 = 3, valueUtf8 = String_4\n"
+ "key64 = 5, key32 = 6, value64 = 7, value32 = 8, valueUtf8 = String_9\n"
+ "key64 = 10, key32 = 11, value64 = 12, value32 = 13, valueUtf8 = String_14\n");
+ }
+
+ Y_UNIT_TEST(WriteOnShard) {
+ auto [server, sender] = TestCreateServer();
+
+ TShardedTableOptions opts;
+ auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
+
+ const ui32 rowCount = 3;
+ ui64 txId = 100;
+ Write(server, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
+
+ auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
+
+ UNIT_ASSERT_VALUES_EQUAL(table1state, "");
+ }
+}
+} \ No newline at end of file
diff --git a/ydb/core/tx/datashard/datashard_write.h b/ydb/core/tx/datashard/datashard_write.h
new file mode 100644
index 00000000000..f58bd947b87
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_write.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <ydb/library/actors/core/event.h>
+#include <ydb/core/protos/data_events.pb.h>
+
+#include <util/generic/ptr.h>
+
+namespace NKikimr::NDataShard::EvWrite {
+
+using namespace NActors;
+
+class Convertor {
+public:
+ static ui64 GetTxId(const TAutoPtr<IEventHandle>& ev);
+ static ui64 GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode txMode);
+};
+} \ No newline at end of file
diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp
new file mode 100644
index 00000000000..a5ca6da3c9f
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_write_operation.cpp
@@ -0,0 +1,555 @@
+#include "defs.h"
+
+#include "datashard_write_operation.h"
+#include "datashard_kqp.h"
+#include "datashard_locks.h"
+#include "datashard_impl.h"
+#include "datashard_failpoints.h"
+
+#include "key_conflicts.h"
+#include "range_ops.h"
+
+#include <ydb/core/tx/data_events/payload_helper.h>
+#include <ydb/core/scheme/scheme_types_proto.h>
+
+#include <ydb/library/actors/util/memory_track.h>
+
+namespace NKikimr {
+namespace NDataShard {
+
+
+
+TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev)
+ : StepTxId_(stepTxId)
+ , TabletId_(self->TabletID())
+ , Ev_(ev)
+ , EngineBay(self, txc, ctx, stepTxId.ToPair())
+ , ErrCode(NKikimrTxDataShard::TError::OK)
+ , TxSize(0)
+ , TxCacheUsage(0)
+ , IsReleased(false)
+ , ReceivedAt_(receivedAt)
+{
+ ComputeTxSize();
+ NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Add(TxSize);
+
+ if (LockTxId())
+ EngineBay.SetLockTxId(LockTxId(), LockNodeId());
+
+ if (Immediate())
+ EngineBay.SetIsImmediateTx();
+
+ auto& typeRegistry = *AppData()->TypeRegistry;
+
+ NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
+
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TxId: " << StepTxId_.TxId << ", shard " << TabletId() << ", meta: " << Record().ShortDebugString());
+
+ if (!ParseRecord(self))
+ return;
+
+ SetTxKeys(RecordOperation().GetColumnIds(), typeRegistry, ctx);
+
+ KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay);
+ EngineBay.MarkTxLoaded();
+}
+
+TValidatedWriteTx::~TValidatedWriteTx() {
+ NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Sub(TxSize);
+}
+
+bool TValidatedWriteTx::ParseRecord(TDataShard* self) {
+ if (Record().GetOperations().size() != 1)
+ {
+ ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
+ ErrStr = TStringBuilder() << "Only one operation is supported now.";
+ return false;
+ }
+
+ const NKikimrDataEvents::TTableId& tableIdRecord = RecordOperation().GetTableId();
+
+ auto tableInfoPtr = self->TableInfos.FindPtr(tableIdRecord.GetTableId());
+ if (!tableInfoPtr) {
+ ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR;
+ ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist";
+ return false;
+ }
+ TableInfo_ = tableInfoPtr->Get();
+ Y_ABORT_UNLESS(TableInfo_);
+
+ if (TableInfo_->GetTableSchemaVersion() != 0 && tableIdRecord.GetSchemaVersion() != TableInfo_->GetTableSchemaVersion())
+ {
+ ErrCode = NKikimrTxDataShard::TError::SCHEME_CHANGED;
+ ErrStr = TStringBuilder() << "Table '" << TableInfo_->Path << "' scheme changed.";
+ return false;
+ }
+
+ if (RecordOperation().GetPayloadFormat() != NKikimrDataEvents::FORMAT_CELLVEC)
+ {
+ ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
+ ErrStr = TStringBuilder() << "Only FORMAT_CELLVEC is supported now. Got: " << RecordOperation().GetPayloadFormat();
+ return false;
+ }
+
+ NEvWrite::TPayloadHelper<NEvents::TDataEvents::TEvWrite> payloadHelper(*Ev_->Get());
+ TString payload = payloadHelper.GetDataFromPayload(RecordOperation().GetPayloadIndex());
+
+ if (!TSerializedCellMatrix::TryParse(payload, Matrix_))
+ {
+ ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
+ ErrStr = TStringBuilder() << "Can't parse TSerializedCellVec in payload";
+ return false;
+ }
+
+ const auto& columnTags = RecordOperation().GetColumnIds();
+ if ((size_t)columnTags.size() != Matrix_.GetColCount())
+ {
+ ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
+ ErrStr = TStringBuilder() << "Column count mismatch: got columnids " << columnTags.size() << ", got cells count " << Matrix_.GetColCount();
+ return false;
+ }
+
+ if ((size_t)columnTags.size() < TableInfo_->KeyColumnIds.size())
+ {
+ ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR;
+ ErrStr = TStringBuilder() << "Column count mismatch: got " << columnTags.size() << ", expected greater or equal than key column count " << TableInfo_->KeyColumnIds.size();
+ return false;
+ }
+
+ for (size_t i = 0; i < TableInfo_->KeyColumnIds.size(); ++i) {
+ if (RecordOperation().columnids(i) != TableInfo_->KeyColumnIds[i]) {
+ ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR;
+ ErrStr = TStringBuilder() << "Key column schema at position " << i;
+ return false;
+ }
+ }
+
+ for (ui32 columnTag : columnTags) {
+ auto* col = TableInfo_->Columns.FindPtr(columnTag);
+ if (!col) {
+ ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR;
+ ErrStr = TStringBuilder() << "Missing column with id " << columnTag;
+ return false;
+ }
+ }
+
+ TableId_ = TTableId(tableIdRecord.ownerid(), tableIdRecord.GetTableId(), tableIdRecord.GetSchemaVersion());
+ return true;
+}
+
+TVector<TEngineBay::TColumnWriteMeta> GetColumnWrites(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags) {
+ TVector<TEngineBay::TColumnWriteMeta> writeColumns;
+ writeColumns.reserve(columnTags.size());
+ for (ui32 columnTag : columnTags) {
+ TEngineBay::TColumnWriteMeta writeColumn;
+ writeColumn.Column = NTable::TColumn("", columnTag, {}, {});
+
+ writeColumns.push_back(std::move(writeColumn));
+ }
+
+ return writeColumns;
+}
+
+void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx)
+{
+ for (ui32 rowIdx = 0; rowIdx < Matrix_.GetRowCount(); ++rowIdx)
+ {
+ //TODO zero copy necessary keys from TableInfo_->KeyColumnTypes
+ KeyCells_.clear();
+ for (ui16 colIdx = 0; colIdx < TableInfo_->KeyColumnIds.size(); ++colIdx)
+ KeyCells_.push_back(Matrix_.GetCells()[rowIdx * columnTags.size() + colIdx]);
+
+ TTableRange tableRange(KeyCells_);
+
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo_->Path << ", shard: " << TabletId_ << ", "
+ << "write point " << DebugPrintPoint(TableInfo_->KeyColumnTypes, KeyCells_, typeRegistry));
+
+ EngineBay.AddWriteRange(TableId_, tableRange, TableInfo_->KeyColumnTypes, GetColumnWrites(columnTags), false);
+ }
+}
+
+ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors)
+{
+ using EResult = NMiniKQL::IEngineFlat::EResult;
+
+ EResult result = EngineBay.Validate();
+ if (allowErrors) {
+ if (result != EResult::Ok) {
+ ErrStr = EngineBay.GetEngine()->GetErrors();
+ ErrCode = ConvertErrCode(result);
+ return 0;
+ }
+ } else {
+ Y_ABORT_UNLESS(result == EResult::Ok, "Engine errors: %s", EngineBay.GetEngine()->GetErrors().data());
+ }
+ return KeysCount();
+}
+
+bool TValidatedWriteTx::ReValidateKeys()
+{
+ using EResult = NMiniKQL::IEngineFlat::EResult;
+
+
+ auto [result, error] = EngineBay.GetKqpComputeCtx().ValidateKeys(EngineBay.TxInfo());
+ if (result != EResult::Ok) {
+ ErrStr = std::move(error);
+ ErrCode = ConvertErrCode(result);
+ return false;
+ }
+
+ return true;
+}
+
+bool TValidatedWriteTx::CanCancel() {
+ return false;
+}
+
+bool TValidatedWriteTx::CheckCancelled() {
+ return false;
+}
+
+void TValidatedWriteTx::ReleaseTxData() {
+ EngineBay.DestroyEngine();
+ IsReleased = true;
+
+ NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Sub(TxSize);
+ ComputeTxSize();
+ NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Add(TxSize);
+}
+
+void TValidatedWriteTx::ComputeTxSize() {
+ TxSize = sizeof(TValidatedWriteTx);
+}
+
+TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
+ : TOperation(op)
+ , Ev_(ev)
+ , ArtifactFlags(0)
+ , TxCacheUsage(0)
+ , ReleasedTxDataSize(0)
+ , SchemeShardId(0)
+ , SubDomainPathId(0)
+{
+ SetTarget(Ev_->Sender);
+ SetCookie(Ev_->Cookie);
+ Orbit = std::move(Ev_->Get()->MoveOrbit());
+
+ BuildWriteTx(self, txc, ctx);
+
+ // First time parsing, so we can fail
+ Y_DEBUG_ABORT_UNLESS(WriteTx_->Ready());
+
+ TrackMemory();
+}
+
+TWriteOperation::~TWriteOperation()
+{
+ UntrackMemory();
+}
+
+void TWriteOperation::FillTxData(TValidatedWriteTx::TPtr writeTx)
+{
+ Y_ABORT_UNLESS(!WriteTx_);
+ Y_ABORT_UNLESS(!Ev_ || HasVolatilePrepareFlag());
+
+ Target = writeTx->Source();
+ WriteTx_ = writeTx;
+}
+
+void TWriteOperation::FillTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TActorId& target, NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TVector<TSysTables::TLocksTable::TLock>& locks, ui64 artifactFlags)
+{
+ UntrackMemory();
+
+ Y_ABORT_UNLESS(!WriteTx_);
+ Y_ABORT_UNLESS(!Ev_);
+
+ Target = target;
+ Ev_ = std::move(ev);
+ if (locks.size()) {
+ for (auto lock : locks)
+ LocksCache().Locks[lock.LockId] = lock;
+ }
+ ArtifactFlags = artifactFlags;
+ Y_ABORT_UNLESS(!WriteTx_);
+ BuildWriteTx(self, txc, ctx);
+ Y_ABORT_UNLESS(WriteTx_->Ready());
+
+ TrackMemory();
+}
+
+void TWriteOperation::FillVolatileTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
+{
+ UntrackMemory();
+
+ Y_ABORT_UNLESS(!WriteTx_);
+ Y_ABORT_UNLESS(Ev_);
+
+ BuildWriteTx(self, txc, ctx);
+ Y_ABORT_UNLESS(WriteTx_->Ready());
+
+
+ TrackMemory();
+}
+
+TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
+{
+ if (!WriteTx_) {
+ Y_ABORT_UNLESS(Ev_);
+ WriteTx_ = std::make_shared<TValidatedWriteTx>(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev_);
+ }
+ return WriteTx_;
+}
+
+void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider, const TActorContext& ctx) {
+ ReleasedTxDataSize = provider.GetMemoryLimit() + provider.GetRequestedMemory();
+
+ if (!WriteTx_ || WriteTx_->IsTxDataReleased())
+ return;
+
+ WriteTx_->ReleaseTxData();
+ // Immediate transactions have no body stored.
+ if (!IsImmediate() && !HasVolatilePrepareFlag()) {
+ UntrackMemory();
+ Ev_.Reset();
+ TrackMemory();
+ }
+
+ //InReadSets.clear();
+ OutReadSets().clear();
+ LocksAccessLog().Locks.clear();
+ LocksCache().Locks.clear();
+ ArtifactFlags = 0;
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data");
+}
+
+void TWriteOperation::DbStoreLocksAccessLog(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
+{
+ using Schema = TDataShard::Schema;
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ using TLocksVector = TVector<TSysTables::TLocksTable::TPersistentLock>;
+ TLocksVector vec;
+ vec.reserve(LocksAccessLog().Locks.size());
+ for (auto& pr : LocksAccessLog().Locks)
+ vec.emplace_back(pr.second);
+
+ // Historically C++ column type was TVector<TLock>
+ const char* vecDataStart = reinterpret_cast<const char*>(vec.data());
+ size_t vecDataSize = vec.size() * sizeof(TLocksVector::value_type);
+ TStringBuf vecData(vecDataStart, vecDataSize);
+ db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update(NIceDb::TUpdate<Schema::TxArtifacts::Locks>(vecData));
+
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << self->TabletID());
+}
+
+void TWriteOperation::DbStoreArtifactFlags(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
+{
+ using Schema = TDataShard::Schema;
+
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update<Schema::TxArtifacts::Flags>(ArtifactFlags);
+
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << self->TabletID());
+}
+
+ui64 TWriteOperation::GetMemoryConsumption() const {
+ ui64 res = 0;
+ if (WriteTx_) {
+ res += WriteTx_->GetTxSize() + WriteTx_->GetMemoryAllocated();
+ }
+ if (Ev_) {
+ res += sizeof(NEvents::TDataEvents::TEvWrite);
+ }
+
+ return res;
+}
+
+ERestoreDataStatus TWriteOperation::RestoreTxData(
+ TDataShard* self,
+ TTransactionContext& txc,
+ const TActorContext& ctx
+)
+{
+ // TODO
+ Y_UNUSED(self);
+ Y_UNUSED(txc);
+ Y_UNUSED(ctx);
+ Y_ABORT();
+ /*
+ if (!WriteTx_) {
+ ReleasedTxDataSize = 0;
+ return ERestoreDataStatus::Ok;
+ }
+
+ UntrackMemory();
+
+ // For immediate transactions we should restore just
+ // from the Ev_. For planned transaction we should
+ // restore from local database.
+
+ TVector<TSysTables::TLocksTable::TLock> locks;
+ if (!IsImmediate() && !HasVolatilePrepareFlag()) {
+ NIceDb::TNiceDb db(txc.DB);ExtractKeys
+ bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, Ev_, locks, ArtifactFlags);
+ if (!ok) {
+ Ev_.Reset();
+ ArtifactFlags = 0;
+ return ERestoreDataStatus::Restart;
+ }
+ } else {
+ Y_ABORT_UNLESS(Ev_);
+ }
+
+ TrackMemory();
+
+ for (auto& lock : locks)
+ LocksCache().Locks[lock.LockId] = lock;
+
+ bool extractKeys = WriteTx_->IsTxInfoLoaded();
+ WriteTx_ = std::make_shared<TValidatedWriteTx>(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev_);
+ if (WriteTx_->Ready() && extractKeys) {
+ WriteTx_->ExtractKeys(true);
+ }
+
+ if (!WriteTx_->Ready()) {
+ return ERestoreDataStatus::Error;
+ }
+
+ ReleasedTxDataSize = 0;
+ */
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " at " << self->TabletID() << " restored its data");
+
+ return ERestoreDataStatus::Ok;
+}
+
+void TWriteOperation::FinalizeWriteTxPlan()
+{
+ Y_ABORT_UNLESS(IsDataTx());
+ Y_ABORT_UNLESS(!IsImmediate());
+ Y_ABORT_UNLESS(!IsKqpScanTransaction());
+
+ TVector<EExecutionUnitKind> plan;
+
+ plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies);
+ if (IsKqpDataTransaction()) {
+ plan.push_back(EExecutionUnitKind::BuildKqpDataTxOutRS);
+ plan.push_back(EExecutionUnitKind::StoreAndSendOutRS);
+ plan.push_back(EExecutionUnitKind::PrepareKqpDataTxInRS);
+ plan.push_back(EExecutionUnitKind::LoadAndWaitInRS);
+ plan.push_back(EExecutionUnitKind::ExecuteKqpDataTx);
+ } else {
+ plan.push_back(EExecutionUnitKind::BuildDataTxOutRS);
+ plan.push_back(EExecutionUnitKind::StoreAndSendOutRS);
+ plan.push_back(EExecutionUnitKind::PrepareDataTxInRS);
+ plan.push_back(EExecutionUnitKind::LoadAndWaitInRS);
+ plan.push_back(EExecutionUnitKind::ExecuteDataTx);
+ }
+ plan.push_back(EExecutionUnitKind::CompleteOperation);
+ plan.push_back(EExecutionUnitKind::CompletedOperations);
+
+ RewriteExecutionPlan(plan);
+}
+
+class TFinalizeWriteTxPlanUnit: public TExecutionUnit {
+public:
+ TFinalizeWriteTxPlanUnit(TDataShard& dataShard, TPipeline& pipeline)
+ : TExecutionUnit(EExecutionUnitKind::FinalizeWriteTxPlan, false, dataShard, pipeline)
+ {
+ }
+
+ bool IsReadyToExecute(TOperation::TPtr) const override {
+ return true;
+ }
+
+ EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
+ Y_UNUSED(txc);
+ Y_UNUSED(ctx);
+
+ TWriteOperation* tx = dynamic_cast<TWriteOperation*>(op.Get());
+ Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
+
+ tx->FinalizeWriteTxPlan();
+
+ return EExecutionStatus::Executed;
+ }
+
+ void Complete(TOperation::TPtr op, const TActorContext& ctx) override {
+ Y_UNUSED(op);
+ Y_UNUSED(ctx);
+ }
+};
+
+THolder<TExecutionUnit> CreateFinalizeWriteTxPlanUnit(TDataShard& dataShard, TPipeline& pipeline) {
+ return THolder(new TFinalizeWriteTxPlanUnit(dataShard, pipeline));
+}
+
+void TWriteOperation::TrackMemory() const {
+ // TODO More accurate calc memory
+ NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Add(Record().SpaceUsed());
+}
+
+void TWriteOperation::UntrackMemory() const {
+ NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Sub(Record().SpaceUsed());
+}
+
+void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) {
+ SetAbortedFlag();
+ WriteResult_ = NEvents::TDataEvents::TEvWriteResult::BuildError(WriteTx_->TabletId(), GetTxId(), status, errorMsg);
+}
+
+void TWriteOperation::SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult) {
+ WriteResult_ = std::move(writeResult);
+}
+
+void TWriteOperation::BuildExecutionPlan(bool loaded)
+{
+ Y_ABORT_UNLESS(GetExecutionPlan().empty());
+ Y_ABORT_UNLESS(!loaded);
+
+ TVector<EExecutionUnitKind> plan;
+
+ //if (IsImmediate())
+ {
+ Y_ABORT_UNLESS(!loaded);
+ plan.push_back(EExecutionUnitKind::CheckWrite);
+ plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies);
+ plan.push_back(EExecutionUnitKind::ExecuteWrite);
+ plan.push_back(EExecutionUnitKind::FinishPropose);
+ plan.push_back(EExecutionUnitKind::CompletedOperations);
+ }
+ /*
+ else if (HasVolatilePrepareFlag()) {
+ plan.push_back(EExecutionUnitKind::StoreDataTx); // note: stores in memory
+ plan.push_back(EExecutionUnitKind::FinishPropose);
+ Y_ABORT_UNLESS(!GetStep());
+ plan.push_back(EExecutionUnitKind::WaitForPlan);
+ plan.push_back(EExecutionUnitKind::PlanQueue);
+ plan.push_back(EExecutionUnitKind::LoadTxDetails); // note: reloads from memory
+ plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies);
+ plan.push_back(EExecutionUnitKind::ExecuteWrite);
+ plan.push_back(EExecutionUnitKind::CompleteOperation);
+ plan.push_back(EExecutionUnitKind::CompletedOperations);
+ } else {
+ if (!loaded) {
+ plan.push_back(EExecutionUnitKind::CheckWrite);
+ plan.push_back(EExecutionUnitKind::StoreDataTx);
+ plan.push_back(EExecutionUnitKind::FinishPropose);
+ }
+ if (!GetStep())
+ plan.push_back(EExecutionUnitKind::WaitForPlan);
+ plan.push_back(EExecutionUnitKind::PlanQueue);
+ plan.push_back(EExecutionUnitKind::LoadTxDetails);
+ plan.push_back(EExecutionUnitKind::FinalizeWriteTxPlan);
+ } */
+ RewriteExecutionPlan(plan);
+}
+
+} // NDataShard
+} // NKikimr
+
+Y_DECLARE_OUT_SPEC(, NKikimr::NDataShard::TWriteOperation, stream, tx) {
+ stream << '[' << tx.GetStep() << ':' << tx.GetTxId() << ']';
+}
diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h
new file mode 100644
index 00000000000..2d6a107c29b
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_write_operation.h
@@ -0,0 +1,440 @@
+#pragma once
+
+#include "datashard_impl.h"
+#include "datashard_locks.h"
+#include "datashard__engine_host.h"
+#include "operation.h"
+
+#include <ydb/core/tx/tx_processing.h>
+#include <ydb/core/tablet_flat/flat_cxx_database.h>
+
+#include <ydb/library/yql/public/issue/yql_issue.h>
+
+namespace NKikimr {
+namespace NDataShard {
+
+
+class TValidatedWriteTx: TNonCopyable {
+public:
+ using TPtr = std::shared_ptr<TValidatedWriteTx>;
+
+ TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev);
+
+ ~TValidatedWriteTx();
+
+ static constexpr ui64 MaxReorderTxKeys() {
+ return 100;
+ }
+
+ NKikimrTxDataShard::TError::EKind Code() const {
+ return ErrCode;
+ }
+ const TString GetError() const {
+ return ErrStr;
+ }
+
+ TStepOrder StepTxId() const {
+ return StepTxId_;
+ }
+ ui64 TxId() const {
+ return StepTxId_.TxId;
+ }
+ ui64 TabletId() const {
+ return TabletId_;
+ }
+ const NEvents::TDataEvents::TEvWrite::TPtr& Ev() const {
+ return Ev_;
+ }
+
+ const NKikimrDataEvents::TEvWrite& Record() const {
+ return Ev_->Get()->Record;
+ }
+
+ const NKikimrDataEvents::TEvWrite::TOperation& RecordOperation() const {
+ //TODO Only one operation is supported now
+ return Record().operations(0);
+ }
+
+ const TTableId& TableId() const {
+ return TableId_;
+ }
+
+ const TSerializedCellMatrix Matrix() const {
+ return Matrix_;
+ }
+
+ const TVector<TCell> KeyCells() const {
+ return KeyCells_;
+ }
+
+ ui64 LockTxId() const {
+ return Record().locktxid();
+ }
+ ui32 LockNodeId() const {
+ return Record().locknodeid();
+ }
+ bool Immediate() const {
+ return Record().txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE;
+ }
+ bool NeedDiagnostics() const {
+ return true;
+ }
+ bool CollectStats() const {
+ return true;
+ }
+ TInstant ReceivedAt() const {
+ return ReceivedAt_;
+ }
+ TInstant Deadline() const {
+ return Deadline_;
+ }
+
+ bool Ready() const {
+ return ErrCode == NKikimrTxDataShard::TError::OK;
+ }
+ bool RequirePrepare() const {
+ return ErrCode == NKikimrTxDataShard::TError::SNAPSHOT_NOT_READY_YET;
+ }
+ bool RequireWrites() const {
+ return TxInfo().HasWrites() || !Immediate();
+ }
+ bool HasWrites() const {
+ return TxInfo().HasWrites();
+ }
+ bool HasLockedWrites() const {
+ return HasWrites() && LockTxId();
+ }
+ bool HasDynamicWrites() const {
+ return TxInfo().DynKeysCount != 0;
+ }
+
+ // TODO: It's an expensive operation (Precharge() inside). We need avoid it.
+ TEngineBay::TSizes CalcReadSizes(bool needsTotalKeysSize) const {
+ return EngineBay.CalcSizes(needsTotalKeysSize);
+ }
+
+ ui64 GetMemoryAllocated() const {
+ return EngineBay.GetEngine() ? EngineBay.GetEngine()->GetMemoryAllocated() : 0;
+ }
+
+ NMiniKQL::IEngineFlat* GetEngine() {
+ return EngineBay.GetEngine();
+ }
+ void DestroyEngine() {
+ EngineBay.DestroyEngine();
+ }
+ const NMiniKQL::TEngineHostCounters& GetCounters() {
+ return EngineBay.GetCounters();
+ }
+ void ResetCounters() {
+ EngineBay.ResetCounters();
+ }
+
+ bool CanCancel();
+ bool CheckCancelled();
+
+ void SetWriteVersion(TRowVersion writeVersion) {
+ EngineBay.SetWriteVersion(writeVersion);
+ }
+ void SetReadVersion(TRowVersion readVersion) {
+ EngineBay.SetReadVersion(readVersion);
+ }
+ void SetVolatileTxId(ui64 txId) {
+ EngineBay.SetVolatileTxId(txId);
+ }
+
+ void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) {
+ EngineBay.CommitChanges(tableId, lockId, writeVersion);
+ }
+
+ TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const {
+ return EngineBay.GetCollectedChanges();
+ }
+ void ResetCollectedChanges() {
+ EngineBay.ResetCollectedChanges();
+ }
+
+ TVector<ui64> GetVolatileCommitTxIds() const {
+ return EngineBay.GetVolatileCommitTxIds();
+ }
+ const absl::flat_hash_set<ui64>& GetVolatileDependencies() const {
+ return EngineBay.GetVolatileDependencies();
+ }
+ std::optional<ui64> GetVolatileChangeGroup() const {
+ return EngineBay.GetVolatileChangeGroup();
+ }
+ bool GetVolatileCommitOrdered() const {
+ return EngineBay.GetVolatileCommitOrdered();
+ }
+
+ TActorId Source() const {
+ return Source_;
+ }
+ void SetSource(const TActorId& actorId) {
+ Source_ = actorId;
+ }
+ void SetStep(ui64 step) {
+ StepTxId_.Step = step;
+ }
+ bool IsProposed() const {
+ return Source_ != TActorId();
+ }
+
+ inline const ::NKikimrDataEvents::TKqpLocks& GetKqpLocks() const {
+ return Record().locks();
+ }
+
+ bool ParseRecord(TDataShard* self);
+ void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx);
+
+ ui32 ExtractKeys(bool allowErrors);
+ bool ReValidateKeys();
+
+ ui64 GetTxSize() const {
+ return TxSize;
+ }
+ ui32 KeysCount() const {
+ return TxInfo().WritesCount;
+ }
+
+ void SetTxCacheUsage(ui64 val) {
+ TxCacheUsage = val;
+ }
+ ui64 GetTxCacheUsage() const {
+ return TxCacheUsage;
+ }
+
+ void ReleaseTxData();
+ bool IsTxDataReleased() const {
+ return IsReleased;
+ }
+
+ bool IsTxInfoLoaded() const {
+ return TxInfo().Loaded;
+ }
+
+ bool HasOutReadsets() const {
+ return TxInfo().HasOutReadsets;
+ }
+ bool HasInReadsets() const {
+ return TxInfo().HasInReadsets;
+ }
+
+ const NMiniKQL::IEngineFlat::TValidationInfo& TxInfo() const {
+ return EngineBay.TxInfo();
+ }
+
+private:
+ //TODO: YDB_READONLY
+ TStepOrder StepTxId_;
+ ui64 TabletId_;
+ TTableId TableId_;
+ const TUserTable* TableInfo_;
+ const NEvents::TDataEvents::TEvWrite::TPtr& Ev_;
+ TSerializedCellMatrix Matrix_;
+ TVector<TCell> KeyCells_;
+ TActorId Source_;
+ TEngineBay EngineBay;
+ NKikimrTxDataShard::TError::EKind ErrCode;
+ TString ErrStr;
+ ui64 TxSize;
+ ui64 TxCacheUsage;
+ bool IsReleased;
+ const TInstant ReceivedAt_; // For local timeout tracking
+ TInstant Deadline_;
+
+ void ComputeTxSize();
+};
+
+class TWriteOperation : public TOperation {
+ friend class TWriteUnit;
+public:
+ explicit TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
+
+ ~TWriteOperation();
+
+ void FillTxData(TValidatedWriteTx::TPtr dataTx);
+ void FillTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TActorId& target, NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TVector<TSysTables::TLocksTable::TLock>& locks, ui64 artifactFlags);
+ void FillVolatileTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
+
+ const NEvents::TDataEvents::TEvWrite::TPtr& GetEv() const {
+ return Ev_;
+ }
+ void SetEv(const NEvents::TDataEvents::TEvWrite::TPtr& ev) {
+ UntrackMemory();
+ Ev_ = ev;
+ TrackMemory();
+ }
+ void ClearEv() {
+ UntrackMemory();
+ Ev_.Reset();
+ TrackMemory();
+ }
+
+ ui64 GetSchemeShardId() const {
+ return SchemeShardId;
+ }
+ void SetSchemeShardId(ui64 id) {
+ SchemeShardId = id;
+ }
+ ui64 GetSubDomainPathId() const {
+ return SubDomainPathId;
+ }
+ void SetSubDomainPathId(ui64 pathId) {
+ SubDomainPathId = pathId;
+ }
+
+ const NKikimrSubDomains::TProcessingParams& GetProcessingParams() const {
+ return ProcessingParams;
+ }
+ void SetProcessingParams(const NKikimrSubDomains::TProcessingParams& params)
+ {
+ ProcessingParams.CopyFrom(params);
+ }
+
+ void Deactivate() override {
+ ClearEv();
+
+ TOperation::Deactivate();
+ }
+
+ ui32 ExtractKeys() {
+ return WriteTx_ ? WriteTx_->ExtractKeys(false) : 0;
+ }
+
+ bool ReValidateKeys() {
+ return WriteTx_ ? WriteTx_->ReValidateKeys() : true;
+ }
+
+ void MarkAsUsingSnapshot() {
+ SetUsingSnapshotFlag();
+ }
+
+ void SetTxCacheUsage(ui64 val) {
+ TxCacheUsage = val;
+ }
+ ui64 GetTxCacheUsage() const {
+ return TxCacheUsage;
+ }
+
+ ui64 GetReleasedTxDataSize() const {
+ return ReleasedTxDataSize;
+ }
+ bool IsTxDataReleased() const {
+ return ReleasedTxDataSize > 0;
+ }
+
+ enum EArtifactFlags {
+ OUT_RS_STORED = (1 << 0),
+ LOCKS_STORED = (1 << 1),
+ };
+ void MarkOutRSStored() {
+ ArtifactFlags |= OUT_RS_STORED;
+ }
+
+ bool IsOutRSStored() {
+ return ArtifactFlags & OUT_RS_STORED;
+ }
+
+ void MarkLocksStored() {
+ ArtifactFlags |= LOCKS_STORED;
+ }
+
+ bool IsLocksStored() {
+ return ArtifactFlags & LOCKS_STORED;
+ }
+
+ void DbStoreLocksAccessLog(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
+ void DbStoreArtifactFlags(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
+
+ ui64 GetMemoryConsumption() const;
+
+ ui64 GetRequiredMemory() const {
+ Y_ABORT_UNLESS(!GetTxCacheUsage() || !IsTxDataReleased());
+ ui64 requiredMem = GetTxCacheUsage() + GetReleasedTxDataSize();
+ if (!requiredMem)
+ requiredMem = GetMemoryConsumption();
+ return requiredMem;
+ }
+
+ void ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider, const TActorContext& ctx);
+ ERestoreDataStatus RestoreTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
+ void FinalizeWriteTxPlan();
+
+ // TOperation iface.
+ void BuildExecutionPlan(bool loaded) override;
+
+ bool HasKeysInfo() const override {
+ return WriteTx_ ? WriteTx_->TxInfo().Loaded : false;
+ }
+
+ const NMiniKQL::IEngineFlat::TValidationInfo& GetKeysInfo() const override {
+ if (WriteTx_) {
+ Y_ABORT_UNLESS(WriteTx_->TxInfo().Loaded);
+ return WriteTx_->TxInfo();
+ }
+ // For scheme tx global reader and writer flags should
+ // result in all required dependencies.
+ return TOperation::GetKeysInfo();
+ }
+
+ ui64 LockTxId() const override {
+ return WriteTx_ ? WriteTx_->LockTxId() : 0;
+ }
+
+ ui32 LockNodeId() const override {
+ return WriteTx_ ? WriteTx_->LockNodeId() : 0;
+ }
+
+ bool HasLockedWrites() const override {
+ return WriteTx_ ? WriteTx_->HasLockedWrites() : false;
+ }
+
+ ui64 IncrementPageFaultCount() {
+ return ++PageFaultCount;
+ }
+
+ const TValidatedWriteTx::TPtr& WriteTx() const {
+ return WriteTx_;
+ }
+ TValidatedWriteTx::TPtr BuildWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
+
+ void ClearWriteTx() {
+ WriteTx_ = nullptr;
+ }
+
+ const NKikimrDataEvents::TEvWrite& Record() const {
+ return Ev_->Get()->Record;
+ }
+
+ const std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>& WriteResult() const {
+ return WriteResult_;
+ }
+ std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& WriteResult() {
+ return std::move(WriteResult_);
+ }
+
+ void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg);
+ void SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult);
+
+private:
+ void TrackMemory() const;
+ void UntrackMemory() const;
+
+private:
+ NEvents::TDataEvents::TEvWrite::TPtr Ev_;
+ TValidatedWriteTx::TPtr WriteTx_;
+ std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> WriteResult_;
+
+ // TODO: move to persistent part of operation's flags
+ ui64 ArtifactFlags;
+ ui64 TxCacheUsage;
+ ui64 ReleasedTxDataSize;
+ ui64 SchemeShardId;
+ ui64 SubDomainPathId;
+ NKikimrSubDomains::TProcessingParams ProcessingParams;
+ ui64 PageFaultCount = 0;
+};
+
+} // NDataShard
+} // NKikimr
diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp
index 404bfd25ea2..51ce8cf8ecc 100644
--- a/ydb/core/tx/datashard/execution_unit.cpp
+++ b/ydb/core/tx/datashard/execution_unit.cpp
@@ -12,6 +12,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind,
switch (kind) {
case EExecutionUnitKind::CheckDataTx:
return CreateCheckDataTxUnit(dataShard, pipeline);
+ case EExecutionUnitKind::CheckWrite:
+ return CreateCheckWriteUnit(dataShard, pipeline);
case EExecutionUnitKind::CheckSchemeTx:
return CreateCheckSchemeTxUnit(dataShard, pipeline);
case EExecutionUnitKind::CheckSnapshotTx:
@@ -44,6 +46,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind,
return CreateLoadTxDetailsUnit(dataShard, pipeline);
case EExecutionUnitKind::FinalizeDataTxPlan:
return CreateFinalizeDataTxPlanUnit(dataShard, pipeline);
+ case EExecutionUnitKind::FinalizeWriteTxPlan:
+ return CreateFinalizeWriteTxPlanUnit(dataShard, pipeline);
case EExecutionUnitKind::ProtectSchemeEchoes:
return CreateProtectSchemeEchoesUnit(dataShard, pipeline);
case EExecutionUnitKind::BuildDataTxOutRS:
@@ -132,6 +136,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind,
return CreateCheckReadUnit(dataShard, pipeline);
case EExecutionUnitKind::ExecuteRead:
return CreateReadUnit(dataShard, pipeline);
+ case EExecutionUnitKind::ExecuteWrite:
+ return CreateWriteUnit(dataShard, pipeline);
default:
Y_FAIL_S("Unexpected execution kind " << kind << " (" << (ui32)kind << ")");
}
diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h
index 2585a2d9b8e..1a30d6f4dc9 100644
--- a/ydb/core/tx/datashard/execution_unit_ctors.h
+++ b/ydb/core/tx/datashard/execution_unit_ctors.h
@@ -6,7 +6,8 @@ namespace NKikimr {
namespace NDataShard {
THolder<TExecutionUnit> CreateCheckDataTxUnit(TDataShard &dataShard, TPipeline &pipeline);
-THolder<TExecutionUnit> CreateCheckSchemeTxUnit(TDataShard &dataShard, TPipeline &pipeline);
+THolder<TExecutionUnit> CreateCheckWriteUnit(TDataShard& dataShard, TPipeline& pipeline);
+THolder<TExecutionUnit> CreateCheckSchemeTxUnit(TDataShard& dataShard, TPipeline& pipeline);
THolder<TExecutionUnit> CreateCheckSnapshotTxUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateCheckDistributedEraseTxUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateCheckCommitWritesTxUnit(TDataShard &dataShard, TPipeline &pipeline);
@@ -22,7 +23,8 @@ THolder<TExecutionUnit> CreateWaitForPlanUnit(TDataShard &dataShard, TPipeline &
THolder<TExecutionUnit> CreatePlanQueueUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateLoadTxDetailsUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateFinalizeDataTxPlanUnit(TDataShard &dataShard, TPipeline &pipeline);
-THolder<TExecutionUnit> CreateProtectSchemeEchoesUnit(TDataShard &dataShard, TPipeline &pipeline);
+THolder<TExecutionUnit> CreateFinalizeWriteTxPlanUnit(TDataShard& dataShard, TPipeline& pipeline);
+THolder<TExecutionUnit> CreateProtectSchemeEchoesUnit(TDataShard& dataShard, TPipeline& pipeline);
THolder<TExecutionUnit> CreateBuildDataTxOutRSUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateBuildDistributedEraseTxOutRSUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateBuildKqpDataTxOutRSUnit(TDataShard &dataShard, TPipeline &pipeline);
@@ -66,6 +68,7 @@ THolder<TExecutionUnit> CreateAlterCdcStreamUnit(TDataShard &dataShard, TPipelin
THolder<TExecutionUnit> CreateDropCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateCheckReadUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateReadUnit(TDataShard &dataShard, TPipeline &pipeline);
+THolder<TExecutionUnit> CreateWriteUnit(TDataShard& dataShard, TPipeline& pipeline);
} // namespace NDataShard
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h
index a96aaaaf214..a62a8718aa1 100644
--- a/ydb/core/tx/datashard/execution_unit_kind.h
+++ b/ydb/core/tx/datashard/execution_unit_kind.h
@@ -4,13 +4,14 @@
namespace NKikimr {
namespace NDataShard {
-enum class EExecutionUnitKind : ui32 {
+enum class EExecutionUnitKind: ui32 {
CheckDataTx,
CheckSchemeTx,
CheckSnapshotTx,
CheckDistributedEraseTx,
CheckCommitWritesTx,
CheckRead,
+ CheckWrite,
StoreDataTx,
StoreSchemeTx,
StoreSnapshotTx,
@@ -23,6 +24,7 @@ enum class EExecutionUnitKind : ui32 {
PlanQueue,
LoadTxDetails,
FinalizeDataTxPlan,
+ FinalizeWriteTxPlan,
ProtectSchemeEchoes,
BuildDataTxOutRS,
BuildKqpDataTxOutRS,
@@ -37,6 +39,7 @@ enum class EExecutionUnitKind : ui32 {
ExecuteDistributedEraseTx,
ExecuteCommitWritesTx,
ExecuteRead,
+ ExecuteWrite,
CompleteOperation,
ExecuteKqpScanTx,
MakeScanSnapshot,
diff --git a/ydb/core/tx/datashard/probes.h b/ydb/core/tx/datashard/probes.h
index b7d96b79bcb..7640df6f07c 100644
--- a/ydb/core/tx/datashard/probes.h
+++ b/ydb/core/tx/datashard/probes.h
@@ -75,6 +75,19 @@
GROUPS("DataShard"), \
TYPES(), \
NAMES()) \
+ PROBE(WriteRequest, \
+ GROUPS("DataShard", "LWTrackStart"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(WriteExecute, \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(WriteResult, \
+ GROUPS("DataShard"), \
+ TYPES(), \
+ NAMES()) \
+
// DATASHARD_PROVIDER
LWTRACE_DECLARE_PROVIDER(DATASHARD_PROVIDER)
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
index 00e0125d9c4..82648b94f87 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
@@ -8,6 +8,8 @@
#include <ydb/core/tablet_flat/shared_cache_events.h>
#include <ydb/core/testlib/basics/appdata.h>
#include <ydb/core/tx/balance_coverage/balance_coverage_builder.h>
+#include <ydb/core/tx/data_events/events.h>
+#include <ydb/core/tx/data_events/payload_helper.h>
#include <ydb/core/tx/tx_allocator/txallocator.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/tx_proxy/upload_rows.h>
@@ -1105,7 +1107,7 @@ static ui64 RunSchemeTx(
return ev->Get()->Record.GetTxId();
}
-void CreateShardedTable(
+std::tuple<TVector<ui64>, ui64> CreateShardedTable(
Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
@@ -1223,9 +1225,15 @@ void CreateShardedTable(
}
WaitTxNotification(server, sender, RunSchemeTx(*server->GetRuntime(), std::move(request), sender));
+
+ TString path = TStringBuilder() << root << "/" << name;
+ const auto& shards = GetTableShards(server, sender, path);
+ const ui64 tableId = ResolveTableId(server, sender, path).PathId.LocalPathId;
+
+ return {shards, tableId};
}
-void CreateShardedTable(
+std::tuple<TVector<ui64>, ui64> CreateShardedTable(
Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
@@ -1240,7 +1248,7 @@ void CreateShardedTable(
.EnableOutOfOrder(enableOutOfOrder)
.Policy(policy)
.ShadowData(shadowData);
- CreateShardedTable(server, sender, root, name, opts);
+ return CreateShardedTable(server, sender, root, name, opts);
}
ui64 AsyncCreateCopyTable(
@@ -1803,6 +1811,70 @@ void ExecSQL(Tests::TServer::TPtr server,
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
}
+std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount) {
+ std::vector<ui32> columnIds(columns.size());
+ std::iota(columnIds.begin(), columnIds.end(), 1);
+
+ TVector<TString> stringValues;
+ TVector<TCell> cells;
+ for (ui32 row = 0; row < rowCount; ++row) {
+ for (ui32 col = 0; col < columns.size(); ++col) {
+ const TString& columnType = columns[col].Type;
+ ui64 value = row * columns.size() + col;
+ if (columnType == "Uint64") {
+ cells.emplace_back(TCell((const char*)&value, sizeof(ui64)));
+ } else if (columnType == "Uint32") {
+ ui32 value32 = (ui32)value;
+ cells.emplace_back(TCell((const char*)&value32, sizeof(ui32)));
+ } else if (columnType == "Utf8") {
+ stringValues.emplace_back(Sprintf("String_%" PRIu64, value));
+ cells.emplace_back(TCell(stringValues.back().c_str(), stringValues.back().size()));
+ } else {
+ Y_ABORT("Unsupported column type");
+ }
+ }
+ }
+
+ TSerializedCellMatrix matrix(cells, rowCount, columns.size());
+ TString blobData = matrix.GetBuffer();
+
+ UNIT_ASSERT(blobData.size() < 8_MB);
+
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
+ ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
+ evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
+
+ return evWrite;
+}
+
+NKikimrDataEvents::TEvWriteResult Write(Tests::TServer::TPtr server, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode)
+{
+ auto& runtime = *server->GetRuntime();
+ auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
+ runtime.SendToPipe(shardId, sender, request.release(), 0, GetPipeConfigWithRetries());
+
+ auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
+ auto status = ev->Get()->Record.GetStatus();
+
+ NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus;
+ switch (txMode) {
+ case NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE:
+ expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED;
+ break;
+ case NKikimrDataEvents::TEvWrite::MODE_PREPARE:
+ case NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE:
+ expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED;
+ break;
+ default:
+ expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED;
+ UNIT_ASSERT_C(false, "Unexpected txMode: " << txMode);
+ break;
+ }
+ UNIT_ASSERT_C(status == expectedStatus, "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues());
+
+ return ev->Get()->Record;
+}
+
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)
{
auto txTypes = std::make_shared<NTxProxy::TUploadTypes>();
@@ -1832,8 +1904,8 @@ void WaitTabletBecomesOffline(TServer::TPtr server, ui64 tabletId)
{
IsShardStateChange(ui64 tabletId)
: TabletId(tabletId)
- {
- }
+ {
+ }
bool operator()(IEventHandle& ev)
{
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
index b75914ade22..64a7ef814ea 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
@@ -500,13 +500,15 @@ struct TShardedTableOptions {
template<bool OPT1, bool OPT2> \
void N(NUnitTest::TTestContext&)
-void CreateShardedTable(Tests::TServer::TPtr server,
+// Create table, returns shards & tableId
+std::tuple<TVector<ui64>, ui64> CreateShardedTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name,
const TShardedTableOptions &opts = TShardedTableOptions());
-void CreateShardedTable(Tests::TServer::TPtr server,
+// Create table, returns shards & tableId
+std::tuple<TVector<ui64>, ui64> CreateShardedTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name,
@@ -708,6 +710,8 @@ void ExecSQL(Tests::TServer::TPtr server,
bool dml = true,
Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);
+NKikimrDataEvents::TEvWriteResult Write(Tests::TServer::TPtr server, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode);
+
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);
struct IsTxResultComplete {
diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt
new file mode 100644
index 00000000000..ac06a0c7e13
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt
@@ -0,0 +1,86 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_write)
+target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ core-tx-datashard
+ tx-datashard-ut_common
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ core-testlib-default
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_link_options(ydb-core-tx-datashard-ut_write PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp
+)
+set_property(
+ TARGET
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ SPLIT_FACTOR
+ 5
+)
+add_yunittest(
+ NAME
+ ydb-core-tx-datashard-ut_write
+ TEST_TARGET
+ ydb-core-tx-datashard-ut_write
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-core-tx-datashard-ut_write
+ system_allocator
+)
+vcs_info(ydb-core-tx-datashard-ut_write)
diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..c4ef0696942
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,87 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_write)
+target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-datashard
+ tx-datashard-ut_common
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ core-testlib-default
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_link_options(ydb-core-tx-datashard-ut_write PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp
+)
+set_property(
+ TARGET
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ SPLIT_FACTOR
+ 5
+)
+add_yunittest(
+ NAME
+ ydb-core-tx-datashard-ut_write
+ TEST_TARGET
+ ydb-core-tx-datashard-ut_write
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-core-tx-datashard-ut_write
+ system_allocator
+)
+vcs_info(ydb-core-tx-datashard-ut_write)
diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..17b75fce13f
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,90 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_write)
+target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ core-tx-datashard
+ tx-datashard-ut_common
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ core-testlib-default
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_link_options(ydb-core-tx-datashard-ut_write PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp
+)
+set_property(
+ TARGET
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ SPLIT_FACTOR
+ 5
+)
+add_yunittest(
+ NAME
+ ydb-core-tx-datashard-ut_write
+ TEST_TARGET
+ ydb-core-tx-datashard-ut_write
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-core-tx-datashard-ut_write
+ cpp-malloc-jemalloc
+)
+vcs_info(ydb-core-tx-datashard-ut_write)
diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..45c988267d8
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,92 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_write)
+target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-datashard
+ tx-datashard-ut_common
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ core-testlib-default
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_link_options(ydb-core-tx-datashard-ut_write PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp
+)
+set_property(
+ TARGET
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ SPLIT_FACTOR
+ 5
+)
+add_yunittest(
+ NAME
+ ydb-core-tx-datashard-ut_write
+ TEST_TARGET
+ ydb-core-tx-datashard-ut_write
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-core-tx-datashard-ut_write
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(ydb-core-tx-datashard-ut_write)
diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.txt
new file mode 100644
index 00000000000..d863ebd1806
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_write/CMakeLists.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64")
+ include(CMakeLists.darwin-arm64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..99c67389521
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,80 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-tx-datashard-ut_write)
+target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard
+)
+target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ core-tx-datashard
+ tx-datashard-ut_common
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ kqp-ut-common
+ core-testlib-default
+ ydb-core-tx
+ udf-service-exception_policy
+ public-lib-yson_value
+ cpp-client-ydb_result
+)
+target_sources(ydb-core-tx-datashard-ut_write PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp
+)
+set_property(
+ TARGET
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ SPLIT_FACTOR
+ 5
+)
+add_yunittest(
+ NAME
+ ydb-core-tx-datashard-ut_write
+ TEST_TARGET
+ ydb-core-tx-datashard-ut_write
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-tx-datashard-ut_write
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-core-tx-datashard-ut_write
+ system_allocator
+)
+vcs_info(ydb-core-tx-datashard-ut_write)
diff --git a/ydb/core/tx/datashard/ut_write/ya.make b/ydb/core/tx/datashard/ut_write/ya.make
new file mode 100644
index 00000000000..74073a9ddbe
--- /dev/null
+++ b/ydb/core/tx/datashard/ut_write/ya.make
@@ -0,0 +1,38 @@
+UNITTEST_FOR(ydb/core/tx/datashard)
+
+FORK_SUBTESTS()
+
+SPLIT_FACTOR(5)
+
+IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
+ TIMEOUT(3600)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ REQUIREMENTS(ram:16)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+PEERDIR(
+ ydb/core/tx/datashard/ut_common
+ library/cpp/getopt
+ library/cpp/regex/pcre
+ library/cpp/svnversion
+ ydb/core/kqp/ut/common
+ ydb/core/testlib/default
+ ydb/core/tx
+ ydb/library/yql/public/udf/service/exception_policy
+ ydb/public/lib/yson_value
+ ydb/public/sdk/cpp/client/ydb_result
+)
+
+YQL_LAST_ABI_VERSION()
+
+SRCS(
+ datashard_ut_write.cpp
+)
+
+REQUIREMENTS(ram:32)
+
+END()
diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp
new file mode 100644
index 00000000000..bfda765f844
--- /dev/null
+++ b/ydb/core/tx/datashard/write_unit.cpp
@@ -0,0 +1,207 @@
+#include "datashard_write_operation.h"
+#include "datashard_pipeline.h"
+#include "setup_sys_locks.h"
+#include "datashard_locks_db.h"
+#include "datashard_user_db.h"
+
+namespace NKikimr {
+namespace NDataShard {
+
+class TWriteUnit : public TExecutionUnit {
+public:
+ TWriteUnit(TDataShard& self, TPipeline& pipeline)
+ : TExecutionUnit(EExecutionUnitKind::ExecuteWrite, true, self, pipeline)
+ {
+ }
+
+ ~TWriteUnit()
+ {
+ }
+
+ bool IsReadyToExecute(TOperation::TPtr op) const override {
+ if (op->HasRuntimeConflicts() || op->HasWaitingForGlobalTxIdFlag()) {
+ return false;
+ }
+
+ if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && WillRejectDataTx(op)) {
+ return true;
+ }
+
+ if (DataShard.IsStopping()) {
+ // Avoid doing any new work when datashard is stopping
+ return false;
+ }
+
+ return !op->HasRuntimeConflicts();
+ }
+
+ void DoExecute(TDataShard* self, TWriteOperation* tx, TTransactionContext& txc, const TActorContext& ctx) {
+ const TValidatedWriteTx::TPtr& writeTx = tx->WriteTx();
+
+ const ui64 tableId = writeTx->TableId().PathId.LocalPathId;
+ const TTableId fullTableId(self->GetPathOwnerId(), tableId);
+ const ui64 localTableId = self->GetLocalTableId(fullTableId);
+ if (localTableId == 0) {
+ tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId);
+ return;
+ }
+ const ui64 shadowTableId = self->GetShadowTableId(fullTableId);
+
+ const TUserTable& TableInfo_ = *self->GetUserTables().at(tableId);
+ Y_ABORT_UNLESS(TableInfo_.LocalTid == localTableId);
+ Y_ABORT_UNLESS(TableInfo_.ShadowTid == shadowTableId);
+
+ const ui32 writeTableId = localTableId;
+ auto [readVersion, writeVersion] = self->GetReadWriteVersions(tx);
+
+ TDataShardUserDb userDb(*self, txc.DB, readVersion);
+ TDataShardChangeGroupProvider groupProvider(*self, txc.DB);
+
+ TVector<TRawTypeValue> key;
+ TVector<NTable::TUpdateOp> value;
+
+ const TSerializedCellMatrix& matrix = writeTx->Matrix();
+ TConstArrayRef<TCell> cells = matrix.GetCells();
+ const ui32 rowCount = matrix.GetRowCount();
+ const ui16 colCount = matrix.GetColCount();
+
+ for (ui32 rowIdx = 0; rowIdx < rowCount; ++rowIdx)
+ {
+ key.clear();
+ ui64 keyBytes = 0;
+ for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) {
+ const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx];
+ const TCell& cell = cells[rowIdx * colCount + keyColIdx];
+ if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue<ui8>() > 127) {
+ tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited");
+ return;
+ }
+
+ keyBytes += cell.Size();
+ key.emplace_back(TRawTypeValue(cell.AsRef(), cellType));
+ }
+
+ if (keyBytes > NLimits::MaxWriteKeySize) {
+ tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize);
+ return;
+ }
+
+ value.clear();
+ for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < colCount; ++valueColIdx) {
+ ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx);
+ const TCell& cell = cells[rowIdx * colCount + valueColIdx];
+ if (cell.Size() > NLimits::MaxWriteValueSize) {
+ tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize);
+ return;
+ }
+
+ auto* col = TableInfo_.Columns.FindPtr(valueColIdx + 1);
+ Y_ABORT_UNLESS(col);
+
+ value.emplace_back(NTable::TUpdateOp(columnTag, NTable::ECellOp::Set, TRawTypeValue(cell.AsRef(), col->Type)));
+ }
+
+ txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion);
+ self->GetConflictsCache().GetTableCache(writeTableId).RemoveUncommittedWrites(writeTx->KeyCells(), txc.DB);
+ }
+ //TODO: Counters
+ // self->IncCounter(COUNTER_UPLOAD_ROWS, rowCount);
+ // self->IncCounter(COUNTER_UPLOAD_ROWS_BYTES, matrix.GetBuffer().size());
+
+ TableInfo_.Stats.UpdateTime = TAppData::TimeProvider->Now();
+
+ tx->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCommited(writeTx->TabletId(), tx->GetTxId()));
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << tx->GetTxId() << " at " << self->TabletID() << " write operation is executed");
+ }
+
+ EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
+ TWriteOperation* tx = dynamic_cast<TWriteOperation*>(op.Get());
+ Y_ABORT_UNLESS(tx != nullptr);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << op->GetTxId() << " at " << tx->WriteTx()->TabletId() << " is executing write operation");
+
+ if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && CheckRejectDataTx(op, ctx)) {
+ return EExecutionStatus::Executed;
+ }
+
+ if (op->HasWaitingForGlobalTxIdFlag()) {
+ return EExecutionStatus::Continue;
+ }
+
+ if (op->IsImmediate()) {
+ // Every time we execute immediate transaction we may choose a new mvcc version
+ op->MvccReadWriteVersion.reset();
+ }
+ else {
+ //TODO: Prepared
+ tx->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(tx->WriteTx()->TabletId(), op->GetTxId(), {0, 0, {}}));
+ return EExecutionStatus::DelayCompleteNoMoreRestarts;
+ }
+
+ TDataShardLocksDb locksDb(DataShard, txc);
+ TSetupSysLocks guardLocks(op, DataShard, &locksDb);
+
+ try {
+ DoExecute(&DataShard, tx, txc, ctx);
+ } catch (const TNeedGlobalTxId&) {
+ Y_VERIFY_S(op->GetGlobalTxId() == 0,
+ "Unexpected TNeedGlobalTxId exception for direct operation with TxId# " << op->GetGlobalTxId());
+ Y_VERIFY_S(op->IsImmediate(),
+ "Unexpected TNeedGlobalTxId exception for a non-immediate operation with TxId# " << op->GetTxId());
+
+ ctx.Send(MakeTxProxyID(),
+ new TEvTxUserProxy::TEvAllocateTxId(),
+ 0, op->GetTxId());
+ op->SetWaitingForGlobalTxIdFlag();
+
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Continue;
+ }
+
+ if (Pipeline.AddLockDependencies(op, guardLocks)) {
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Continue;
+ }
+
+ op->ChangeRecords() = std::move(tx->WriteTx()->GetCollectedChanges());
+
+ DataShard.SysLocksTable().ApplyLocks();
+ DataShard.SubscribeNewLocks(ctx);
+ Pipeline.AddCommittingOp(op);
+
+ return EExecutionStatus::DelayCompleteNoMoreRestarts;
+ }
+
+ void Complete(TOperation::TPtr op, const TActorContext& ctx) override {
+ Pipeline.RemoveCommittingOp(op);
+ DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
+ DataShard.EmitHeartbeats(ctx);
+
+ TWriteOperation* tx = dynamic_cast<TWriteOperation*>(op.Get());
+ Y_ABORT_UNLESS(tx != nullptr);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << op->GetTxId() << " at " << tx->WriteTx()->TabletId() << " complete write operation");
+
+ //TODO: Counters
+ // if (WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED || WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED) {
+ // self->IncCounter(COUNTER_WRITE_SUCCESS);
+ // } else {
+ // self->IncCounter(COUNTER_WRITE_ERROR);
+ // }
+
+ ctx.Send(tx->GetEv()->Sender, tx->WriteResult().release(), 0, tx->GetEv()->Cookie);
+ }
+
+}; // TWriteUnit
+
+THolder<TExecutionUnit> CreateWriteUnit(TDataShard& self, TPipeline& pipeline) {
+ return THolder(new TWriteUnit(self, pipeline));
+}
+
+} // NDataShard
+} // NKikimr
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index dde0d8d105a..c7db84b49e8 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -34,6 +34,7 @@ SRCS(
check_read_unit.cpp
check_scheme_tx_unit.cpp
check_snapshot_tx_unit.cpp
+ check_write_unit.cpp
complete_data_tx_unit.cpp
completed_operations_unit.cpp
conflicts_cache.cpp
@@ -71,6 +72,7 @@ SRCS(
datashard__stats.cpp
datashard__store_table_path.cpp
datashard__store_scan_state.cpp
+ datashard__write.cpp
datashard_change_receiving.cpp
datashard_change_sender_activation.cpp
datashard_change_sending.cpp
@@ -127,6 +129,7 @@ SRCS(
datashard_repl_offsets_client.cpp
datashard_repl_offsets_server.cpp
datashard_subdomain_path_id.cpp
+ datashard_write_operation.cpp
datashard_txs.h
datashard.cpp
datashard.h
@@ -196,6 +199,7 @@ SRCS(
volatile_tx.cpp
wait_for_plan_unit.cpp
wait_for_stream_clearance_unit.cpp
+ write_unit.cpp
upload_stats.cpp
)
@@ -304,4 +308,5 @@ RECURSE_FOR_TESTS(
ut_stats
ut_upload_rows
ut_volatile
+ ut_write
)