aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2022-08-31 10:39:01 +0300
committergvit <gvit@ydb.tech>2022-08-31 10:39:01 +0300
commit4f7dc78f7ccaffe684f90b0194163d38ed366ef7 (patch)
tree72632dfce57790a8f0414080d687beb57e2bb43a
parentf834aa90cf558e4ede2fe69b1ffc7d7bb30a172a (diff)
downloadydb-4f7dc78f7ccaffe684f90b0194163d38ed366ef7.tar.gz
bugfix: setup correct row operation type in case of erase
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp16
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.h15
-rw-r--r--ydb/core/protos/tx_datashard.proto1
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h2
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp8
-rw-r--r--ydb/tests/functional/serverless/test.py4
7 files changed, 45 insertions, 7 deletions
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 4728b59d00..3e09e81ada 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -1065,6 +1065,13 @@ private:
} else {
task.Meta.Writes->Ranges.MergeWritePoints(TShardKeyRanges(read.Ranges), keyTypes);
}
+
+ if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kDeleteRows) {
+ task.Meta.Writes->AddEraseOp();
+ } else {
+ task.Meta.Writes->AddUpdateOp();
+ }
+
}
ShardsWithEffects.insert(task.Meta.ShardId);
@@ -1088,6 +1095,12 @@ private:
task.Meta.Writes->Ranges.MergeWritePoints(std::move(*shardInfo.KeyWriteRanges), keyTypes);
}
+ if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kDeleteRows) {
+ task.Meta.Writes->AddEraseOp();
+ } else {
+ task.Meta.Writes->AddUpdateOp();
+ }
+
for (const auto& [name, info] : shardInfo.ColumnWrites) {
auto& column = table.Columns.at(name);
@@ -1398,6 +1411,9 @@ private:
if (task.Meta.Writes) {
auto* protoWrites = protoTaskMeta.MutableWrites();
task.Meta.Writes->Ranges.SerializeTo(protoWrites->MutableRange());
+ if (task.Meta.Writes->IsPureEraseOp()) {
+ protoWrites->SetIsPureEraseOp(true);
+ }
for (const auto& [_, columnWrite] : task.Meta.Writes->ColumnWrites) {
auto& protoColumnWrite = *protoWrites->AddColumns();
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h
index 986cd5ae7b..0651b7bde1 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.h
@@ -133,8 +133,23 @@ struct TTaskMeta {
};
struct TWriteInfo {
+ ui64 UpdateOps = 0;
+ ui64 EraseOps = 0;
+
TShardKeyRanges Ranges;
THashMap<ui32, TColumnWrite> ColumnWrites;
+
+ void AddUpdateOp() {
+ ++UpdateOps;
+ }
+
+ void AddEraseOp() {
+ ++EraseOps;
+ }
+
+ bool IsPureEraseOp() const {
+ return (EraseOps > 0) && (UpdateOps == 0);
+ }
};
TReadInfo ReadInfo;
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 8faf93475e..e3b87242c2 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -191,6 +191,7 @@ message TKqpTransaction {
message TWriteOpMeta {
optional TKeyRange Range = 1;
repeated TColumnWriteMeta Columns = 3;
+ optional bool IsPureEraseOp = 4;
}
optional TTableMeta Table = 1;
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 4cd541ce4d..d957d70e84 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -653,7 +653,8 @@ void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<NTable::TCo
}
void TEngineBay::AddWriteRange(const TTableId& tableId, const TTableRange& range,
- const TVector<NScheme::TTypeId>& keyTypes, const TVector<TColumnWriteMeta>& columns)
+ const TVector<NScheme::TTypeId>& keyTypes, const TVector<TColumnWriteMeta>& columns,
+ bool isPureEraseOp)
{
TVector<TKeyDesc::TColumnOp> columnOps;
for (const auto& writeColumn : columns) {
@@ -665,7 +666,8 @@ void TEngineBay::AddWriteRange(const TTableId& tableId, const TTableRange& range
columnOps.emplace_back(std::move(op));
}
- auto desc = MakeHolder<TKeyDesc>(tableId, range, TKeyDesc::ERowOperation::Update, keyTypes, columnOps);
+ auto rowOp = isPureEraseOp ? TKeyDesc::ERowOperation::Erase : TKeyDesc::ERowOperation::Update;
+ auto desc = MakeHolder<TKeyDesc>(tableId, range, rowOp, keyTypes, columnOps);
Info.Keys.emplace_back(TValidatedKey(std::move(desc), /* isWrite */ true));
++Info.WritesCount;
if (!range.Point) {
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 3f185202ae..a960da0c9f 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -69,7 +69,7 @@ public:
const TVector<NScheme::TTypeId>& keyTypes, ui64 itemsLimit = 0, bool reverse = false);
void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeId>& keyTypes,
- const TVector<TColumnWriteMeta>& columns);
+ const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp);
void MarkTxLoaded() {
Info.Loaded = true;
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 6bab0b6f7e..b49930a683 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -357,7 +357,7 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const
tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse());
} else {
engineBay.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes,
- GetColumnWrites(*writeMeta));
+ GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp());
}
}
@@ -375,7 +375,7 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const
tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse());
} else {
engineBay.AddWriteRange(tableId, tablePoint.ToTableRange(), tableInfo->KeyColumnTypes,
- GetColumnWrites(*writeMeta));
+ GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp());
}
}
@@ -396,7 +396,7 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const
tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse());
} else {
engineBay.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes,
- GetColumnWrites(*writeMeta));
+ GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp());
}
break;
@@ -412,7 +412,7 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const
tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse());
} else {
engineBay.AddWriteRange(tableId, tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes,
- GetColumnWrites(*writeMeta));
+ GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp());
}
break;
diff --git a/ydb/tests/functional/serverless/test.py b/ydb/tests/functional/serverless/test.py
index 55e6b5d4e2..9d7462ce87 100644
--- a/ydb/tests/functional/serverless/test.py
+++ b/ydb/tests/functional/serverless/test.py
@@ -63,6 +63,7 @@ def test_create_table(ydb_hostel_db, ydb_serverless_db, ydb_endpoint, metering_f
def write_some_data(session, path):
session.transaction().execute(
"""
+ PRAGMA Kikimr.UseNewEngine="True";
UPSERT INTO `{}` (id, value_string, value_num)
VALUES (1u, "Ok", 0u),
(2u, "Also_Ok", 0u),
@@ -118,6 +119,7 @@ def test_turn_on_serverless_storage_billing(ydb_hostel_db, ydb_serverless_db, yd
def write_some_data(session, path):
session.transaction().execute(
"""
+ PRAGMA Kikimr.UseNewEngine="True";
UPSERT INTO `{}` (id, value_string, value_num)
VALUES (1u, "Ok", 0u),
(2u, "Also_Ok", 0u),
@@ -302,6 +304,7 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db,
try:
with (yield async_session()) as session:
query = yield session.async_prepare('''\
+ PRAGMA Kikimr.UseNewEngine="True";
DECLARE $key AS Uint64;
DECLARE $value AS Utf8;
@@ -325,6 +328,7 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db,
def async_erase_key(path, key):
with (yield async_session()) as session:
query = yield session.async_prepare('''\
+ PRAGMA Kikimr.UseNewEngine="True";
DECLARE $key AS Uint64;
DELETE FROM `{path}` WHERE id = $key;