summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaliy Filippov <[email protected]>2026-06-08 21:17:51 +0300
committerGitHub <[email protected]>2026-06-08 21:17:51 +0300
commit37da33fd7f782c86883ec7469f284819dbb8b68d (patch)
treef3a5fb5e80ae425d363d0043dc1ce880483e4357
parent51d720a1236a68acae23d6144fa6f914113dd8f4 (diff)
Add missing precharge for INCREMENT and UPSERT_INCREMENT operations (#42825)
-rw-r--r--ydb/core/tx/datashard/datashard_ut_write.cpp70
-rw-r--r--ydb/core/tx/datashard/execute_write_unit.cpp2
2 files changed, 72 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp
index cdd04b4106c..9bed7c639ce 100644
--- a/ydb/core/tx/datashard/datashard_ut_write.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_write.cpp
@@ -675,6 +675,76 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
}
+ Y_UNIT_TEST(UpsertIncrementPrecharge) {
+ auto [runtime, server, sender] = TestCreateServer();
+
+ auto opts = TShardedTableOptions()
+ .Columns({
+ {"key", "Uint64", true, false}, // key (id=1)
+ {"uint32_val", "Uint32", false, false}, // id=2
+ {"uint64_val", "Uint64", false, false} // id=3
+ });
+
+ auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ const ui64 shard = shards[0];
+ ui64 txId = 100;
+
+ Cout << "========= Upsert 1000 rows =========\n";
+ {
+ TVector<ui32> columnIds = {1, 2, 3}; // key and numeric columns
+ TVector<TCell> increments;
+ for (ui64 id = 1; id <= 1000; id++) {
+ increments.push_back(TCell::Make(id));
+ increments.push_back(TCell::Make(ui32(1)));
+ increments.push_back(TCell::Make(ui64(1)));
+ }
+ auto result = UpsertIncrement(runtime, sender, shard, tableId, txId,
+ NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, columnIds, increments);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+ }
+
+ Cout << "========= Compact and reboot to clear cache =========\n";
+ CompactTable(runtime, shard, tableId, false);
+ RebootTablet(runtime, shard, sender);
+ runtime.SimulateSleep(TDuration::Seconds(1));
+
+ auto getTabletNotReadyCount = [&]() -> ui64 {
+ auto edge = runtime.AllocateEdgeActor();
+ runtime.SendToPipe(shard, edge, new TEvTablet::TEvGetCounters(),
+ 0, GetPipeConfigWithRetries());
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTablet::TEvGetCountersResponse>(edge);
+ UNIT_ASSERT(ev);
+ for (const auto& counter : ev->Get()->Record.GetTabletCounters().GetAppCounters().GetCumulativeCounters()) {
+ if (counter.GetName() == "DataShard/TxTabletNotReady") {
+ return counter.GetValue();
+ }
+ }
+ return 0;
+ };
+
+ ui64 notReadyBefore = getTabletNotReadyCount();
+
+ Cout << "========= UPSERT_INCREMENT on rows 1, 500, 1000 =========\n";
+ {
+ TVector<ui32> columnIds = {1, 2, 3};
+ TVector<TCell> increments = {
+ TCell::Make(ui64(1)), TCell::Make(ui32(10)), TCell::Make(ui64(100)),
+ TCell::Make(ui64(500)), TCell::Make(ui32(10)), TCell::Make(ui64(100)),
+ TCell::Make(ui64(1000)), TCell::Make(ui32(10)), TCell::Make(ui64(100)),
+ };
+
+ auto result = UpsertIncrement(runtime, sender, shard, tableId, txId,
+ NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, columnIds, increments);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+ }
+
+ ui64 notReadyAfter = getTabletNotReadyCount();
+
+ // Should have at most 1 TabletNotReady
+ Cout << "TabletNotReady count after UPSERT_INCREMENT: " << (notReadyAfter - notReadyBefore) << "\n";
+ UNIT_ASSERT_C(notReadyAfter - notReadyBefore <= 1, "Too many page faults: " << (notReadyAfter - notReadyBefore));
+ }
+
Y_UNIT_TEST_TWIN(ExecSQLUpsertPrepared, Volatile) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
diff --git a/ydb/core/tx/datashard/execute_write_unit.cpp b/ydb/core/tx/datashard/execute_write_unit.cpp
index 79965c2b0b2..6e2da3aa89e 100644
--- a/ydb/core/tx/datashard/execute_write_unit.cpp
+++ b/ydb/core/tx/datashard/execute_write_unit.cpp
@@ -222,6 +222,8 @@ public:
if (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT ||
operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE ||
+ operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INCREMENT ||
+ operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT_INCREMENT ||
userDb.NeedToReadBeforeWrite(fullTableId))
{
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx) {