diff options
| author | Nikolay Shumkov <[email protected]> | 2024-11-18 22:57:36 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-11-18 22:57:36 +0300 |
| commit | 10ec47ab48e04f74f2b867da88630d2f1651ac5a (patch) | |
| tree | 0e89f7c87adbc8719192b5e47accaf600045d1bc | |
| parent | fc93800af71ca2799d0847d1d30431074b47a38e (diff) | |
Support alter sequence restart with (#11447)
| -rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 7 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 5 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 114 | ||||
| -rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 1 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp | 46 | ||||
| -rw-r--r-- | ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp | 35 |
8 files changed, 203 insertions, 9 deletions
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 945463a9ddc..9688d0aecd2 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -1917,6 +1917,13 @@ public: } } + if (settings.SequenceSettings.Restart) { + seqDesc->SetRestart(true); + if (settings.SequenceSettings.RestartValue) { + seqDesc->MutableSetVal()->SetNextValue(*settings.SequenceSettings.RestartValue); + } + } + if (IsPrepare()) { auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); auto& phyTx = *phyQuery.AddTransactions(); diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 477655f128b..189bba353c3 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -309,6 +309,11 @@ namespace { result.Cycle = value == "1" ? true : false; } else if (name == "increment") { result.Increment = FromString<i64>(value); + } else if (name == "restart") { + result.Restart = true; + if (!value.empty()) { + result.RestartValue = FromString<i64>(value); + } } } return result; diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index b5814c1329e..9a04060f00c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -739,6 +739,8 @@ struct TSequenceSettings { TMaybe<bool> Cycle; TMaybe<TString> OwnedBy; TMaybe<TString> DataType; + TMaybe<bool> Restart; + TMaybe<i64> RestartValue; }; struct TCreateSequenceSettings { diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 5dc2d1dbf83..da26a6afa26 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -1609,7 +1609,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over static bool CheckSequenceSettings(const TCoNameValueTupleList& settings, TExprContext& ctx) { const static std::unordered_set<TString> sequenceSettingNames = - {"start", "increment", "cache", "minvalue", "maxvalue", "cycle"}; + {"start", "increment", "cache", "minvalue", "maxvalue", "cycle", "restart"}; for (const auto& setting : settings) { auto name = setting.Name().Value(); if (!sequenceSettingNames.contains(TString(name))) { diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 50be4575e48..d5460cd164f 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -2650,7 +2650,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { UNIT_ASSERT(!resultAlter.IsSuccess()); } - { + { auto session = client.GetSession().GetValueSync().GetSession(); auto id = session.GetId(); @@ -2663,6 +2663,34 @@ Y_UNIT_TEST_SUITE(KqpPg) { auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString()); } + + { + auto session = client.GetSession().GetValueSync().GetSession(); + auto id = session.GetId(); + + const auto queryAlter = R"( + --!syntax_pg + ALTER SEQUENCE IF EXISTS seq + RESTART WITH 101; + )"; + + auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString()); + } + + { + auto session = client.GetSession().GetValueSync().GetSession(); + auto id = session.GetId(); + + const auto queryAlter = R"( + --!syntax_pg + ALTER SEQUENCE seq + RESTART; + )"; + + auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString()); + } } Y_UNIT_TEST(AlterColumnSetDefaultFromSequence) { @@ -2734,7 +2762,6 @@ Y_UNIT_TEST_SUITE(KqpPg) { START WITH 10 INCREMENT BY 2 MINVALUE 1 - CACHE 3 CYCLE; )"; @@ -2879,6 +2906,89 @@ Y_UNIT_TEST_SUITE(KqpPg) { [["1";"1"];["5";"4"];["8";"5"];["10";"2"];["12";"3"];["13";"14"];["14";"16"]] )", FormatResultSetYson(result.GetResultSet(0))); } + + { + const auto queryAlter = R"( + --!syntax_pg + ALTER SEQUENCE IF EXISTS seq1 + RESTART WITH 105; + )"; + + auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO Pg (key) values (105), (107); + )"); + + auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM Pg; + )"); + + auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["5";"4"];["8";"5"];["10";"2"];["12";"3"];["13";"14"];["14";"16"];["105"; "105"];["107";"107"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto queryAlter = R"( + --!syntax_pg + ALTER SEQUENCE IF EXISTS seq1 + START WITH 206; + )"; + + auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString()); + } + + { + const auto queryAlter = R"( + --!syntax_pg + ALTER SEQUENCE IF EXISTS seq1 + RESTART; + )"; + + auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO Pg (key) values (206), (208); + )"); + + auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM Pg; + )"); + + auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["5";"4"];["8";"5"];["10";"2"];["12";"3"];["13";"14"];["14";"16"];["105"; "105"];["107";"107"];["206"; "206"];["208";"208"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } } Y_UNIT_TEST(TempTablesSessionsIsolation) { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index f5b3aa047cc..6003864b8a4 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1432,6 +1432,7 @@ message TSequenceDescription { optional bool Cycle = 10; // true when cycle on overflow is allowed optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating optional string DataType = 12; // data type of the sequence: Int64/pgint8, Int32/pgint4, Int16/pgint2, defaults to Int64/pgint8 + optional bool Restart = 13; // flag for changing the current value of the sequence with SetVal if it is set, or start value otherwise } message TSequenceSharding { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp index c2277fbdd44..49ca314a821 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp @@ -112,6 +112,9 @@ public: auto event = MakeHolder<NSequenceShard::TEvSequenceShard::TEvUpdateSequence>(txState->TargetPathId); event->Record.SetTxId(ui64(OperationId.GetTxId())); event->Record.SetTxPartId(OperationId.GetSubTxId()); + if (alterData->Description.HasStartValue()) { + event->Record.SetStartValue(alterData->Description.GetStartValue()); + } if (alterData->Description.HasMinValue()) { event->Record.SetMinValue(alterData->Description.GetMinValue()); } @@ -130,6 +133,10 @@ public: if (alterData->Description.HasCycle()) { event->Record.SetCycle(alterData->Description.GetCycle()); } + if (alterData->Description.HasSetVal()) { + event->Record.SetNextValue(alterData->Description.GetSetVal().GetNextValue()); + event->Record.SetNextUsed(alterData->Description.GetSetVal().GetNextUsed()); + } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TAlterSequence TConfigureParts ProgressState" @@ -304,7 +311,40 @@ std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription i64 increment = result.GetIncrement(); if (alter.HasIncrement()) { increment = alter.GetIncrement(); + if (increment == 0) { + errStr = Sprintf("INCREMENT must not be zero"); + return std::nullopt; + } + } + + if (alter.HasRestart() && alter.GetRestart()) { + i64 nextValue = startValue; + if (alter.HasSetVal()) { + nextValue = alter.GetSetVal().GetNextValue(); + if (nextValue > maxValue) { + errStr = Sprintf("RESTART value (%ld) cannot be greater than MAXVALUE (%ld)", nextValue, maxValue); + return std::nullopt; + } + if (nextValue < minValue) { + errStr = Sprintf("RESTART value (%ld) cannot be less than MINVALUE (%ld)", nextValue, minValue); + return std::nullopt; + } + } + bool nextUsed = false; + if (increment > 0) { + if (nextValue == maxValue) { + nextUsed = true; + } + } else { + if (nextValue == minValue) { + nextUsed = true; + } + } + auto setVal = result.MutableSetVal(); + setVal->SetNextUsed(nextUsed); + setVal->SetNextValue(nextValue); } + ui64 cache = result.GetCache(); if (alter.HasCache()) { cache = alter.GetCache(); @@ -444,12 +484,6 @@ public: TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(dstPath->PathId); Y_ABORT_UNLESS(!sequenceInfo->AlterData); - if (sequenceAlter.HasSetVal()) { - errStr = "Set value by alter sequence is not supported"; - result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); - return result; - } - const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; auto description = GetAlterSequenceDescription( sequenceInfo->Description, sequenceAlter, *typeRegistry, context.SS->EnableTablePgTypes, errStr); diff --git a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp index 8ccfe5f5efe..223eca2c35f 100644 --- a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp +++ b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp @@ -537,6 +537,41 @@ Y_UNIT_TEST_SUITE(TSequence) { TestAlterSequence(runtime, ++txId, "/MyRoot", R"( Name: "seq" + Restart: true + SetVal { + NextValue: 77 + } + )"); + env.TestWaitNotification(runtime, txId); + + value = DoNextVal(runtime, "/MyRoot/seq"); + UNIT_ASSERT_VALUES_EQUAL(value, 77); + + TestAlterSequence(runtime, ++txId, "/MyRoot", R"( + Name: "seq" + Increment: 0 + )", {{NKikimrScheme::StatusInvalidParameter, "INCREMENT must not be zero"}}); + + TestAlterSequence(runtime, ++txId, "/MyRoot", R"( + Name: "seq" + Restart: true + SetVal { + NextValue: 650000 + } + )", {{NKikimrScheme::StatusInvalidParameter, "RESTART value (650000) cannot be greater than MAXVALUE (32767)"}}); + + TestAlterSequence(runtime, ++txId, "/MyRoot", R"( + Name: "seq" + Restart: true + StartValue: 305 + )"); + env.TestWaitNotification(runtime, txId); + + value = DoNextVal(runtime, "/MyRoot/seq"); + UNIT_ASSERT_VALUES_EQUAL(value, 305); + + TestAlterSequence(runtime, ++txId, "/MyRoot", R"( + Name: "seq" Increment: 650000 )"); env.TestWaitNotification(runtime, txId); |
