summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shumkov <[email protected]>2024-11-18 22:57:36 +0300
committerGitHub <[email protected]>2024-11-18 22:57:36 +0300
commit10ec47ab48e04f74f2b867da88630d2f1651ac5a (patch)
tree0e89f7c87adbc8719192b5e47accaf600045d1bc
parentfc93800af71ca2799d0847d1d30431074b47a38e (diff)
Support alter sequence restart with (#11447)
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp7
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_type_ann.cpp2
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp114
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp46
-rw-r--r--ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp35
8 files changed, 203 insertions, 9 deletions
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 945463a9ddc..9688d0aecd2 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -1917,6 +1917,13 @@ public:
}
}
+ if (settings.SequenceSettings.Restart) {
+ seqDesc->SetRestart(true);
+ if (settings.SequenceSettings.RestartValue) {
+ seqDesc->MutableSetVal()->SetNextValue(*settings.SequenceSettings.RestartValue);
+ }
+ }
+
if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 477655f128b..189bba353c3 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -309,6 +309,11 @@ namespace {
result.Cycle = value == "1" ? true : false;
} else if (name == "increment") {
result.Increment = FromString<i64>(value);
+ } else if (name == "restart") {
+ result.Restart = true;
+ if (!value.empty()) {
+ result.RestartValue = FromString<i64>(value);
+ }
}
}
return result;
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index b5814c1329e..9a04060f00c 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -739,6 +739,8 @@ struct TSequenceSettings {
TMaybe<bool> Cycle;
TMaybe<TString> OwnedBy;
TMaybe<TString> DataType;
+ TMaybe<bool> Restart;
+ TMaybe<i64> RestartValue;
};
struct TCreateSequenceSettings {
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index 5dc2d1dbf83..da26a6afa26 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -1609,7 +1609,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
static bool CheckSequenceSettings(const TCoNameValueTupleList& settings, TExprContext& ctx) {
const static std::unordered_set<TString> sequenceSettingNames =
- {"start", "increment", "cache", "minvalue", "maxvalue", "cycle"};
+ {"start", "increment", "cache", "minvalue", "maxvalue", "cycle", "restart"};
for (const auto& setting : settings) {
auto name = setting.Name().Value();
if (!sequenceSettingNames.contains(TString(name))) {
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index 50be4575e48..d5460cd164f 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -2650,7 +2650,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
UNIT_ASSERT(!resultAlter.IsSuccess());
}
- {
+ {
auto session = client.GetSession().GetValueSync().GetSession();
auto id = session.GetId();
@@ -2663,6 +2663,34 @@ Y_UNIT_TEST_SUITE(KqpPg) {
auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString());
}
+
+ {
+ auto session = client.GetSession().GetValueSync().GetSession();
+ auto id = session.GetId();
+
+ const auto queryAlter = R"(
+ --!syntax_pg
+ ALTER SEQUENCE IF EXISTS seq
+ RESTART WITH 101;
+ )";
+
+ auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString());
+ }
+
+ {
+ auto session = client.GetSession().GetValueSync().GetSession();
+ auto id = session.GetId();
+
+ const auto queryAlter = R"(
+ --!syntax_pg
+ ALTER SEQUENCE seq
+ RESTART;
+ )";
+
+ auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString());
+ }
}
Y_UNIT_TEST(AlterColumnSetDefaultFromSequence) {
@@ -2734,7 +2762,6 @@ Y_UNIT_TEST_SUITE(KqpPg) {
START WITH 10
INCREMENT BY 2
MINVALUE 1
- CACHE 3
CYCLE;
)";
@@ -2879,6 +2906,89 @@ Y_UNIT_TEST_SUITE(KqpPg) {
[["1";"1"];["5";"4"];["8";"5"];["10";"2"];["12";"3"];["13";"14"];["14";"16"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}
+
+ {
+ const auto queryAlter = R"(
+ --!syntax_pg
+ ALTER SEQUENCE IF EXISTS seq1
+ RESTART WITH 105;
+ )";
+
+ auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString());
+ }
+
+ {
+ const auto query = Q_(R"(
+ --!syntax_pg
+ INSERT INTO Pg (key) values (105), (107);
+ )");
+
+ auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ {
+ const auto query = Q_(R"(
+ --!syntax_pg
+ SELECT * FROM Pg;
+ )");
+
+ auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
+ CompareYson(R"(
+ [["1";"1"];["5";"4"];["8";"5"];["10";"2"];["12";"3"];["13";"14"];["14";"16"];["105"; "105"];["107";"107"]]
+ )", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ {
+ const auto queryAlter = R"(
+ --!syntax_pg
+ ALTER SEQUENCE IF EXISTS seq1
+ START WITH 206;
+ )";
+
+ auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString());
+ }
+
+ {
+ const auto queryAlter = R"(
+ --!syntax_pg
+ ALTER SEQUENCE IF EXISTS seq1
+ RESTART;
+ )";
+
+ auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString());
+ }
+
+ {
+ const auto query = Q_(R"(
+ --!syntax_pg
+ INSERT INTO Pg (key) values (206), (208);
+ )");
+
+ auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ {
+ const auto query = Q_(R"(
+ --!syntax_pg
+ SELECT * FROM Pg;
+ )");
+
+ auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
+ CompareYson(R"(
+ [["1";"1"];["5";"4"];["8";"5"];["10";"2"];["12";"3"];["13";"14"];["14";"16"];["105"; "105"];["107";"107"];["206"; "206"];["208";"208"]]
+ )", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
Y_UNIT_TEST(TempTablesSessionsIsolation) {
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index f5b3aa047cc..6003864b8a4 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1432,6 +1432,7 @@ message TSequenceDescription {
optional bool Cycle = 10; // true when cycle on overflow is allowed
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
optional string DataType = 12; // data type of the sequence: Int64/pgint8, Int32/pgint4, Int16/pgint2, defaults to Int64/pgint8
+ optional bool Restart = 13; // flag for changing the current value of the sequence with SetVal if it is set, or start value otherwise
}
message TSequenceSharding {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp
index c2277fbdd44..49ca314a821 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp
@@ -112,6 +112,9 @@ public:
auto event = MakeHolder<NSequenceShard::TEvSequenceShard::TEvUpdateSequence>(txState->TargetPathId);
event->Record.SetTxId(ui64(OperationId.GetTxId()));
event->Record.SetTxPartId(OperationId.GetSubTxId());
+ if (alterData->Description.HasStartValue()) {
+ event->Record.SetStartValue(alterData->Description.GetStartValue());
+ }
if (alterData->Description.HasMinValue()) {
event->Record.SetMinValue(alterData->Description.GetMinValue());
}
@@ -130,6 +133,10 @@ public:
if (alterData->Description.HasCycle()) {
event->Record.SetCycle(alterData->Description.GetCycle());
}
+ if (alterData->Description.HasSetVal()) {
+ event->Record.SetNextValue(alterData->Description.GetSetVal().GetNextValue());
+ event->Record.SetNextUsed(alterData->Description.GetSetVal().GetNextUsed());
+ }
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TAlterSequence TConfigureParts ProgressState"
@@ -304,7 +311,40 @@ std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription
i64 increment = result.GetIncrement();
if (alter.HasIncrement()) {
increment = alter.GetIncrement();
+ if (increment == 0) {
+ errStr = Sprintf("INCREMENT must not be zero");
+ return std::nullopt;
+ }
+ }
+
+ if (alter.HasRestart() && alter.GetRestart()) {
+ i64 nextValue = startValue;
+ if (alter.HasSetVal()) {
+ nextValue = alter.GetSetVal().GetNextValue();
+ if (nextValue > maxValue) {
+ errStr = Sprintf("RESTART value (%ld) cannot be greater than MAXVALUE (%ld)", nextValue, maxValue);
+ return std::nullopt;
+ }
+ if (nextValue < minValue) {
+ errStr = Sprintf("RESTART value (%ld) cannot be less than MINVALUE (%ld)", nextValue, minValue);
+ return std::nullopt;
+ }
+ }
+ bool nextUsed = false;
+ if (increment > 0) {
+ if (nextValue == maxValue) {
+ nextUsed = true;
+ }
+ } else {
+ if (nextValue == minValue) {
+ nextUsed = true;
+ }
+ }
+ auto setVal = result.MutableSetVal();
+ setVal->SetNextUsed(nextUsed);
+ setVal->SetNextValue(nextValue);
}
+
ui64 cache = result.GetCache();
if (alter.HasCache()) {
cache = alter.GetCache();
@@ -444,12 +484,6 @@ public:
TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(dstPath->PathId);
Y_ABORT_UNLESS(!sequenceInfo->AlterData);
- if (sequenceAlter.HasSetVal()) {
- errStr = "Set value by alter sequence is not supported";
- result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
- return result;
- }
-
const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
auto description = GetAlterSequenceDescription(
sequenceInfo->Description, sequenceAlter, *typeRegistry, context.SS->EnableTablePgTypes, errStr);
diff --git a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp
index 8ccfe5f5efe..223eca2c35f 100644
--- a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp
+++ b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp
@@ -537,6 +537,41 @@ Y_UNIT_TEST_SUITE(TSequence) {
TestAlterSequence(runtime, ++txId, "/MyRoot", R"(
Name: "seq"
+ Restart: true
+ SetVal {
+ NextValue: 77
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ value = DoNextVal(runtime, "/MyRoot/seq");
+ UNIT_ASSERT_VALUES_EQUAL(value, 77);
+
+ TestAlterSequence(runtime, ++txId, "/MyRoot", R"(
+ Name: "seq"
+ Increment: 0
+ )", {{NKikimrScheme::StatusInvalidParameter, "INCREMENT must not be zero"}});
+
+ TestAlterSequence(runtime, ++txId, "/MyRoot", R"(
+ Name: "seq"
+ Restart: true
+ SetVal {
+ NextValue: 650000
+ }
+ )", {{NKikimrScheme::StatusInvalidParameter, "RESTART value (650000) cannot be greater than MAXVALUE (32767)"}});
+
+ TestAlterSequence(runtime, ++txId, "/MyRoot", R"(
+ Name: "seq"
+ Restart: true
+ StartValue: 305
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ value = DoNextVal(runtime, "/MyRoot/seq");
+ UNIT_ASSERT_VALUES_EQUAL(value, 305);
+
+ TestAlterSequence(runtime, ++txId, "/MyRoot", R"(
+ Name: "seq"
Increment: 650000
)");
env.TestWaitNotification(runtime, txId);