diff options
author | gvit <gvit@ydb.tech> | 2022-08-31 10:39:01 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-08-31 10:39:01 +0300 |
commit | 4f7dc78f7ccaffe684f90b0194163d38ed366ef7 (patch) | |
tree | 72632dfce57790a8f0414080d687beb57e2bb43a | |
parent | f834aa90cf558e4ede2fe69b1ffc7d7bb30a172a (diff) | |
download | ydb-4f7dc78f7ccaffe684f90b0194163d38ed366ef7.tar.gz |
bugfix: setup correct row operation type in case of erase
-rw-r--r-- | ydb/core/kqp/executer/kqp_data_executer.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_tasks_graph.h | 15 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 8 | ||||
-rw-r--r-- | ydb/tests/functional/serverless/test.py | 4 |
7 files changed, 45 insertions, 7 deletions
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index 4728b59d00..3e09e81ada 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -1065,6 +1065,13 @@ private: } else { task.Meta.Writes->Ranges.MergeWritePoints(TShardKeyRanges(read.Ranges), keyTypes); } + + if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kDeleteRows) { + task.Meta.Writes->AddEraseOp(); + } else { + task.Meta.Writes->AddUpdateOp(); + } + } ShardsWithEffects.insert(task.Meta.ShardId); @@ -1088,6 +1095,12 @@ private: task.Meta.Writes->Ranges.MergeWritePoints(std::move(*shardInfo.KeyWriteRanges), keyTypes); } + if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kDeleteRows) { + task.Meta.Writes->AddEraseOp(); + } else { + task.Meta.Writes->AddUpdateOp(); + } + for (const auto& [name, info] : shardInfo.ColumnWrites) { auto& column = table.Columns.at(name); @@ -1398,6 +1411,9 @@ private: if (task.Meta.Writes) { auto* protoWrites = protoTaskMeta.MutableWrites(); task.Meta.Writes->Ranges.SerializeTo(protoWrites->MutableRange()); + if (task.Meta.Writes->IsPureEraseOp()) { + protoWrites->SetIsPureEraseOp(true); + } for (const auto& [_, columnWrite] : task.Meta.Writes->ColumnWrites) { auto& protoColumnWrite = *protoWrites->AddColumns(); diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h index 986cd5ae7b..0651b7bde1 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer/kqp_tasks_graph.h @@ -133,8 +133,23 @@ struct TTaskMeta { }; struct TWriteInfo { + ui64 UpdateOps = 0; + ui64 EraseOps = 0; + TShardKeyRanges Ranges; THashMap<ui32, TColumnWrite> ColumnWrites; + + void AddUpdateOp() { + ++UpdateOps; + } + + void AddEraseOp() { + ++EraseOps; + } + + bool IsPureEraseOp() const { + return (EraseOps > 0) && (UpdateOps == 0); + } }; TReadInfo ReadInfo; diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 8faf93475e..e3b87242c2 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -191,6 +191,7 @@ message TKqpTransaction { message TWriteOpMeta { optional TKeyRange Range = 1; repeated TColumnWriteMeta Columns = 3; + optional bool IsPureEraseOp = 4; } optional TTableMeta Table = 1; diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 4cd541ce4d..d957d70e84 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -653,7 +653,8 @@ void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<NTable::TCo } void TEngineBay::AddWriteRange(const TTableId& tableId, const TTableRange& range, - const TVector<NScheme::TTypeId>& keyTypes, const TVector<TColumnWriteMeta>& columns) + const TVector<NScheme::TTypeId>& keyTypes, const TVector<TColumnWriteMeta>& columns, + bool isPureEraseOp) { TVector<TKeyDesc::TColumnOp> columnOps; for (const auto& writeColumn : columns) { @@ -665,7 +666,8 @@ void TEngineBay::AddWriteRange(const TTableId& tableId, const TTableRange& range columnOps.emplace_back(std::move(op)); } - auto desc = MakeHolder<TKeyDesc>(tableId, range, TKeyDesc::ERowOperation::Update, keyTypes, columnOps); + auto rowOp = isPureEraseOp ? TKeyDesc::ERowOperation::Erase : TKeyDesc::ERowOperation::Update; + auto desc = MakeHolder<TKeyDesc>(tableId, range, rowOp, keyTypes, columnOps); Info.Keys.emplace_back(TValidatedKey(std::move(desc), /* isWrite */ true)); ++Info.WritesCount; if (!range.Point) { diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 3f185202ae..a960da0c9f 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -69,7 +69,7 @@ public: const TVector<NScheme::TTypeId>& keyTypes, ui64 itemsLimit = 0, bool reverse = false); void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeId>& keyTypes, - const TVector<TColumnWriteMeta>& columns); + const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp); void MarkTxLoaded() { Info.Loaded = true; diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 6bab0b6f7e..b49930a683 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -357,7 +357,7 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { engineBay.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, - GetColumnWrites(*writeMeta)); + GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } } @@ -375,7 +375,7 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { engineBay.AddWriteRange(tableId, tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, - GetColumnWrites(*writeMeta)); + GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } } @@ -396,7 +396,7 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { engineBay.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, - GetColumnWrites(*writeMeta)); + GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } break; @@ -412,7 +412,7 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { engineBay.AddWriteRange(tableId, tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes, - GetColumnWrites(*writeMeta)); + GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } break; diff --git a/ydb/tests/functional/serverless/test.py b/ydb/tests/functional/serverless/test.py index 55e6b5d4e2..9d7462ce87 100644 --- a/ydb/tests/functional/serverless/test.py +++ b/ydb/tests/functional/serverless/test.py @@ -63,6 +63,7 @@ def test_create_table(ydb_hostel_db, ydb_serverless_db, ydb_endpoint, metering_f def write_some_data(session, path): session.transaction().execute( """ + PRAGMA Kikimr.UseNewEngine="True"; UPSERT INTO `{}` (id, value_string, value_num) VALUES (1u, "Ok", 0u), (2u, "Also_Ok", 0u), @@ -118,6 +119,7 @@ def test_turn_on_serverless_storage_billing(ydb_hostel_db, ydb_serverless_db, yd def write_some_data(session, path): session.transaction().execute( """ + PRAGMA Kikimr.UseNewEngine="True"; UPSERT INTO `{}` (id, value_string, value_num) VALUES (1u, "Ok", 0u), (2u, "Also_Ok", 0u), @@ -302,6 +304,7 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db, try: with (yield async_session()) as session: query = yield session.async_prepare('''\ + PRAGMA Kikimr.UseNewEngine="True"; DECLARE $key AS Uint64; DECLARE $value AS Utf8; @@ -325,6 +328,7 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db, def async_erase_key(path, key): with (yield async_session()) as session: query = yield session.async_prepare('''\ + PRAGMA Kikimr.UseNewEngine="True"; DECLARE $key AS Uint64; DELETE FROM `{path}` WHERE id = $key; |