diff options
author | azevaykin <azevaykin@ydb.tech> | 2023-12-15 23:04:30 +0300 |
---|---|---|
committer | azevaykin <azevaykin@ydb.tech> | 2023-12-16 00:13:57 +0300 |
commit | fc639245d659f8a4b177331d606fe0bda06b5dae (patch) | |
tree | 6c90f410bd5726a5e90993fa9905d3204e35e5f3 | |
parent | 7004cc66a83f229898f1b621892a2b55bb225408 (diff) | |
download | ydb-fc639245d659f8a4b177331d606fe0bda06b5dae.tar.gz |
DataShard EvWrite Immediate
35 files changed, 2762 insertions, 49 deletions
diff --git a/.mapping.json b/.mapping.json index babdeb70d5a..2af7e2909bf 100644 --- a/.mapping.json +++ b/.mapping.json @@ -6736,6 +6736,12 @@ "ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt":"", "ydb/core/tx/datashard/ut_volatile/CMakeLists.txt":"", "ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt":"", + "ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/datashard/ut_write/CMakeLists.txt":"", + "ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt":"", "ydb/core/tx/long_tx_service/CMakeLists.darwin-arm64.txt":"", "ydb/core/tx/long_tx_service/CMakeLists.darwin-x86_64.txt":"", "ydb/core/tx/long_tx_service/CMakeLists.linux-aarch64.txt":"", diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 0d872d299ff..133b6cb1261 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -466,4 +466,5 @@ enum ETxTypes { TXTYPE_CDC_STREAM_EMIT_HEARTBEATS = 79 [(TxTypeOpts) = {Name: "TTxCdcStreamEmitHeartbeats"}]; TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}]; TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}]; + TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}]; } diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt index ca3bfea616a..02077d6ae64 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt @@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot) add_subdirectory(ut_stats) add_subdirectory(ut_upload_rows) add_subdirectory(ut_volatile) +add_subdirectory(ut_write) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency @@ -180,6 
+181,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp @@ -216,6 +218,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -257,6 +260,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -313,6 +317,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt index ca3bfea616a..02077d6ae64 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt @@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot) add_subdirectory(ut_stats) add_subdirectory(ut_upload_rows) add_subdirectory(ut_volatile) +add_subdirectory(ut_write) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency @@ -180,6 +181,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp @@ -216,6 +218,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -257,6 +260,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -313,6 +317,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 83f3d65feef..bfda437b594 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot) add_subdirectory(ut_stats) add_subdirectory(ut_upload_rows) add_subdirectory(ut_volatile) +add_subdirectory(ut_write) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency @@ -181,6 +182,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp @@ -217,6 +219,7 @@ target_sources(core-tx-datashard PRIVATE 
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -258,6 +261,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -314,6 +318,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt index 83f3d65feef..bfda437b594 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt @@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot) add_subdirectory(ut_stats) add_subdirectory(ut_upload_rows) add_subdirectory(ut_volatile) +add_subdirectory(ut_write) 
get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency @@ -181,6 +182,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp @@ -217,6 +219,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -258,6 +261,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -314,6 +318,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp + 
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt index 99d4810d8a5..f77a673de3d 100644 --- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt @@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot) add_subdirectory(ut_stats) add_subdirectory(ut_upload_rows) add_subdirectory(ut_volatile) +add_subdirectory(ut_write) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency @@ -181,6 +182,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp @@ -217,6 +219,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -258,6 +261,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -314,6 +318,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp ) generate_enum_serilization(core-tx-datashard diff --git a/ydb/core/tx/datashard/check_write_unit.cpp b/ydb/core/tx/datashard/check_write_unit.cpp new file mode 100644 index 00000000000..32c9bd7fc1c --- /dev/null +++ b/ydb/core/tx/datashard/check_write_unit.cpp @@ -0,0 +1,337 @@ +#include "datashard_impl.h" +#include "datashard_pipeline.h" +#include "execution_unit_ctors.h" + +#include <ydb/core/tablet/tablet_exception.h> + +namespace NKikimr { +namespace NDataShard { + +class TCheckWriteUnit: public TExecutionUnit { +public: + TCheckWriteUnit(TDataShard &dataShard, TPipeline &pipeline); + ~TCheckWriteUnit() override; + + bool IsReadyToExecute(TOperation::TPtr op) const override; + EExecutionStatus Execute(TOperation::TPtr op, + TTransactionContext &txc, + const TActorContext &ctx) override; + void Complete(TOperation::TPtr op, + const TActorContext &ctx) override; + +private: +}; + +TCheckWriteUnit::TCheckWriteUnit(TDataShard &dataShard, + TPipeline &pipeline) + : TExecutionUnit(EExecutionUnitKind::CheckDataTx, false, dataShard, pipeline) +{ +} + +TCheckWriteUnit::~TCheckWriteUnit() +{ +} + +bool TCheckWriteUnit::IsReadyToExecute(TOperation::TPtr) const +{ + return true; +} + 
+EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, + TTransactionContext &, + const TActorContext &ctx) +{ + Y_ABORT_UNLESS(op->IsDataTx() || op->IsReadTable()); + Y_ABORT_UNLESS(!op->IsAborted()); + + if (CheckRejectDataTx(op, ctx)) { + op->Abort(EExecutionUnitKind::FinishPropose); + + return EExecutionStatus::Executed; + } + + //TODO: remove this return + return EExecutionStatus::Executed; + + TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + auto dataTx = tx->GetDataTx(); + Y_ABORT_UNLESS(dataTx); + Y_ABORT_UNLESS(dataTx->Ready() || dataTx->RequirePrepare()); + + if (dataTx->Ready()) { + DataShard.IncCounter(COUNTER_MINIKQL_PROGRAM_SIZE, dataTx->ProgramSize()); + } else { + Y_ABORT_UNLESS(dataTx->RequirePrepare()); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "Require prepare Tx " << op->GetTxId() << " at " << DataShard.TabletID() + << ": " << dataTx->GetErrors()); + } + + // Check if we are out of space and tx wants to update user + // or system table. 
+ if (DataShard.IsAnyChannelYellowStop() + && (dataTx->HasWrites() || !op->IsImmediate())) { + TString err = TStringBuilder() + << "Cannot perform transaction: out of disk space at tablet " + << DataShard.TabletID() << " txId " << op->GetTxId(); + + DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); + + BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } + + if (tx->IsMvccSnapshotRead()) { + auto snapshot = tx->GetMvccSnapshot(); + if (DataShard.IsFollower()) { + TString err = TStringBuilder() + << "Operation " << *op << " cannot read from snapshot " << snapshot + << " using data tx on a follower " << DataShard.TabletID(); + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) + ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } else if (!DataShard.IsMvccEnabled()) { + TString err = TStringBuilder() + << "Operation " << *op << " reads from snapshot " << snapshot + << " with MVCC feature disabled at " << DataShard.TabletID(); + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) + ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } else if (snapshot < DataShard.GetSnapshotManager().GetLowWatermark()) { + TString err = TStringBuilder() + << "Operation " << *op << " reads from stale snapshot " << snapshot + << " at " << DataShard.TabletID(); + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) + 
->AddError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } + } + + TEngineBay::TSizes txReads; + + if (op->IsDataTx()) { + bool hasTotalKeysSizeLimit = !!dataTx->PerShardKeysSizeLimitBytes(); + txReads = dataTx->CalcReadSizes(hasTotalKeysSizeLimit); + + if (txReads.ReadSize > DataShard.GetTxReadSizeLimit()) { + TString err = TStringBuilder() + << "Transaction read size " << txReads.ReadSize << " exceeds limit " + << DataShard.GetTxReadSizeLimit() << " at tablet " << DataShard.TabletID() + << " txId " << op->GetTxId(); + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) + ->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } + + if (hasTotalKeysSizeLimit + && txReads.TotalKeysSize > *dataTx->PerShardKeysSizeLimitBytes()) { + TString err = TStringBuilder() + << "Transaction total keys size " << txReads.TotalKeysSize + << " exceeds limit " << *dataTx->PerShardKeysSizeLimitBytes() + << " at tablet " << DataShard.TabletID() << " txId " << op->GetTxId(); + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) + ->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } + + for (const auto& key : dataTx->TxInfo().Keys) { + if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) { + ui64 keySize = 0; + for (const auto& cell : key.Key->Range.From) { + keySize += cell.Size(); + } + if (keySize > NLimits::MaxWriteKeySize) { + TString err = TStringBuilder() + << "Operation " << *op << " writes key of " << keySize + << " bytes which exceeds limit " << NLimits::MaxWriteKeySize + << " bytes
at " << DataShard.TabletID(); + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) + ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } + for (const auto& col : key.Key->Columns) { + if (col.Operation == TKeyDesc::EColumnOperation::Set || + col.Operation == TKeyDesc::EColumnOperation::InplaceUpdate) + { + if (col.ImmediateUpdateSize > NLimits::MaxWriteValueSize) { + TString err = TStringBuilder() + << "Transaction write column value of " << col.ImmediateUpdateSize + << " bytes is larger than the allowed threshold"; + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR)->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } + } + } + + if (DataShard.IsSubDomainOutOfSpace()) { + switch (key.Key->RowOperation) { + case TKeyDesc::ERowOperation::Read: + case TKeyDesc::ERowOperation::Erase: + // Read and erase are allowed even when we're out of disk space + break; + + default: { + // Updates are not allowed when database is out of space + TString err = "Cannot perform writes: database is out of disk space"; + + DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); + + BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } + } + } + } + } + } + + if (op->IsReadTable()) { + const auto& record = dataTx->GetReadTableTransaction(); + const auto& userTables = DataShard.GetUserTables(); + + TMaybe<TString> schemaChangedError; + if (auto it = 
userTables.find(record.GetTableId().GetTableId()); it != userTables.end()) { + const auto& tableInfo = *it->second; + for (const auto& columnRecord : record.GetColumns()) { + if (auto* columnInfo = tableInfo.Columns.FindPtr(columnRecord.GetId())) { + // TODO: column types don't change when bound by id, but we may want to check anyway + } else { + schemaChangedError = TStringBuilder() << "ReadTable cannot find column " + << columnRecord.GetName() << " (" << columnRecord.GetId() << ")"; + break; + } + } + // TODO: validate key ranges? + } else { + schemaChangedError = TStringBuilder() << "ReadTable cannot find table " + << record.GetTableId().GetOwnerId() << ":" << record.GetTableId().GetTableId(); + } + + if (schemaChangedError) { + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR) + ->AddError(NKikimrTxDataShard::TError::SCHEME_CHANGED, *schemaChangedError); + op->Abort(EExecutionUnitKind::FinishPropose); + return EExecutionStatus::Executed; + } + + if (record.HasSnapshotStep() && record.HasSnapshotTxId()) { + if (!op->IsImmediate()) { + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError( + NKikimrTxDataShard::TError::BAD_ARGUMENT, + "ReadTable from snapshot must be an immediate transaction"); + op->Abort(EExecutionUnitKind::FinishPropose); + return EExecutionStatus::Executed; + } + + const TSnapshotKey key( + record.GetTableId().GetOwnerId(), + record.GetTableId().GetTableId(), + record.GetSnapshotStep(), + record.GetSnapshotTxId()); + + if (!DataShard.GetSnapshotManager().AcquireReference(key)) { + // TODO: try upgrading to mvcc snapshot when available + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError( + NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, + TStringBuilder() + << "Shard " << DataShard.TabletID() + << " has no snapshot " << key); + op->Abort(EExecutionUnitKind::FinishPropose); + return EExecutionStatus::Executed; + } + + op->SetAcquiredSnapshotKey(key); + 
op->SetUsingSnapshotFlag(); + } + } + + if (!op->IsImmediate()) { + if (!Pipeline.AssignPlanInterval(op)) { + TString err = TStringBuilder() + << "Can't propose tx " << op->GetTxId() << " at blocked shard " + << DataShard.TabletID(); + BuildResult(op)->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err); + op->Abort(EExecutionUnitKind::FinishPropose); + + LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err); + + return EExecutionStatus::Executed; + } + + auto &res = BuildResult(op); + res->SetPrepared(op->GetMinStep(), op->GetMaxStep(), op->GetReceivedAt()); + + if (op->IsDataTx()) { + res->Record.SetReadSize(txReads.ReadSize); + res->Record.SetReplySize(txReads.ReplySize); + + for (const auto& rs : txReads.OutReadSetSize) { + auto entry = res->Record.AddOutgoingReadSetInfo(); + entry->SetShardId(rs.first); + entry->SetSize(rs.second); + } + } + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "Prepared " << op->GetKind() << " transaction txId " << op->GetTxId() + << " at tablet " << DataShard.TabletID()); + } + + return EExecutionStatus::Executed; +} + +void TCheckWriteUnit::Complete(TOperation::TPtr, const TActorContext &) +{ +} + +THolder<TExecutionUnit> CreateCheckWriteUnit(TDataShard &dataShard, TPipeline &pipeline) +{ + return THolder(new TCheckWriteUnit(dataShard, pipeline)); +} + +} // namespace NDataShard +} // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 90553f12983..cf3d9bd9d81 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2634,6 +2634,41 @@ bool TDataShard::CheckDataTxRejectAndReply(const TEvDataShard::TEvProposeTransac return false; } +bool TDataShard::CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) +{ + auto* msg = ev->Get(); + TString txDescr = TStringBuilder() << "data TxId " << msg->GetTxId(); + + NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus; + 
ERejectReasons rejectReasons; + TString rejectDescription; + bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReasons, rejectDescription); + + if (reject) { + LWTRACK(ProposeTransactionReject, msg->GetOrbit()); + NKikimrDataEvents::TEvWriteResult::EStatus status; + switch (rejectStatus) { + case NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED: + status = NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED; + break; + case NKikimrTxDataShard::TEvProposeTransactionResult::ERROR: + status = NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR; + break; + default: + Y_FAIL_S("Unexpected rejectStatus " << rejectStatus); + } + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), msg->GetTxId(), status, rejectDescription); + + LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription); + + ctx.Send(ev->Sender, result.release()); + IncCounter(COUNTER_PREPARE_OVERLOADED); + IncCounter(COUNTER_PREPARE_COMPLETE); + return true; + } + + return false; +} void TDataShard::UpdateProposeQueueSize() const { SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs()); SetCounter(COUNTER_READ_ITERATORS_WAITING, Pipeline.WaitingReadIterators()); @@ -2771,26 +2806,38 @@ void TDataShard::CheckDelayedProposeQueue(const TActorContext &ctx) { void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&ev, const TActorContext &ctx) { auto* msg = ev->Get(); - bool mayRunImmediate = false; - if ((msg->GetFlags() & TTxFlags::Immediate) && - !(msg->GetFlags() & TTxFlags::ForceOnline) && - msg->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA) - { - // This transaction may run in immediate mode - mayRunImmediate = true; - } + // This transaction may run in immediate mode + bool mayRunImmediate = (msg->GetFlags() & TTxFlags::Immediate) && !(msg->GetFlags() & TTxFlags::ForceOnline) && + msg->GetTxKind() == 
NKikimrTxDataShard::TX_KIND_DATA; if (mayRunImmediate) { // Enqueue immediate transactions so they don't starve existing operations LWTRACK(ProposeTransactionEnqueue, msg->Orbit); - ProposeQueue.Enqueue(std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx); + ProposeQueue.Enqueue(IEventHandle::Upcast<TEvDataShard::TEvProposeTransaction>(std::move(ev)), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx); UpdateProposeQueueSize(); } else { // Prepare planned transactions as soon as possible Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx); } } +void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TActorContext& ctx) { + auto* msg = ev->Get(); + const auto& record = msg->Record; + + // This transaction may run in immediate mode + bool mayRunImmediate = record.txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE; + + if (mayRunImmediate) { + // Enqueue immediate transactions so they don't starve existing operations + LWTRACK(ProposeTransactionEnqueue, msg->GetOrbit()); + ProposeQueue.Enqueue(IEventHandle::Upcast<NEvents::TDataEvents::TEvWrite>(std::move(ev)), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx); + UpdateProposeQueueSize(); + } else { + // Prepare planned transactions as soon as possible + Execute(new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx); + } +} void TDataShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr &ev, const TActorContext &ctx) { ui64 srcMediatorId = ev->Get()->Record.GetMediatorID(); @@ -2849,18 +2896,47 @@ void TDataShard::Handle(TEvPrivate::TEvDelayedProposeTransaction::TPtr &ev, cons if (!item.Cancelled) { // N.B. 
we don't call ProposeQueue.Reset(), tx will Ack() on its first Execute() - Execute(new TTxProposeTransactionBase(this, std::move(item.Event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx); - return; + + switch (item.Event->GetTypeRewrite()) { + case TEvDataShard::TEvProposeTransaction::EventType: { + auto event = IEventHandle::Downcast<TEvDataShard::TEvProposeTransaction>(std::move(item.Event)); + Execute(new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx); + return; + } + case NEvents::TDataEvents::TEvWrite::EventType: { + auto event = IEventHandle::Downcast<NEvents::TDataEvents::TEvWrite>(std::move(item.Event)); + Execute(new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx); + return; + } + default: + Y_FAIL_S("Unexpected event type " << item.Event->GetTypeRewrite()); + } } TActorId target = item.Event->Sender; ui64 cookie = item.Event->Cookie; - auto kind = item.Event->Get()->GetTxKind(); - auto txId = item.Event->Get()->GetTxId(); - auto result = new TEvDataShard::TEvProposeTransactionResult( - kind, TabletID(), txId, - NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED); - ctx.Send(target, result, 0, cookie); + switch (item.Event->GetTypeRewrite()) { + case TEvDataShard::TEvProposeTransaction::EventType: { + auto* msg = item.Event->Get<TEvDataShard::TEvProposeTransaction>(); + auto kind = msg->GetTxKind(); + auto txId = msg->GetTxId(); + auto result = new TEvDataShard::TEvProposeTransactionResult( + kind, TabletID(), txId, + NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED); + ctx.Send(target, result, 0, cookie); + return; + } + case NEvents::TDataEvents::TEvWrite::EventType: { + auto* msg = item.Event->Get<NEvents::TDataEvents::TEvWrite>(); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), msg->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Canceled"); + 
ctx.Send(target, result.release(), 0, cookie); + return; + } + default: + Y_FAIL_S("Unexpected event type " << item.Event->GetTypeRewrite()); + } + + } // N.B. Ack directly since we didn't start any delayed transactions @@ -3226,15 +3302,22 @@ void TDataShard::WaitPredictedPlanStep(ui64 step) { } } -bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const { - auto* msg = ev->Get(); - +bool TDataShard::CheckTxNeedWait() const { if (MvccSwitchState == TSwitchState::SWITCHING) { LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction needs to wait because of mvcc state switching"); return true; } - auto &rec = ev->Get()->Record; + return false; +} + +bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const { + if (CheckTxNeedWait()) { + return true; + } + + auto* msg = ev->Get(); + auto& rec = msg->Record; if (rec.HasMvccSnapshot()) { TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId()); TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads()); diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 0f2884dfdff..dba448cf6b4 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -4,6 +4,7 @@ #include "datashard_s3_upload.h" #include <ydb/core/tx/tx.h> +#include <ydb/core/tx/data_events/events.h> #include <ydb/core/tx/message_seqno.h> #include <ydb/core/base/domain.h> #include <ydb/core/base/row_version.h> diff --git a/ydb/core/tx/datashard/datashard__write.cpp b/ydb/core/tx/datashard/datashard__write.cpp new file mode 100644 index 00000000000..905178057b3 --- /dev/null +++ b/ydb/core/tx/datashard/datashard__write.cpp @@ -0,0 +1,261 @@ +#include "datashard_txs.h" +#include "probes.h" +#include "operation.h" +#include "datashard_write_operation.h" + +#include <ydb/library/wilson_ids/wilson.h> + +LWTRACE_USING(DATASHARD_PROVIDER) + 
+namespace NKikimr::NDataShard { + +TDataShard::TTxWrite::TTxWrite(TDataShard* self, NEvents::TDataEvents::TEvWrite::TPtr ev, TInstant receivedAt, ui64 tieBreakerIndex, bool delayed) + : TBase(self) + , Ev(std::move(ev)) + , ReceivedAt(receivedAt) + , TieBreakerIndex(tieBreakerIndex) + , TxId(Ev->Get()->GetTxId()) + , Acked(!delayed) + , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END) +{ +} + +bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext& ctx) { + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxWrite:: execute at tablet# " << Self->TabletID()); + auto* request = Ev->Get(); + const auto& record = request->Record; + Y_UNUSED(record); + + LWTRACK(WriteExecute, request->GetOrbit()); + + if (!Acked) { + // Ack event on the first execute (this will schedule the next event if any) + Self->ProposeQueue.Ack(ctx); + Acked = true; + } + + try { + // If tablet is in follower mode then we should sync scheme + // before we build and check operation. + if (Self->IsFollower()) { + NKikimrTxDataShard::TError::EKind status; + TString errMessage; + + if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) + return false; + + if (status != NKikimrTxDataShard::TError::OK) { + LOG_LOG_S_THROTTLE(Self->GetLogThrottler(TDataShard::ELogThrottlerType::TxProposeTransactionBase_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, + "TTxWrite:: errors while proposing transaction txid " << TxId << " at tablet " << Self->TabletID() << " status: " << status << " error: " << errMessage); + + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), TxId, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, errMessage); + + TActorId target = Op ? Op->GetTarget() : Ev->Sender; + ui64 cookie = Op ? 
Op->GetCookie() : Ev->Cookie; + + if (ProposeTransactionSpan) { + ProposeTransactionSpan.EndOk(); + } + ctx.Send(target, result.release(), 0, cookie); + + return true; + } + } + + if (Ev) { + Y_ABORT_UNLESS(!Op); + + if (Self->CheckDataTxRejectAndReply(Ev, ctx)) { + Ev = nullptr; + return true; + } + + TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx); + + // Unsuccessful operation parse. + if (op->IsAborted()) { + LWTRACK(ProposeTransactionParsed, op->Orbit, false); + Y_ABORT_UNLESS(op->Result()); + + if (ProposeTransactionSpan) { + ProposeTransactionSpan.EndError("TTxWrite:: unsuccessful operation parse"); + } + ctx.Send(op->GetTarget(), op->Result().Release()); + return true; + } + LWTRACK(ProposeTransactionParsed, op->Orbit, true); + + op->BuildExecutionPlan(false); + if (!op->IsExecutionPlanFinished()) + Self->Pipeline.GetExecutionUnit(op->GetCurrentUnit()).AddOperation(op); + + Op = op; + Ev = nullptr; + Op->IncrementInProgress(); + } + + Y_ABORT_UNLESS(Op && Op->IsInProgress() && !Op->GetExecutionPlan().empty()); + + auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx); + + switch (status) { + case EExecutionStatus::Restart: + // Restart even if current CompleteList is not empty + // It will be extended in subsequent iterations + return false; + + case EExecutionStatus::Reschedule: + // Reschedule transaction as soon as possible + if (!Op->IsExecutionPlanFinished()) { + Op->IncrementInProgress(); + Self->ExecuteProgressTx(Op, ctx); + Rescheduled = true; + } + Op->DecrementInProgress(); + break; + + case EExecutionStatus::Executed: + case EExecutionStatus::Continue: + Op->DecrementInProgress(); + break; + + case EExecutionStatus::WaitComplete: + WaitComplete = true; + break; + + default: + Y_FAIL_S("unexpected execution status " << status << " for operation " << *Op << " " << Op->GetKind() << " at " << Self->TabletID()); + } + + if (WaitComplete || !CompleteList.empty()) { + // Keep operation 
active until we run the complete list + CommitStart = AppData()->TimeProvider->Now(); + } else { + // Release operation as it's no longer needed + Op = nullptr; + } + + // Commit all side effects + return true; + } catch (const TNotReadyTabletException&) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "TX [" << 0 << " : " << TxId << "] can't prepare (tablet's not ready) at tablet " << Self->TabletID()); + return false; + } catch (const TSchemeErrorTabletException& ex) { + Y_UNUSED(ex); + Y_ABORT(); + } catch (const TMemoryLimitExceededException& ex) { + Y_ABORT("there must be no leaked exceptions: TMemoryLimitExceededException"); + } catch (const std::exception& e) { + Y_ABORT("there must be no leaked exceptions: %s", e.what()); + } catch (...) { + Y_ABORT("there must be no leaked exceptions"); + } + + return true; +} + +void TDataShard::TTxWrite::Complete(const TActorContext& ctx) { + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TTxWrite complete: at tablet# " << Self->TabletID()); + + if (ProposeTransactionSpan) { + ProposeTransactionSpan.End(); + } + + if (Op) { + Y_ABORT_UNLESS(!Op->GetExecutionPlan().empty()); + if (!CompleteList.empty()) { + auto commitTime = AppData()->TimeProvider->Now() - CommitStart; + Op->SetCommitTime(CompleteList.front(), commitTime); + + if (!Op->IsExecutionPlanFinished() && (Op->GetCurrentUnit() != CompleteList.front())) + Op->SetDelayedCommitTime(commitTime); + + Self->Pipeline.RunCompleteList(Op, CompleteList, ctx); + } + + if (WaitComplete) { + Op->DecrementInProgress(); + + if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) { + Self->Pipeline.AddCandidateOp(Op); + + if (Self->Pipeline.CanRunAnotherOp()) { + Self->PlanQueue.Progress(ctx); + } + } + } + } + + Self->CheckSplitCanStart(ctx); + Self->CheckMvccStateChangeCanStart(ctx); +} + + +void TDataShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) { + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Handle TTxWrite: at tablet# " << 
TabletID()); + + auto* msg = ev->Get(); + const auto& record = msg->Record; + Y_UNUSED(record); + + LWTRACK(WriteRequest, msg->GetOrbit()); + + // Check if we need to delay an immediate transaction + if (MediatorStateWaiting && record.txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) + { + // We cannot calculate correct version until we restore mediator state + LWTRACK(ProposeTransactionWaitMediatorState, msg->GetOrbit()); + MediatorStateWaitingMsgs.emplace_back(ev.Release()); + UpdateProposeQueueSize(); + return; + } + + if (Pipeline.HasProposeDelayers()) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Handle TEvProposeTransaction delayed at " << TabletID() << " until dependency graph is restored"); + LWTRACK(ProposeTransactionWaitDelayers, msg->GetOrbit()); + DelayedProposeQueue.emplace_back().Reset(ev.Release()); + UpdateProposeQueueSize(); + return; + } + + if (CheckTxNeedWait()) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Handle TEvProposeTransaction delayed at " << TabletID() << " until interesting plan step will come"); + if (Pipeline.AddWaitingTxOp(ev)) { + UpdateProposeQueueSize(); + return; + } + } + + IncCounter(COUNTER_PREPARE_REQUEST); + + if (CheckDataTxRejectAndReply(ev, ctx)) { + return; + } + + ProposeTransaction(std::move(ev), ctx); +} + +ui64 EvWrite::Convertor::GetTxId(const TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::TEvProposeTransaction::EventType: + return ev->Get<TEvDataShard::TEvProposeTransaction>()->GetTxId(); + case NEvents::TDataEvents::TEvWrite::EventType: + return ev->Get<NEvents::TDataEvents::TEvWrite>()->GetTxId(); + default: + Y_FAIL_S("Unexpected event type " << ev->GetTypeRewrite()); + } +} + +ui64 EvWrite::Convertor::GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode txMode) { + switch (txMode) { + case NKikimrDataEvents::TEvWrite::MODE_PREPARE: + return TTxFlags::Default; + case NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE: + return 
TTxFlags::VolatilePrepare; + case NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE: + return TTxFlags::Immediate; + default: + Y_FAIL_S("Unexpected tx mode " << txMode); + } +} +}
\ No newline at end of file diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 712c6ebdc79..c4bc0b6b007 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -13,6 +13,7 @@ #include "datashard_repl_offsets.h" #include "datashard_repl_offsets_client.h" #include "datashard_repl_offsets_server.h" +#include "datashard_write.h" #include "build_index.h" #include "cdc_stream_heartbeat.h" #include "cdc_stream_scan.h" @@ -175,6 +176,7 @@ class TDataShard class TTxProposeSchemeTransaction; class TTxCancelTransactionProposal; class TTxProposeTransactionBase; + class TTxWrite; class TTxReadSet; class TTxSchemaChanged; class TTxInitiateBorrowedPartsReturn; @@ -272,7 +274,9 @@ class TDataShard friend class TPipeline; friend class TLocksDataShardAdapter<TDataShard>; friend class TActiveTransaction; + friend class TWriteOperation; friend class TValidatedDataTx; + friend class TValidatedWriteTx; friend class TEngineBay; friend class NMiniKQL::TKqpScanComputeContext; friend class TSnapshotManager; @@ -1205,7 +1209,9 @@ class TDataShard void Handle(TEvDataShard::TEvProposeTransactionAttach::TPtr &ev, const TActorContext &ctx); void HandleAsFollower(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx); void ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&ev, const TActorContext &ctx); - void Handle(TEvTxProcessing::TEvPlanStep::TPtr &ev, const TActorContext &ctx); + void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx); + void ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TActorContext& ctx); + void Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx); void Handle(TEvTxProcessing::TEvReadSet::TPtr &ev, const TActorContext &ctx); void Handle(TEvTxProcessing::TEvReadSetAck::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvProgressTransaction::TPtr &ev, const TActorContext &ctx); 
@@ -1547,6 +1553,7 @@ public: ERejectReasons& rejectReasons, TString& rejectDescription); bool CheckDataTxRejectAndReply(const TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx); + bool CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx); TSysLocks& SysLocksTable() { return SysLocks; } @@ -1967,6 +1974,7 @@ public: bool WaitPlanStep(ui64 step); bool CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const; + bool CheckTxNeedWait() const; void WaitPredictedPlanStep(ui64 step); void SchedulePlanPredictedTxs(); @@ -2399,7 +2407,7 @@ private: class TProposeQueue : private TTxProgressIdempotentScalarQueue<TEvPrivate::TEvDelayedProposeTransaction> { public: struct TItem : public TMoveOnly { - TItem(TEvDataShard::TEvProposeTransaction::TPtr&& event, TInstant receivedAt, ui64 tieBreakerIndex) + TItem(TAutoPtr<IEventHandle>&& event, TInstant receivedAt, ui64 tieBreakerIndex) : Event(std::move(event)) , ReceivedAt(receivedAt) , TieBreakerIndex(tieBreakerIndex) @@ -2407,7 +2415,7 @@ private: , Cancelled(false) { } - TEvDataShard::TEvProposeTransaction::TPtr Event; + TAutoPtr<IEventHandle> Event; TInstant ReceivedAt; ui64 TieBreakerIndex; TItem* Next; @@ -2419,10 +2427,10 @@ private: TItem* Last = nullptr; }; - void Enqueue(TEvDataShard::TEvProposeTransaction::TPtr event, TInstant receivedAt, ui64 tieBreakerIndex, const TActorContext& ctx) { + void Enqueue(TAutoPtr<IEventHandle> event, TInstant receivedAt, ui64 tieBreakerIndex, const TActorContext& ctx) { TItem* item = &Items.emplace_back(std::move(event), receivedAt, tieBreakerIndex); - const ui64 txId = item->Event->Get()->GetTxId(); + const ui64 txId = EvWrite::Convertor::GetTxId(item->Event); auto& links = TxIds[txId]; if (Y_UNLIKELY(links.Last)) { @@ -2437,7 +2445,7 @@ private: TItem Dequeue() { TItem* first = &Items.front(); - const ui64 txId = first->Event->Get()->GetTxId(); + const ui64 txId = 
EvWrite::Convertor::GetTxId(first->Event); auto it = TxIds.find(txId); Y_ABORT_UNLESS(it != TxIds.end() && it->second.First == first, @@ -2944,6 +2952,7 @@ protected: HFunc(TEvDataShard::TEvReadAck, Handle); HFunc(TEvDataShard::TEvReadCancel, Handle); HFunc(TEvDataShard::TEvReadColumnsRequest, Handle); + HFunc(NEvents::TDataEvents::TEvWrite, Handle); HFunc(TEvDataShard::TEvGetInfoRequest, Handle); HFunc(TEvDataShard::TEvListOperationsRequest, Handle); HFunc(TEvDataShard::TEvGetDataHistogramRequest, Handle); @@ -2992,13 +3001,11 @@ protected: HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle); HFunc(TEvPrivate::TEvConfirmReadonlyLease, Handle); HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle); - default: - if (!HandleDefaultEvents(ev, SelfId())) { - ALOG_WARN(NKikimrServices::TX_DATASHARD, - "TDataShard::StateWork unhandled event type: "<< ev->GetTypeRewrite() - << " event: " << ev->ToString()); - } - break; + default: + if (!HandleDefaultEvents(ev, SelfId())) { + ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString()); + } + break; } } @@ -3016,6 +3023,7 @@ protected: HFuncTraced(TEvDataShard::TEvReadContinue, Handle); HFuncTraced(TEvDataShard::TEvReadAck, Handle); HFuncTraced(TEvDataShard::TEvReadCancel, Handle); + HFuncTraced(NEvents::TDataEvents::TEvWrite, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWorkAsFollower unhandled event type: " << ev->GetTypeRewrite() diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index f4f52a4f027..c0888f27203 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -3,6 +3,7 @@ #include "datashard_pipeline.h" #include "datashard_impl.h" #include "datashard_txs.h" +#include "datashard_write_operation.h" #include <ydb/core/base/compile_time_flags.h> 
#include <ydb/core/base/cputime.h> @@ -1555,6 +1556,52 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: return tx; } +TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& ev, TInstant receivedAt, ui64 tieBreakerIndex, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) +{ + const auto& rec = ev->Get()->Record; + TBasicOpInfo info(rec.GetTxId(), EOperationKind::DataTx, EvWrite::Convertor::GetProposeFlags(rec.GetTxMode()), 0, receivedAt, tieBreakerIndex); + auto op = MakeIntrusive<TWriteOperation>(info, ev, Self, txc, ctx); + + auto badRequest = [&](const TString& error) { + op->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error); + LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error); + }; + + if (!op->WriteTx()->Ready()) { + badRequest(TStringBuilder() << "Shard " << Self->TabletID() << " cannot parse tx " << op->GetTxId() << ": " << op->WriteTx()->GetError()); + return op; + } + + op->ExtractKeys(); + + switch (rec.txmode()) { + case NKikimrDataEvents::TEvWrite::MODE_PREPARE: + break; + case NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE: + op->SetVolatilePrepareFlag(); + break; + case NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE: + op->SetImmediateFlag(); + break; + default: + badRequest(TStringBuilder() << "Unknown txmode: " << rec.txmode()); + return op; + } + + // Make config checks for immediate op. 
+ if (op->IsImmediate()) { + if (Config.NoImmediate() || (Config.ForceOnlineRW())) { + LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Shard " << Self->TabletID() << " force immediate op " << op->GetTxId() << " to online according to config"); + op->SetForceOnlineFlag(); + } else { + if (Config.DirtyImmediate()) + op->SetForceDirtyFlag(); + } + } + + return op; +} + void TPipeline::BuildDataTx(TActiveTransaction *tx, TTransactionContext &txc, const TActorContext &ctx) { auto dataTx = tx->BuildDataTx(Self, txc, ctx); @@ -1826,7 +1873,7 @@ void TPipeline::MaybeActivateWaitingSchemeOps(const TActorContext& ctx) const { } } -bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { +bool TPipeline::CheckInflightLimit() const { // check in-flight limit size_t totalInFly = Self->ReadIteratorsInFly() + Self->TxInFly() + Self->ImmediateInFly() @@ -1834,13 +1881,20 @@ bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, co if (totalInFly > Self->GetMaxTxInFly()) return false; // let tx to be rejected + return true; +} + +bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { + if (!CheckInflightLimit()) + return false; + if (Self->MvccSwitchState == TSwitchState::SWITCHING) { - WaitingDataTxOps.emplace(TRowVersion::Min(), std::move(ev)); // postpone tx processing till mvcc state switch is finished + WaitingDataTxOps.emplace(TRowVersion::Min(), IEventHandle::Upcast<TEvDataShard::TEvProposeTransaction>(std::move(ev))); // postpone tx processing till mvcc state switch is finished } else { bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads(); Y_DEBUG_ABORT_UNLESS(ev->Get()->Record.HasMvccSnapshot()); TRowVersion snapshot(ev->Get()->Record.GetMvccSnapshot().GetStep(), ev->Get()->Record.GetMvccSnapshot().GetTxId()); - WaitingDataTxOps.emplace(snapshot, std::move(ev)); + WaitingDataTxOps.emplace(snapshot, 
IEventHandle::Upcast<TEvDataShard::TEvProposeTransaction>(std::move(ev))); const ui64 waitStep = prioritizedReads ? snapshot.Step : snapshot.Step + 1; TRowVersion unreadableEdge; if (!Self->WaitPlanStep(waitStep) && snapshot < (unreadableEdge = GetUnreadableEdge(prioritizedReads))) { @@ -1851,6 +1905,15 @@ bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, co return true; } +bool TPipeline::AddWaitingTxOp(NEvents::TDataEvents::TEvWrite::TPtr& ev) { + if (!CheckInflightLimit()) + return false; + + WaitingDataTxOps.emplace(TRowVersion::Min(), IEventHandle::Upcast<NEvents::TDataEvents::TEvWrite>(std::move(ev))); // postpone tx processing till mvcc state switch is finished + + return true; +} + void TPipeline::ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx) { LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ActivateWaitingTxOps for version# " << edge << ", txOps: " << (WaitingDataTxOps.empty() ? "empty" : ToString(WaitingDataTxOps.begin()->first.Step)) diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index 85cf95c6b84..9e63590493c 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -266,6 +266,10 @@ public: TInstant receivedAt, ui64 tieBreakerIndex, NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx); + TOperation::TPtr BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr &ev, + TInstant receivedAt, ui64 tieBreakerIndex, + NTabletFlatExecutor::TTransactionContext &txc, + const TActorContext &ctx); void BuildDataTx(TActiveTransaction *tx, TTransactionContext &txc, const TActorContext &ctx); @@ -344,7 +348,9 @@ public: void MaybeActivateWaitingSchemeOps(const TActorContext& ctx) const; ui64 WaitingTxs() const { return WaitingDataTxOps.size(); } // note that without iterators + bool CheckInflightLimit() const; bool 
AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx); + bool AddWaitingTxOp(NEvents::TDataEvents::TEvWrite::TPtr& ev); void ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx); void ActivateWaitingTxOps(const TActorContext& ctx); @@ -507,7 +513,7 @@ private: TWaitingSchemeOpsOrder WaitingSchemeOpsOrder; TWaitingSchemeOps WaitingSchemeOps; - TMultiMap<TRowVersion, TEvDataShard::TEvProposeTransaction::TPtr> WaitingDataTxOps; + TMultiMap<TRowVersion, TAutoPtr<IEventHandle>> WaitingDataTxOps; TCommittingDataTxOps CommittingOps; THashMap<ui64, TOperation::TPtr> CompletingOps; diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h index a0380f8aa0a..ab313b9843d 100644 --- a/ydb/core/tx/datashard/datashard_txs.h +++ b/ydb/core/tx/datashard/datashard_txs.h @@ -112,6 +112,25 @@ protected: NWilson::TSpan ProposeTransactionSpan; }; +class TDataShard::TTxWrite: public NTabletFlatExecutor::TTransactionBase<TDataShard> { +public: + TTxWrite(TDataShard* ds, NEvents::TDataEvents::TEvWrite::TPtr ev, TInstant receivedAt, ui64 tieBreakerIndex, bool delayed); + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; +protected: + TOperation::TPtr Op; + NEvents::TDataEvents::TEvWrite::TPtr Ev; + const TInstant ReceivedAt; + const ui64 TieBreakerIndex; + ui64 TxId; + TVector<EExecutionUnitKind> CompleteList; + TInstant CommitStart; + bool Acked; + bool Rescheduled = false; + bool WaitComplete = false; + NWilson::TSpan ProposeTransactionSpan; +}; + class TDataShard::TTxReadSet : public NTabletFlatExecutor::TTransactionBase<TDataShard> { public: TTxReadSet(TDataShard *self, TEvTxProcessing::TEvReadSet::TPtr ev); diff --git a/ydb/core/tx/datashard/datashard_ut_read_table.h b/ydb/core/tx/datashard/datashard_ut_read_table.h index 919e1b7eb06..73823f683ad 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_table.h 
+++ b/ydb/core/tx/datashard/datashard_ut_read_table.h @@ -166,10 +166,15 @@ namespace NDataShardReadTableTest { static void PrintPrimitive(TStringBuilder& out, const NYdb::TValueParser& parser) { switch (parser.GetPrimitiveType()) { + case NYdb::EPrimitiveType::Uint64: + out << parser.GetUint64(); + break; case NYdb::EPrimitiveType::Uint32: out << parser.GetUint32(); break; - + case NYdb::EPrimitiveType::Utf8: + out << parser.GetUtf8(); + break; case NYdb::EPrimitiveType::Timestamp: out << parser.GetTimestamp(); break; diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp new file mode 100644 index 00000000000..508dbc2062d --- /dev/null +++ b/ydb/core/tx/datashard/datashard_ut_write.cpp @@ -0,0 +1,81 @@ +#include "datashard_active_transaction.h" +#include "datashard_ut_read_table.h" +#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h> + +namespace NKikimr { + +using namespace NKikimr::NDataShard; +using namespace NSchemeShard; +using namespace Tests; +using namespace NDataShardReadTableTest; + +Y_UNIT_TEST_SUITE(DataShardWrite) { + std::tuple<Tests::TServer::TPtr, TActorId> TestCreateServer() { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root").SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.GetAppData().AllowReadTableImmediate = true; + + InitRoot(server, sender); + + return {server, sender}; + } + + Y_UNIT_TEST(WriteImmediateOnShard) { + auto [server, sender] = TestCreateServer(); + + auto opts = TShardedTableOptions().Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}}); + auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", 
"table-1", opts); + + const ui32 rowCount = 3; + ui64 txId = 100; + Write(server, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + + auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); + + UNIT_ASSERT_VALUES_EQUAL(table1state, "key = 0, value = 1\n" + "key = 2, value = 3\n" + "key = 4, value = 5\n"); + } + + Y_UNIT_TEST(WriteImmediateOnShardManyColumns) { + auto [server, sender] = TestCreateServer(); + + auto opts = TShardedTableOptions().Columns({{"key64", "Uint64", true, false}, {"key32", "Uint32", true, false}, + {"value64", "Uint64", false, false}, {"value32", "Uint32", false, false}, {"valueUtf8", "Utf8", false, false}}); + auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); + + const ui32 rowCount = 3; + ui64 txId = 100; + Write(server, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + + auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); + + UNIT_ASSERT_VALUES_EQUAL(table1state, "key64 = 0, key32 = 1, value64 = 2, value32 = 3, valueUtf8 = String_4\n" + "key64 = 5, key32 = 6, value64 = 7, value32 = 8, valueUtf8 = String_9\n" + "key64 = 10, key32 = 11, value64 = 12, value32 = 13, valueUtf8 = String_14\n"); + } + + Y_UNIT_TEST(WriteOnShard) { + auto [server, sender] = TestCreateServer(); + + TShardedTableOptions opts; + auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); + + const ui32 rowCount = 3; + ui64 txId = 100; + Write(server, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE); + + auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); + + UNIT_ASSERT_VALUES_EQUAL(table1state, ""); + } +} +}
\ No newline at end of file diff --git a/ydb/core/tx/datashard/datashard_write.h b/ydb/core/tx/datashard/datashard_write.h new file mode 100644 index 00000000000..f58bd947b87 --- /dev/null +++ b/ydb/core/tx/datashard/datashard_write.h @@ -0,0 +1,17 @@ +#pragma once + +#include <ydb/library/actors/core/event.h> +#include <ydb/core/protos/data_events.pb.h> + +#include <util/generic/ptr.h> + +namespace NKikimr::NDataShard::EvWrite { + +using namespace NActors; + +class Convertor { +public: + static ui64 GetTxId(const TAutoPtr<IEventHandle>& ev); + static ui64 GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode txMode); +}; +}
\ No newline at end of file diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp new file mode 100644 index 00000000000..a5ca6da3c9f --- /dev/null +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -0,0 +1,555 @@ +#include "defs.h" + +#include "datashard_write_operation.h" +#include "datashard_kqp.h" +#include "datashard_locks.h" +#include "datashard_impl.h" +#include "datashard_failpoints.h" + +#include "key_conflicts.h" +#include "range_ops.h" + +#include <ydb/core/tx/data_events/payload_helper.h> +#include <ydb/core/scheme/scheme_types_proto.h> + +#include <ydb/library/actors/util/memory_track.h> + +namespace NKikimr { +namespace NDataShard { + + + +TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev) + : StepTxId_(stepTxId) + , TabletId_(self->TabletID()) + , Ev_(ev) + , EngineBay(self, txc, ctx, stepTxId.ToPair()) + , ErrCode(NKikimrTxDataShard::TError::OK) + , TxSize(0) + , TxCacheUsage(0) + , IsReleased(false) + , ReceivedAt_(receivedAt) +{ + ComputeTxSize(); + NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Add(TxSize); + + if (LockTxId()) + EngineBay.SetLockTxId(LockTxId(), LockNodeId()); + + if (Immediate()) + EngineBay.SetIsImmediateTx(); + + auto& typeRegistry = *AppData()->TypeRegistry; + + NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; + + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TxId: " << StepTxId_.TxId << ", shard " << TabletId() << ", meta: " << Record().ShortDebugString()); + + if (!ParseRecord(self)) + return; + + SetTxKeys(RecordOperation().GetColumnIds(), typeRegistry, ctx); + + KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay); + EngineBay.MarkTxLoaded(); +} + +TValidatedWriteTx::~TValidatedWriteTx() { + NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Sub(TxSize); +} + 
+bool TValidatedWriteTx::ParseRecord(TDataShard* self) { + if (Record().GetOperations().size() != 1) + { + ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; + ErrStr = TStringBuilder() << "Only one operation is supported now."; + return false; + } + + const NKikimrDataEvents::TTableId& tableIdRecord = RecordOperation().GetTableId(); + + auto tableInfoPtr = self->TableInfos.FindPtr(tableIdRecord.GetTableId()); + if (!tableInfoPtr) { + ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR; + ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist"; + return false; + } + TableInfo_ = tableInfoPtr->Get(); + Y_ABORT_UNLESS(TableInfo_); + + if (TableInfo_->GetTableSchemaVersion() != 0 && tableIdRecord.GetSchemaVersion() != TableInfo_->GetTableSchemaVersion()) + { + ErrCode = NKikimrTxDataShard::TError::SCHEME_CHANGED; + ErrStr = TStringBuilder() << "Table '" << TableInfo_->Path << "' scheme changed."; + return false; + } + + if (RecordOperation().GetPayloadFormat() != NKikimrDataEvents::FORMAT_CELLVEC) + { + ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; + ErrStr = TStringBuilder() << "Only FORMAT_CELLVEC is supported now. 
Got: " << RecordOperation().GetPayloadFormat(); + return false; + } + + NEvWrite::TPayloadHelper<NEvents::TDataEvents::TEvWrite> payloadHelper(*Ev_->Get()); + TString payload = payloadHelper.GetDataFromPayload(RecordOperation().GetPayloadIndex()); + + if (!TSerializedCellMatrix::TryParse(payload, Matrix_)) + { + ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; + ErrStr = TStringBuilder() << "Can't parse TSerializedCellVec in payload"; + return false; + } + + const auto& columnTags = RecordOperation().GetColumnIds(); + if ((size_t)columnTags.size() != Matrix_.GetColCount()) + { + ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; + ErrStr = TStringBuilder() << "Column count mismatch: got columnids " << columnTags.size() << ", got cells count " << Matrix_.GetColCount(); + return false; + } + + if ((size_t)columnTags.size() < TableInfo_->KeyColumnIds.size()) + { + ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR; + ErrStr = TStringBuilder() << "Column count mismatch: got " << columnTags.size() << ", expected greater or equal than key column count " << TableInfo_->KeyColumnIds.size(); + return false; + } + + for (size_t i = 0; i < TableInfo_->KeyColumnIds.size(); ++i) { + if (RecordOperation().columnids(i) != TableInfo_->KeyColumnIds[i]) { + ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR; + ErrStr = TStringBuilder() << "Key column schema at position " << i; + return false; + } + } + + for (ui32 columnTag : columnTags) { + auto* col = TableInfo_->Columns.FindPtr(columnTag); + if (!col) { + ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR; + ErrStr = TStringBuilder() << "Missing column with id " << columnTag; + return false; + } + } + + TableId_ = TTableId(tableIdRecord.ownerid(), tableIdRecord.GetTableId(), tableIdRecord.GetSchemaVersion()); + return true; +} + +TVector<TEngineBay::TColumnWriteMeta> GetColumnWrites(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags) { + TVector<TEngineBay::TColumnWriteMeta> writeColumns; + 
writeColumns.reserve(columnTags.size()); + for (ui32 columnTag : columnTags) { + TEngineBay::TColumnWriteMeta writeColumn; + writeColumn.Column = NTable::TColumn("", columnTag, {}, {}); + + writeColumns.push_back(std::move(writeColumn)); + } + + return writeColumns; +} + +void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx) +{ + for (ui32 rowIdx = 0; rowIdx < Matrix_.GetRowCount(); ++rowIdx) + { + //TODO zero copy necessary keys from TableInfo_->KeyColumnTypes + KeyCells_.clear(); + for (ui16 colIdx = 0; colIdx < TableInfo_->KeyColumnIds.size(); ++colIdx) + KeyCells_.push_back(Matrix_.GetCells()[rowIdx * columnTags.size() + colIdx]); + + TTableRange tableRange(KeyCells_); + + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo_->Path << ", shard: " << TabletId_ << ", " + << "write point " << DebugPrintPoint(TableInfo_->KeyColumnTypes, KeyCells_, typeRegistry)); + + EngineBay.AddWriteRange(TableId_, tableRange, TableInfo_->KeyColumnTypes, GetColumnWrites(columnTags), false); + } +} + +ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors) +{ + using EResult = NMiniKQL::IEngineFlat::EResult; + + EResult result = EngineBay.Validate(); + if (allowErrors) { + if (result != EResult::Ok) { + ErrStr = EngineBay.GetEngine()->GetErrors(); + ErrCode = ConvertErrCode(result); + return 0; + } + } else { + Y_ABORT_UNLESS(result == EResult::Ok, "Engine errors: %s", EngineBay.GetEngine()->GetErrors().data()); + } + return KeysCount(); +} + +bool TValidatedWriteTx::ReValidateKeys() +{ + using EResult = NMiniKQL::IEngineFlat::EResult; + + + auto [result, error] = EngineBay.GetKqpComputeCtx().ValidateKeys(EngineBay.TxInfo()); + if (result != EResult::Ok) { + ErrStr = std::move(error); + ErrCode = ConvertErrCode(result); + return false; + } + + return true; +} + +bool TValidatedWriteTx::CanCancel() { + return false; +} + +bool 
TValidatedWriteTx::CheckCancelled() { + return false; +} + +void TValidatedWriteTx::ReleaseTxData() { + EngineBay.DestroyEngine(); + IsReleased = true; + + NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Sub(TxSize); + ComputeTxSize(); + NActors::NMemory::TLabel<MemoryLabelValidatedDataTx>::Add(TxSize); +} + +void TValidatedWriteTx::ComputeTxSize() { + TxSize = sizeof(TValidatedWriteTx); +} + +TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) + : TOperation(op) + , Ev_(ev) + , ArtifactFlags(0) + , TxCacheUsage(0) + , ReleasedTxDataSize(0) + , SchemeShardId(0) + , SubDomainPathId(0) +{ + SetTarget(Ev_->Sender); + SetCookie(Ev_->Cookie); + Orbit = std::move(Ev_->Get()->MoveOrbit()); + + BuildWriteTx(self, txc, ctx); + + // First time parsing, so we can fail + Y_DEBUG_ABORT_UNLESS(WriteTx_->Ready()); + + TrackMemory(); +} + +TWriteOperation::~TWriteOperation() +{ + UntrackMemory(); +} + +void TWriteOperation::FillTxData(TValidatedWriteTx::TPtr writeTx) +{ + Y_ABORT_UNLESS(!WriteTx_); + Y_ABORT_UNLESS(!Ev_ || HasVolatilePrepareFlag()); + + Target = writeTx->Source(); + WriteTx_ = writeTx; +} + +void TWriteOperation::FillTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TActorId& target, NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TVector<TSysTables::TLocksTable::TLock>& locks, ui64 artifactFlags) +{ + UntrackMemory(); + + Y_ABORT_UNLESS(!WriteTx_); + Y_ABORT_UNLESS(!Ev_); + + Target = target; + Ev_ = std::move(ev); + if (locks.size()) { + for (auto lock : locks) + LocksCache().Locks[lock.LockId] = lock; + } + ArtifactFlags = artifactFlags; + Y_ABORT_UNLESS(!WriteTx_); + BuildWriteTx(self, txc, ctx); + Y_ABORT_UNLESS(WriteTx_->Ready()); + + TrackMemory(); +} + +void TWriteOperation::FillVolatileTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) +{ + UntrackMemory(); + + 
Y_ABORT_UNLESS(!WriteTx_); + Y_ABORT_UNLESS(Ev_); + + BuildWriteTx(self, txc, ctx); + Y_ABORT_UNLESS(WriteTx_->Ready()); + + + TrackMemory(); +} + +TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) +{ + if (!WriteTx_) { + Y_ABORT_UNLESS(Ev_); + WriteTx_ = std::make_shared<TValidatedWriteTx>(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev_); + } + return WriteTx_; +} + +void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider, const TActorContext& ctx) { + ReleasedTxDataSize = provider.GetMemoryLimit() + provider.GetRequestedMemory(); + + if (!WriteTx_ || WriteTx_->IsTxDataReleased()) + return; + + WriteTx_->ReleaseTxData(); + // Immediate transactions have no body stored. + if (!IsImmediate() && !HasVolatilePrepareFlag()) { + UntrackMemory(); + Ev_.Reset(); + TrackMemory(); + } + + //InReadSets.clear(); + OutReadSets().clear(); + LocksAccessLog().Locks.clear(); + LocksCache().Locks.clear(); + ArtifactFlags = 0; + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data"); +} + +void TWriteOperation::DbStoreLocksAccessLog(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) +{ + using Schema = TDataShard::Schema; + + NIceDb::TNiceDb db(txc.DB); + + using TLocksVector = TVector<TSysTables::TLocksTable::TPersistentLock>; + TLocksVector vec; + vec.reserve(LocksAccessLog().Locks.size()); + for (auto& pr : LocksAccessLog().Locks) + vec.emplace_back(pr.second); + + // Historically C++ column type was TVector<TLock> + const char* vecDataStart = reinterpret_cast<const char*>(vec.data()); + size_t vecDataSize = vec.size() * sizeof(TLocksVector::value_type); + TStringBuf vecData(vecDataStart, vecDataSize); + db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update(NIceDb::TUpdate<Schema::TxArtifacts::Locks>(vecData)); + + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << 
GetTxId() << " in " << self->TabletID()); +} + +void TWriteOperation::DbStoreArtifactFlags(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) +{ + using Schema = TDataShard::Schema; + + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update<Schema::TxArtifacts::Flags>(ArtifactFlags); + + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << self->TabletID()); +} + +ui64 TWriteOperation::GetMemoryConsumption() const { + ui64 res = 0; + if (WriteTx_) { + res += WriteTx_->GetTxSize() + WriteTx_->GetMemoryAllocated(); + } + if (Ev_) { + res += sizeof(NEvents::TDataEvents::TEvWrite); + } + + return res; +} + +ERestoreDataStatus TWriteOperation::RestoreTxData( + TDataShard* self, + TTransactionContext& txc, + const TActorContext& ctx +) +{ + // TODO + Y_UNUSED(self); + Y_UNUSED(txc); + Y_UNUSED(ctx); + Y_ABORT(); + /* + if (!WriteTx_) { + ReleasedTxDataSize = 0; + return ERestoreDataStatus::Ok; + } + + UntrackMemory(); + + // For immediate transactions we should restore just + // from the Ev_. For planned transaction we should + // restore from local database. 
+ + TVector<TSysTables::TLocksTable::TLock> locks; + if (!IsImmediate() && !HasVolatilePrepareFlag()) { + NIceDb::TNiceDb db(txc.DB);ExtractKeys + bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, Ev_, locks, ArtifactFlags); + if (!ok) { + Ev_.Reset(); + ArtifactFlags = 0; + return ERestoreDataStatus::Restart; + } + } else { + Y_ABORT_UNLESS(Ev_); + } + + TrackMemory(); + + for (auto& lock : locks) + LocksCache().Locks[lock.LockId] = lock; + + bool extractKeys = WriteTx_->IsTxInfoLoaded(); + WriteTx_ = std::make_shared<TValidatedWriteTx>(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev_); + if (WriteTx_->Ready() && extractKeys) { + WriteTx_->ExtractKeys(true); + } + + if (!WriteTx_->Ready()) { + return ERestoreDataStatus::Error; + } + + ReleasedTxDataSize = 0; + */ + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " at " << self->TabletID() << " restored its data"); + + return ERestoreDataStatus::Ok; +} + +void TWriteOperation::FinalizeWriteTxPlan() +{ + Y_ABORT_UNLESS(IsDataTx()); + Y_ABORT_UNLESS(!IsImmediate()); + Y_ABORT_UNLESS(!IsKqpScanTransaction()); + + TVector<EExecutionUnitKind> plan; + + plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies); + if (IsKqpDataTransaction()) { + plan.push_back(EExecutionUnitKind::BuildKqpDataTxOutRS); + plan.push_back(EExecutionUnitKind::StoreAndSendOutRS); + plan.push_back(EExecutionUnitKind::PrepareKqpDataTxInRS); + plan.push_back(EExecutionUnitKind::LoadAndWaitInRS); + plan.push_back(EExecutionUnitKind::ExecuteKqpDataTx); + } else { + plan.push_back(EExecutionUnitKind::BuildDataTxOutRS); + plan.push_back(EExecutionUnitKind::StoreAndSendOutRS); + plan.push_back(EExecutionUnitKind::PrepareDataTxInRS); + plan.push_back(EExecutionUnitKind::LoadAndWaitInRS); + plan.push_back(EExecutionUnitKind::ExecuteDataTx); + } + plan.push_back(EExecutionUnitKind::CompleteOperation); + plan.push_back(EExecutionUnitKind::CompletedOperations); + + RewriteExecutionPlan(plan); +} + +class 
TFinalizeWriteTxPlanUnit: public TExecutionUnit { +public: + TFinalizeWriteTxPlanUnit(TDataShard& dataShard, TPipeline& pipeline) + : TExecutionUnit(EExecutionUnitKind::FinalizeWriteTxPlan, false, dataShard, pipeline) + { + } + + bool IsReadyToExecute(TOperation::TPtr) const override { + return true; + } + + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { + Y_UNUSED(txc); + Y_UNUSED(ctx); + + TWriteOperation* tx = dynamic_cast<TWriteOperation*>(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + tx->FinalizeWriteTxPlan(); + + return EExecutionStatus::Executed; + } + + void Complete(TOperation::TPtr op, const TActorContext& ctx) override { + Y_UNUSED(op); + Y_UNUSED(ctx); + } +}; + +THolder<TExecutionUnit> CreateFinalizeWriteTxPlanUnit(TDataShard& dataShard, TPipeline& pipeline) { + return THolder(new TFinalizeWriteTxPlanUnit(dataShard, pipeline)); +} + +void TWriteOperation::TrackMemory() const { + // TODO More accurate calc memory + NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Add(Record().SpaceUsed()); +} + +void TWriteOperation::UntrackMemory() const { + NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Sub(Record().SpaceUsed()); +} + +void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) { + SetAbortedFlag(); + WriteResult_ = NEvents::TDataEvents::TEvWriteResult::BuildError(WriteTx_->TabletId(), GetTxId(), status, errorMsg); +} + +void TWriteOperation::SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult) { + WriteResult_ = std::move(writeResult); +} + +void TWriteOperation::BuildExecutionPlan(bool loaded) +{ + Y_ABORT_UNLESS(GetExecutionPlan().empty()); + Y_ABORT_UNLESS(!loaded); + + TVector<EExecutionUnitKind> plan; + + //if (IsImmediate()) + { + Y_ABORT_UNLESS(!loaded); + plan.push_back(EExecutionUnitKind::CheckWrite); + 
plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies); + plan.push_back(EExecutionUnitKind::ExecuteWrite); + plan.push_back(EExecutionUnitKind::FinishPropose); + plan.push_back(EExecutionUnitKind::CompletedOperations); + } + /* + else if (HasVolatilePrepareFlag()) { + plan.push_back(EExecutionUnitKind::StoreDataTx); // note: stores in memory + plan.push_back(EExecutionUnitKind::FinishPropose); + Y_ABORT_UNLESS(!GetStep()); + plan.push_back(EExecutionUnitKind::WaitForPlan); + plan.push_back(EExecutionUnitKind::PlanQueue); + plan.push_back(EExecutionUnitKind::LoadTxDetails); // note: reloads from memory + plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies); + plan.push_back(EExecutionUnitKind::ExecuteWrite); + plan.push_back(EExecutionUnitKind::CompleteOperation); + plan.push_back(EExecutionUnitKind::CompletedOperations); + } else { + if (!loaded) { + plan.push_back(EExecutionUnitKind::CheckWrite); + plan.push_back(EExecutionUnitKind::StoreDataTx); + plan.push_back(EExecutionUnitKind::FinishPropose); + } + if (!GetStep()) + plan.push_back(EExecutionUnitKind::WaitForPlan); + plan.push_back(EExecutionUnitKind::PlanQueue); + plan.push_back(EExecutionUnitKind::LoadTxDetails); + plan.push_back(EExecutionUnitKind::FinalizeWriteTxPlan); + } */ + RewriteExecutionPlan(plan); +} + +} // NDataShard +} // NKikimr + +Y_DECLARE_OUT_SPEC(, NKikimr::NDataShard::TWriteOperation, stream, tx) { + stream << '[' << tx.GetStep() << ':' << tx.GetTxId() << ']'; +} diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h new file mode 100644 index 00000000000..2d6a107c29b --- /dev/null +++ b/ydb/core/tx/datashard/datashard_write_operation.h @@ -0,0 +1,440 @@ +#pragma once + +#include "datashard_impl.h" +#include "datashard_locks.h" +#include "datashard__engine_host.h" +#include "operation.h" + +#include <ydb/core/tx/tx_processing.h> +#include <ydb/core/tablet_flat/flat_cxx_database.h> + +#include 
<ydb/library/yql/public/issue/yql_issue.h>

namespace NKikimr {
namespace NDataShard {


// A TEvWrite request parsed and validated against a datashard: exposes the request record,
// the parsed cell matrix, the validation error (if any) and the TEngineBay used to register
// and validate the written keys.
class TValidatedWriteTx: TNonCopyable {
public:
    using TPtr = std::shared_ptr<TValidatedWriteTx>;

    TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev);

    ~TValidatedWriteTx();

    static constexpr ui64 MaxReorderTxKeys() {
        return 100;
    }

    NKikimrTxDataShard::TError::EKind Code() const {
        return ErrCode;
    }
    // Returned by reference to avoid copying the string on every call.
    const TString& GetError() const {
        return ErrStr;
    }

    TStepOrder StepTxId() const {
        return StepTxId_;
    }
    ui64 TxId() const {
        return StepTxId_.TxId;
    }
    ui64 TabletId() const {
        return TabletId_;
    }
    const NEvents::TDataEvents::TEvWrite::TPtr& Ev() const {
        return Ev_;
    }

    // Dereferences Ev_; must not be called while the referenced event handle is empty.
    const NKikimrDataEvents::TEvWrite& Record() const {
        return Ev_->Get()->Record;
    }

    const NKikimrDataEvents::TEvWrite::TOperation& RecordOperation() const {
        //TODO Only one operation is supported now
        return Record().operations(0);
    }

    const TTableId& TableId() const {
        return TableId_;
    }

    // Returned by reference: copying the matrix would duplicate the whole payload.
    const TSerializedCellMatrix& Matrix() const {
        return Matrix_;
    }

    // Returned by reference to avoid copying the key cell vector on every call.
    const TVector<TCell>& KeyCells() const {
        return KeyCells_;
    }

    ui64 LockTxId() const {
        return Record().locktxid();
    }
    ui32 LockNodeId() const {
        return Record().locknodeid();
    }
    bool Immediate() const {
        return Record().txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE;
    }
    bool NeedDiagnostics() const {
        return true;
    }
    bool CollectStats() const {
        return true;
    }
    TInstant ReceivedAt() const {
        return ReceivedAt_;
    }
    TInstant Deadline() const {
        return Deadline_;
    }

    // True when no validation error has been recorded.
    bool Ready() const {
        return ErrCode == NKikimrTxDataShard::TError::OK;
    }
    bool RequirePrepare() const {
        return ErrCode == NKikimrTxDataShard::TError::SNAPSHOT_NOT_READY_YET;
    }
    bool RequireWrites() const {
        return TxInfo().HasWrites() || !Immediate();
    }
    bool HasWrites() const {
        return TxInfo().HasWrites();
    }
    bool HasLockedWrites() const {
        return HasWrites() && LockTxId();
    }
    bool HasDynamicWrites() const {
        return TxInfo().DynKeysCount != 0;
    }

    // TODO: It's an expensive operation (Precharge() inside). We need avoid it.
    TEngineBay::TSizes CalcReadSizes(bool needsTotalKeysSize) const {
        return EngineBay.CalcSizes(needsTotalKeysSize);
    }

    // 0 when the engine does not exist (e.g. after DestroyEngine()).
    ui64 GetMemoryAllocated() const {
        return EngineBay.GetEngine() ? EngineBay.GetEngine()->GetMemoryAllocated() : 0;
    }

    NMiniKQL::IEngineFlat* GetEngine() {
        return EngineBay.GetEngine();
    }
    void DestroyEngine() {
        EngineBay.DestroyEngine();
    }
    const NMiniKQL::TEngineHostCounters& GetCounters() {
        return EngineBay.GetCounters();
    }
    void ResetCounters() {
        EngineBay.ResetCounters();
    }

    bool CanCancel();
    bool CheckCancelled();

    void SetWriteVersion(TRowVersion writeVersion) {
        EngineBay.SetWriteVersion(writeVersion);
    }
    void SetReadVersion(TRowVersion readVersion) {
        EngineBay.SetReadVersion(readVersion);
    }
    void SetVolatileTxId(ui64 txId) {
        EngineBay.SetVolatileTxId(txId);
    }

    void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) {
        EngineBay.CommitChanges(tableId, lockId, writeVersion);
    }

    TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const {
        return EngineBay.GetCollectedChanges();
    }
    void ResetCollectedChanges() {
        EngineBay.ResetCollectedChanges();
    }

    TVector<ui64> GetVolatileCommitTxIds() const {
        return EngineBay.GetVolatileCommitTxIds();
    }
    const absl::flat_hash_set<ui64>& GetVolatileDependencies() const {
        return EngineBay.GetVolatileDependencies();
    }
    std::optional<ui64> GetVolatileChangeGroup() const {
        return EngineBay.GetVolatileChangeGroup();
    }
    bool GetVolatileCommitOrdered() const {
        return EngineBay.GetVolatileCommitOrdered();
    }

    TActorId Source() const {
        return Source_;
    }
    void SetSource(const TActorId& actorId) {
        Source_ = actorId;
    }
    void SetStep(ui64 step) {
        StepTxId_.Step = step;
    }
    // A tx counts as proposed once a non-default source actor has been set.
    bool IsProposed() const {
        return Source_ != TActorId();
    }

    inline const ::NKikimrDataEvents::TKqpLocks& GetKqpLocks() const {
        return Record().locks();
    }

    bool ParseRecord(TDataShard* self);
    void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx);

    ui32 ExtractKeys(bool allowErrors);
    bool ReValidateKeys();

    ui64 GetTxSize() const {
        return TxSize;
    }
    ui32 KeysCount() const {
        return TxInfo().WritesCount;
    }

    void SetTxCacheUsage(ui64 val) {
        TxCacheUsage = val;
    }
    ui64 GetTxCacheUsage() const {
        return TxCacheUsage;
    }

    void ReleaseTxData();
    bool IsTxDataReleased() const {
        return IsReleased;
    }

    bool IsTxInfoLoaded() const {
        return TxInfo().Loaded;
    }

    bool HasOutReadsets() const {
        return TxInfo().HasOutReadsets;
    }
    bool HasInReadsets() const {
        return TxInfo().HasInReadsets;
    }

    const NMiniKQL::IEngineFlat::TValidationInfo& TxInfo() const {
        return EngineBay.TxInfo();
    }

private:
    //TODO: YDB_READONLY
    TStepOrder StepTxId_;
    ui64 TabletId_;
    TTableId TableId_;
    const TUserTable* TableInfo_;
    // NOTE(review): this is a reference to a TPtr owned elsewhere, not a copy, so the
    // referenced handle has to outlive this object and stay non-empty while Record()/Ev()
    // are used -- presumably it is bound to the constructor's `ev` argument; confirm.
    const NEvents::TDataEvents::TEvWrite::TPtr& Ev_;
    TSerializedCellMatrix Matrix_;
    TVector<TCell> KeyCells_;
    TActorId Source_;
    TEngineBay EngineBay;
    NKikimrTxDataShard::TError::EKind ErrCode;
    TString ErrStr;
    ui64 TxSize;
    ui64 TxCacheUsage;
    bool IsReleased;
    const TInstant ReceivedAt_; // For local timeout tracking
    TInstant Deadline_;

    void ComputeTxSize();
};

class TWriteOperation : public TOperation {
    friend class TWriteUnit;
public:
    explicit TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);

    ~TWriteOperation();

    void
FillTxData(TValidatedWriteTx::TPtr dataTx); + void FillTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TActorId& target, NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TVector<TSysTables::TLocksTable::TLock>& locks, ui64 artifactFlags); + void FillVolatileTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); + + const NEvents::TDataEvents::TEvWrite::TPtr& GetEv() const { + return Ev_; + } + void SetEv(const NEvents::TDataEvents::TEvWrite::TPtr& ev) { + UntrackMemory(); + Ev_ = ev; + TrackMemory(); + } + void ClearEv() { + UntrackMemory(); + Ev_.Reset(); + TrackMemory(); + } + + ui64 GetSchemeShardId() const { + return SchemeShardId; + } + void SetSchemeShardId(ui64 id) { + SchemeShardId = id; + } + ui64 GetSubDomainPathId() const { + return SubDomainPathId; + } + void SetSubDomainPathId(ui64 pathId) { + SubDomainPathId = pathId; + } + + const NKikimrSubDomains::TProcessingParams& GetProcessingParams() const { + return ProcessingParams; + } + void SetProcessingParams(const NKikimrSubDomains::TProcessingParams& params) + { + ProcessingParams.CopyFrom(params); + } + + void Deactivate() override { + ClearEv(); + + TOperation::Deactivate(); + } + + ui32 ExtractKeys() { + return WriteTx_ ? WriteTx_->ExtractKeys(false) : 0; + } + + bool ReValidateKeys() { + return WriteTx_ ? 
WriteTx_->ReValidateKeys() : true; + } + + void MarkAsUsingSnapshot() { + SetUsingSnapshotFlag(); + } + + void SetTxCacheUsage(ui64 val) { + TxCacheUsage = val; + } + ui64 GetTxCacheUsage() const { + return TxCacheUsage; + } + + ui64 GetReleasedTxDataSize() const { + return ReleasedTxDataSize; + } + bool IsTxDataReleased() const { + return ReleasedTxDataSize > 0; + } + + enum EArtifactFlags { + OUT_RS_STORED = (1 << 0), + LOCKS_STORED = (1 << 1), + }; + void MarkOutRSStored() { + ArtifactFlags |= OUT_RS_STORED; + } + + bool IsOutRSStored() { + return ArtifactFlags & OUT_RS_STORED; + } + + void MarkLocksStored() { + ArtifactFlags |= LOCKS_STORED; + } + + bool IsLocksStored() { + return ArtifactFlags & LOCKS_STORED; + } + + void DbStoreLocksAccessLog(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); + void DbStoreArtifactFlags(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); + + ui64 GetMemoryConsumption() const; + + ui64 GetRequiredMemory() const { + Y_ABORT_UNLESS(!GetTxCacheUsage() || !IsTxDataReleased()); + ui64 requiredMem = GetTxCacheUsage() + GetReleasedTxDataSize(); + if (!requiredMem) + requiredMem = GetMemoryConsumption(); + return requiredMem; + } + + void ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider, const TActorContext& ctx); + ERestoreDataStatus RestoreTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); + void FinalizeWriteTxPlan(); + + // TOperation iface. + void BuildExecutionPlan(bool loaded) override; + + bool HasKeysInfo() const override { + return WriteTx_ ? WriteTx_->TxInfo().Loaded : false; + } + + const NMiniKQL::IEngineFlat::TValidationInfo& GetKeysInfo() const override { + if (WriteTx_) { + Y_ABORT_UNLESS(WriteTx_->TxInfo().Loaded); + return WriteTx_->TxInfo(); + } + // For scheme tx global reader and writer flags should + // result in all required dependencies. 
+ return TOperation::GetKeysInfo(); + } + + ui64 LockTxId() const override { + return WriteTx_ ? WriteTx_->LockTxId() : 0; + } + + ui32 LockNodeId() const override { + return WriteTx_ ? WriteTx_->LockNodeId() : 0; + } + + bool HasLockedWrites() const override { + return WriteTx_ ? WriteTx_->HasLockedWrites() : false; + } + + ui64 IncrementPageFaultCount() { + return ++PageFaultCount; + } + + const TValidatedWriteTx::TPtr& WriteTx() const { + return WriteTx_; + } + TValidatedWriteTx::TPtr BuildWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); + + void ClearWriteTx() { + WriteTx_ = nullptr; + } + + const NKikimrDataEvents::TEvWrite& Record() const { + return Ev_->Get()->Record; + } + + const std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>& WriteResult() const { + return WriteResult_; + } + std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& WriteResult() { + return std::move(WriteResult_); + } + + void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg); + void SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult); + +private: + void TrackMemory() const; + void UntrackMemory() const; + +private: + NEvents::TDataEvents::TEvWrite::TPtr Ev_; + TValidatedWriteTx::TPtr WriteTx_; + std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> WriteResult_; + + // TODO: move to persistent part of operation's flags + ui64 ArtifactFlags; + ui64 TxCacheUsage; + ui64 ReleasedTxDataSize; + ui64 SchemeShardId; + ui64 SubDomainPathId; + NKikimrSubDomains::TProcessingParams ProcessingParams; + ui64 PageFaultCount = 0; +}; + +} // NDataShard +} // NKikimr diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index 404bfd25ea2..51ce8cf8ecc 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ b/ydb/core/tx/datashard/execution_unit.cpp @@ -12,6 +12,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind, switch (kind) { 
case EExecutionUnitKind::CheckDataTx: return CreateCheckDataTxUnit(dataShard, pipeline); + case EExecutionUnitKind::CheckWrite: + return CreateCheckWriteUnit(dataShard, pipeline); case EExecutionUnitKind::CheckSchemeTx: return CreateCheckSchemeTxUnit(dataShard, pipeline); case EExecutionUnitKind::CheckSnapshotTx: @@ -44,6 +46,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind, return CreateLoadTxDetailsUnit(dataShard, pipeline); case EExecutionUnitKind::FinalizeDataTxPlan: return CreateFinalizeDataTxPlanUnit(dataShard, pipeline); + case EExecutionUnitKind::FinalizeWriteTxPlan: + return CreateFinalizeWriteTxPlanUnit(dataShard, pipeline); case EExecutionUnitKind::ProtectSchemeEchoes: return CreateProtectSchemeEchoesUnit(dataShard, pipeline); case EExecutionUnitKind::BuildDataTxOutRS: @@ -132,6 +136,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind, return CreateCheckReadUnit(dataShard, pipeline); case EExecutionUnitKind::ExecuteRead: return CreateReadUnit(dataShard, pipeline); + case EExecutionUnitKind::ExecuteWrite: + return CreateWriteUnit(dataShard, pipeline); default: Y_FAIL_S("Unexpected execution kind " << kind << " (" << (ui32)kind << ")"); } diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h index 2585a2d9b8e..1a30d6f4dc9 100644 --- a/ydb/core/tx/datashard/execution_unit_ctors.h +++ b/ydb/core/tx/datashard/execution_unit_ctors.h @@ -6,7 +6,8 @@ namespace NKikimr { namespace NDataShard { THolder<TExecutionUnit> CreateCheckDataTxUnit(TDataShard &dataShard, TPipeline &pipeline); -THolder<TExecutionUnit> CreateCheckSchemeTxUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder<TExecutionUnit> CreateCheckWriteUnit(TDataShard& dataShard, TPipeline& pipeline); +THolder<TExecutionUnit> CreateCheckSchemeTxUnit(TDataShard& dataShard, TPipeline& pipeline); THolder<TExecutionUnit> CreateCheckSnapshotTxUnit(TDataShard &dataShard, TPipeline &pipeline); 
THolder<TExecutionUnit> CreateCheckDistributedEraseTxUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateCheckCommitWritesTxUnit(TDataShard &dataShard, TPipeline &pipeline); @@ -22,7 +23,8 @@ THolder<TExecutionUnit> CreateWaitForPlanUnit(TDataShard &dataShard, TPipeline & THolder<TExecutionUnit> CreatePlanQueueUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateLoadTxDetailsUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateFinalizeDataTxPlanUnit(TDataShard &dataShard, TPipeline &pipeline); -THolder<TExecutionUnit> CreateProtectSchemeEchoesUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder<TExecutionUnit> CreateFinalizeWriteTxPlanUnit(TDataShard& dataShard, TPipeline& pipeline); +THolder<TExecutionUnit> CreateProtectSchemeEchoesUnit(TDataShard& dataShard, TPipeline& pipeline); THolder<TExecutionUnit> CreateBuildDataTxOutRSUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateBuildDistributedEraseTxOutRSUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateBuildKqpDataTxOutRSUnit(TDataShard &dataShard, TPipeline &pipeline); @@ -66,6 +68,7 @@ THolder<TExecutionUnit> CreateAlterCdcStreamUnit(TDataShard &dataShard, TPipelin THolder<TExecutionUnit> CreateDropCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateCheckReadUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateReadUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder<TExecutionUnit> CreateWriteUnit(TDataShard& dataShard, TPipeline& pipeline); } // namespace NDataShard } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h index a96aaaaf214..a62a8718aa1 100644 --- a/ydb/core/tx/datashard/execution_unit_kind.h +++ b/ydb/core/tx/datashard/execution_unit_kind.h @@ -4,13 +4,14 @@ namespace NKikimr { namespace NDataShard { -enum class 
EExecutionUnitKind : ui32 { +enum class EExecutionUnitKind: ui32 { CheckDataTx, CheckSchemeTx, CheckSnapshotTx, CheckDistributedEraseTx, CheckCommitWritesTx, CheckRead, + CheckWrite, StoreDataTx, StoreSchemeTx, StoreSnapshotTx, @@ -23,6 +24,7 @@ enum class EExecutionUnitKind : ui32 { PlanQueue, LoadTxDetails, FinalizeDataTxPlan, + FinalizeWriteTxPlan, ProtectSchemeEchoes, BuildDataTxOutRS, BuildKqpDataTxOutRS, @@ -37,6 +39,7 @@ enum class EExecutionUnitKind : ui32 { ExecuteDistributedEraseTx, ExecuteCommitWritesTx, ExecuteRead, + ExecuteWrite, CompleteOperation, ExecuteKqpScanTx, MakeScanSnapshot, diff --git a/ydb/core/tx/datashard/probes.h b/ydb/core/tx/datashard/probes.h index b7d96b79bcb..7640df6f07c 100644 --- a/ydb/core/tx/datashard/probes.h +++ b/ydb/core/tx/datashard/probes.h @@ -75,6 +75,19 @@ GROUPS("DataShard"), \ TYPES(), \ NAMES()) \ + PROBE(WriteRequest, \ + GROUPS("DataShard", "LWTrackStart"), \ + TYPES(), \ + NAMES()) \ + PROBE(WriteExecute, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + PROBE(WriteResult, \ + GROUPS("DataShard"), \ + TYPES(), \ + NAMES()) \ + // DATASHARD_PROVIDER LWTRACE_DECLARE_PROVIDER(DATASHARD_PROVIDER) diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index 00e0125d9c4..82648b94f87 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -8,6 +8,8 @@ #include <ydb/core/tablet_flat/shared_cache_events.h> #include <ydb/core/testlib/basics/appdata.h> #include <ydb/core/tx/balance_coverage/balance_coverage_builder.h> +#include <ydb/core/tx/data_events/events.h> +#include <ydb/core/tx/data_events/payload_helper.h> #include <ydb/core/tx/tx_allocator/txallocator.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/tx_proxy/upload_rows.h> @@ -1105,7 +1107,7 @@ static ui64 RunSchemeTx( return ev->Get()->Record.GetTxId(); } -void CreateShardedTable( 
+std::tuple<TVector<ui64>, ui64> CreateShardedTable( Tests::TServer::TPtr server, TActorId sender, const TString &root, @@ -1223,9 +1225,15 @@ void CreateShardedTable( } WaitTxNotification(server, sender, RunSchemeTx(*server->GetRuntime(), std::move(request), sender)); + + TString path = TStringBuilder() << root << "/" << name; + const auto& shards = GetTableShards(server, sender, path); + const ui64 tableId = ResolveTableId(server, sender, path).PathId.LocalPathId; + + return {shards, tableId}; } -void CreateShardedTable( +std::tuple<TVector<ui64>, ui64> CreateShardedTable( Tests::TServer::TPtr server, TActorId sender, const TString &root, @@ -1240,7 +1248,7 @@ void CreateShardedTable( .EnableOutOfOrder(enableOutOfOrder) .Policy(policy) .ShadowData(shadowData); - CreateShardedTable(server, sender, root, name, opts); + return CreateShardedTable(server, sender, root, name, opts); } ui64 AsyncCreateCopyTable( @@ -1803,6 +1811,70 @@ void ExecSQL(Tests::TServer::TPtr server, UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code); } +std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount) { + std::vector<ui32> columnIds(columns.size()); + std::iota(columnIds.begin(), columnIds.end(), 1); + + TVector<TString> stringValues; + TVector<TCell> cells; + for (ui32 row = 0; row < rowCount; ++row) { + for (ui32 col = 0; col < columns.size(); ++col) { + const TString& columnType = columns[col].Type; + ui64 value = row * columns.size() + col; + if (columnType == "Uint64") { + cells.emplace_back(TCell((const char*)&value, sizeof(ui64))); + } else if (columnType == "Uint32") { + ui32 value32 = (ui32)value; + cells.emplace_back(TCell((const char*)&value32, sizeof(ui32))); + } else if (columnType == "Utf8") { + stringValues.emplace_back(Sprintf("String_%" PRIu64, value)); + 
cells.emplace_back(TCell(stringValues.back().c_str(), stringValues.back().size())); + } else { + Y_ABORT("Unsupported column type"); + } + } + } + + TSerializedCellMatrix matrix(cells, rowCount, columns.size()); + TString blobData = matrix.GetBuffer(); + + UNIT_ASSERT(blobData.size() < 8_MB); + + auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode); + ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)); + evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC); + + return evWrite; +} + +NKikimrDataEvents::TEvWriteResult Write(Tests::TServer::TPtr server, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode) +{ + auto& runtime = *server->GetRuntime(); + auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount); + runtime.SendToPipe(shardId, sender, request.release(), 0, GetPipeConfigWithRetries()); + + auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender); + auto status = ev->Get()->Record.GetStatus(); + + NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus; + switch (txMode) { + case NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE: + expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED; + break; + case NKikimrDataEvents::TEvWrite::MODE_PREPARE: + case NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE: + expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED; + break; + default: + expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED; + UNIT_ASSERT_C(false, "Unexpected txMode: " << txMode); + break; + } + UNIT_ASSERT_C(status == expectedStatus, "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues()); + 
+ return ev->Get()->Record; +} + void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values) { auto txTypes = std::make_shared<NTxProxy::TUploadTypes>(); @@ -1832,8 +1904,8 @@ void WaitTabletBecomesOffline(TServer::TPtr server, ui64 tabletId) { IsShardStateChange(ui64 tabletId) : TabletId(tabletId) - { - } + { + } bool operator()(IEventHandle& ev) { diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index b75914ade22..64a7ef814ea 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -500,13 +500,15 @@ struct TShardedTableOptions { template<bool OPT1, bool OPT2> \ void N(NUnitTest::TTestContext&) -void CreateShardedTable(Tests::TServer::TPtr server, +// Create table, returns shards & tableId +std::tuple<TVector<ui64>, ui64> CreateShardedTable(Tests::TServer::TPtr server, TActorId sender, const TString &root, const TString &name, const TShardedTableOptions &opts = TShardedTableOptions()); -void CreateShardedTable(Tests::TServer::TPtr server, +// Create table, returns shards & tableId +std::tuple<TVector<ui64>, ui64> CreateShardedTable(Tests::TServer::TPtr server, TActorId sender, const TString &root, const TString &name, @@ -708,6 +710,8 @@ void ExecSQL(Tests::TServer::TPtr server, bool dml = true, Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS); +NKikimrDataEvents::TEvWriteResult Write(Tests::TServer::TPtr server, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode); + void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& 
values); struct IsTxResultComplete { diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt new file mode 100644 index 00000000000..ac06a0c7e13 --- /dev/null +++ b/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt @@ -0,0 +1,86 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-datashard-ut_write) +target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + core-tx-datashard + tx-datashard-ut_common + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + core-testlib-default + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_link_options(ydb-core-tx-datashard-ut_write PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp +) +set_property( + TARGET + ydb-core-tx-datashard-ut_write + PROPERTY + SPLIT_FACTOR + 5 +) +add_yunittest( + NAME + ydb-core-tx-datashard-ut_write + TEST_TARGET + ydb-core-tx-datashard-ut_write + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + 
PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-tx-datashard-ut_write + system_allocator +) +vcs_info(ydb-core-tx-datashard-ut_write) diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..c4ef0696942 --- /dev/null +++ b/ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,87 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(ydb-core-tx-datashard-ut_write) +target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-datashard + tx-datashard-ut_common + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + core-testlib-default + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_link_options(ydb-core-tx-datashard-ut_write PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp +) +set_property( + TARGET + ydb-core-tx-datashard-ut_write + PROPERTY + SPLIT_FACTOR + 5 +) +add_yunittest( + NAME + ydb-core-tx-datashard-ut_write + TEST_TARGET + ydb-core-tx-datashard-ut_write + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-tx-datashard-ut_write + system_allocator +) +vcs_info(ydb-core-tx-datashard-ut_write) diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..17b75fce13f --- /dev/null +++ b/ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt @@ -0,0 +1,90 @@ + +# This file was generated by the build system used internally in the Yandex 
monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-datashard-ut_write) +target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + core-tx-datashard + tx-datashard-ut_common + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + core-testlib-default + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_link_options(ydb-core-tx-datashard-ut_write PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp +) +set_property( + TARGET + ydb-core-tx-datashard-ut_write + PROPERTY + SPLIT_FACTOR + 5 +) +add_yunittest( + NAME + ydb-core-tx-datashard-ut_write + TEST_TARGET + ydb-core-tx-datashard-ut_write + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-tx-datashard-ut_write + cpp-malloc-jemalloc +) +vcs_info(ydb-core-tx-datashard-ut_write) diff --git 
a/ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..45c988267d8 --- /dev/null +++ b/ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt @@ -0,0 +1,92 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-tx-datashard-ut_write) +target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-datashard + tx-datashard-ut_common + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + core-testlib-default + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_link_options(ydb-core-tx-datashard-ut_write PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp +) +set_property( + TARGET + ydb-core-tx-datashard-ut_write + PROPERTY + SPLIT_FACTOR + 5 +) +add_yunittest( + NAME + ydb-core-tx-datashard-ut_write + TEST_TARGET + ydb-core-tx-datashard-ut_write + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + 
PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-tx-datashard-ut_write + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(ydb-core-tx-datashard-ut_write) diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.txt new file mode 100644 index 00000000000..d863ebd1806 --- /dev/null +++ b/ydb/core/tx/datashard/ut_write/CMakeLists.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64") + include(CMakeLists.darwin-arm64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +endif() diff --git a/ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..99c67389521 --- /dev/null +++ b/ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt @@ -0,0 +1,80 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(ydb-core-tx-datashard-ut_write) +target_compile_options(ydb-core-tx-datashard-ut_write PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard +) +target_link_libraries(ydb-core-tx-datashard-ut_write PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + core-tx-datashard + tx-datashard-ut_common + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + kqp-ut-common + core-testlib-default + ydb-core-tx + udf-service-exception_policy + public-lib-yson_value + cpp-client-ydb_result +) +target_sources(ydb-core-tx-datashard-ut_write PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_write.cpp +) +set_property( + TARGET + ydb-core-tx-datashard-ut_write + PROPERTY + SPLIT_FACTOR + 5 +) +add_yunittest( + NAME + ydb-core-tx-datashard-ut_write + TEST_TARGET + ydb-core-tx-datashard-ut_write + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-tx-datashard-ut_write + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-tx-datashard-ut_write + system_allocator +) +vcs_info(ydb-core-tx-datashard-ut_write) diff --git a/ydb/core/tx/datashard/ut_write/ya.make b/ydb/core/tx/datashard/ut_write/ya.make new file mode 100644 index 00000000000..74073a9ddbe --- /dev/null +++ b/ydb/core/tx/datashard/ut_write/ya.make @@ -0,0 +1,38 @@ +UNITTEST_FOR(ydb/core/tx/datashard) + +FORK_SUBTESTS() + +SPLIT_FACTOR(5) + +IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) + REQUIREMENTS(ram:16) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +PEERDIR( + ydb/core/tx/datashard/ut_common + 
library/cpp/getopt + library/cpp/regex/pcre + library/cpp/svnversion + ydb/core/kqp/ut/common + ydb/core/testlib/default + ydb/core/tx + ydb/library/yql/public/udf/service/exception_policy + ydb/public/lib/yson_value + ydb/public/sdk/cpp/client/ydb_result +) + +YQL_LAST_ABI_VERSION() + +SRCS( + datashard_ut_write.cpp +) + +REQUIREMENTS(ram:32) + +END() diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp new file mode 100644 index 00000000000..bfda765f844 --- /dev/null +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -0,0 +1,207 @@ +#include "datashard_write_operation.h" +#include "datashard_pipeline.h" +#include "setup_sys_locks.h" +#include "datashard_locks_db.h" +#include "datashard_user_db.h" + +namespace NKikimr { +namespace NDataShard { + +class TWriteUnit : public TExecutionUnit { +public: + TWriteUnit(TDataShard& self, TPipeline& pipeline) + : TExecutionUnit(EExecutionUnitKind::ExecuteWrite, true, self, pipeline) + { + } + + ~TWriteUnit() + { + } + + bool IsReadyToExecute(TOperation::TPtr op) const override { + if (op->HasRuntimeConflicts() || op->HasWaitingForGlobalTxIdFlag()) { + return false; + } + + if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && WillRejectDataTx(op)) { + return true; + } + + if (DataShard.IsStopping()) { + // Avoid doing any new work when datashard is stopping + return false; + } + + return !op->HasRuntimeConflicts(); + } + + void DoExecute(TDataShard* self, TWriteOperation* tx, TTransactionContext& txc, const TActorContext& ctx) { + const TValidatedWriteTx::TPtr& writeTx = tx->WriteTx(); + + const ui64 tableId = writeTx->TableId().PathId.LocalPathId; + const TTableId fullTableId(self->GetPathOwnerId(), tableId); + const ui64 localTableId = self->GetLocalTableId(fullTableId); + if (localTableId == 0) { + tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId); + return; + } + const ui64 shadowTableId = 
self->GetShadowTableId(fullTableId); + + const TUserTable& TableInfo_ = *self->GetUserTables().at(tableId); + Y_ABORT_UNLESS(TableInfo_.LocalTid == localTableId); + Y_ABORT_UNLESS(TableInfo_.ShadowTid == shadowTableId); + + const ui32 writeTableId = localTableId; + auto [readVersion, writeVersion] = self->GetReadWriteVersions(tx); + + TDataShardUserDb userDb(*self, txc.DB, readVersion); + TDataShardChangeGroupProvider groupProvider(*self, txc.DB); + + TVector<TRawTypeValue> key; + TVector<NTable::TUpdateOp> value; + + const TSerializedCellMatrix& matrix = writeTx->Matrix(); + TConstArrayRef<TCell> cells = matrix.GetCells(); + const ui32 rowCount = matrix.GetRowCount(); + const ui16 colCount = matrix.GetColCount(); + + for (ui32 rowIdx = 0; rowIdx < rowCount; ++rowIdx) + { + key.clear(); + ui64 keyBytes = 0; + for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) { + const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx]; + const TCell& cell = cells[rowIdx * colCount + keyColIdx]; + if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue<ui8>() > 127) { + tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited"); + return; + } + + keyBytes += cell.Size(); + key.emplace_back(TRawTypeValue(cell.AsRef(), cellType)); + } + + if (keyBytes > NLimits::MaxWriteKeySize) { + tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize); + return; + } + + value.clear(); + for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < colCount; ++valueColIdx) { + ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx); + const TCell& cell = cells[rowIdx * colCount + valueColIdx]; + if (cell.Size() > NLimits::MaxWriteValueSize) { + 
tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize); + return; + } + + auto* col = TableInfo_.Columns.FindPtr(valueColIdx + 1); + Y_ABORT_UNLESS(col); + + value.emplace_back(NTable::TUpdateOp(columnTag, NTable::ECellOp::Set, TRawTypeValue(cell.AsRef(), col->Type))); + } + + txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion); + self->GetConflictsCache().GetTableCache(writeTableId).RemoveUncommittedWrites(writeTx->KeyCells(), txc.DB); + } + //TODO: Counters + // self->IncCounter(COUNTER_UPLOAD_ROWS, rowCount); + // self->IncCounter(COUNTER_UPLOAD_ROWS_BYTES, matrix.GetBuffer().size()); + + TableInfo_.Stats.UpdateTime = TAppData::TimeProvider->Now(); + + tx->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCommited(writeTx->TabletId(), tx->GetTxId())); + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << tx->GetTxId() << " at " << self->TabletID() << " write operation is executed"); + } + + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { + TWriteOperation* tx = dynamic_cast<TWriteOperation*>(op.Get()); + Y_ABORT_UNLESS(tx != nullptr); + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << op->GetTxId() << " at " << tx->WriteTx()->TabletId() << " is executing write operation"); + + if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && CheckRejectDataTx(op, ctx)) { + return EExecutionStatus::Executed; + } + + if (op->HasWaitingForGlobalTxIdFlag()) { + return EExecutionStatus::Continue; + } + + if (op->IsImmediate()) { + // Every time we execute immediate transaction we may choose a new mvcc version + op->MvccReadWriteVersion.reset(); + } + else { + //TODO: Prepared + tx->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(tx->WriteTx()->TabletId(), op->GetTxId(), {0, 0, {}})); + 
return EExecutionStatus::DelayCompleteNoMoreRestarts; + } + + TDataShardLocksDb locksDb(DataShard, txc); + TSetupSysLocks guardLocks(op, DataShard, &locksDb); + + try { + DoExecute(&DataShard, tx, txc, ctx); + } catch (const TNeedGlobalTxId&) { + Y_VERIFY_S(op->GetGlobalTxId() == 0, + "Unexpected TNeedGlobalTxId exception for direct operation with TxId# " << op->GetGlobalTxId()); + Y_VERIFY_S(op->IsImmediate(), + "Unexpected TNeedGlobalTxId exception for a non-immediate operation with TxId# " << op->GetTxId()); + + ctx.Send(MakeTxProxyID(), + new TEvTxUserProxy::TEvAllocateTxId(), + 0, op->GetTxId()); + op->SetWaitingForGlobalTxIdFlag(); + + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Continue; + } + + if (Pipeline.AddLockDependencies(op, guardLocks)) { + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Continue; + } + + op->ChangeRecords() = std::move(tx->WriteTx()->GetCollectedChanges()); + + DataShard.SysLocksTable().ApplyLocks(); + DataShard.SubscribeNewLocks(ctx); + Pipeline.AddCommittingOp(op); + + return EExecutionStatus::DelayCompleteNoMoreRestarts; + } + + void Complete(TOperation::TPtr op, const TActorContext& ctx) override { + Pipeline.RemoveCommittingOp(op); + DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); + DataShard.EmitHeartbeats(ctx); + + TWriteOperation* tx = dynamic_cast<TWriteOperation*>(op.Get()); + Y_ABORT_UNLESS(tx != nullptr); + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << op->GetTxId() << " at " << tx->WriteTx()->TabletId() << " complete write operation"); + + //TODO: Counters + // if (WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED || WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED) { + // self->IncCounter(COUNTER_WRITE_SUCCESS); + // } else { + // self->IncCounter(COUNTER_WRITE_ERROR); + // } + + ctx.Send(tx->GetEv()->Sender, tx->WriteResult().release(), 0, 
tx->GetEv()->Cookie); + } + +}; // TWriteUnit + +THolder<TExecutionUnit> CreateWriteUnit(TDataShard& self, TPipeline& pipeline) { + return THolder(new TWriteUnit(self, pipeline)); +} + +} // NDataShard +} // NKikimr diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index dde0d8d105a..c7db84b49e8 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -34,6 +34,7 @@ SRCS( check_read_unit.cpp check_scheme_tx_unit.cpp check_snapshot_tx_unit.cpp + check_write_unit.cpp complete_data_tx_unit.cpp completed_operations_unit.cpp conflicts_cache.cpp @@ -71,6 +72,7 @@ SRCS( datashard__stats.cpp datashard__store_table_path.cpp datashard__store_scan_state.cpp + datashard__write.cpp datashard_change_receiving.cpp datashard_change_sender_activation.cpp datashard_change_sending.cpp @@ -127,6 +129,7 @@ SRCS( datashard_repl_offsets_client.cpp datashard_repl_offsets_server.cpp datashard_subdomain_path_id.cpp + datashard_write_operation.cpp datashard_txs.h datashard.cpp datashard.h @@ -196,6 +199,7 @@ SRCS( volatile_tx.cpp wait_for_plan_unit.cpp wait_for_stream_clearance_unit.cpp + write_unit.cpp upload_stats.cpp ) @@ -304,4 +308,5 @@ RECURSE_FOR_TESTS( ut_stats ut_upload_rows ut_volatile + ut_write ) |