aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-07-11 18:54:29 +0300
committergvit <gvit@ydb.tech>2023-07-11 18:54:29 +0300
commit59d997e1ed77db8e28b9cf50be9f0800e7d240cf (patch)
treed9f5f978768472cd3d2e585d49471595e5fa748a
parent7a4d87f10048032c868fe4a69695f65d12b78743 (diff)
downloadydb-59d997e1ed77db8e28b9cf50be9f0800e7d240cf.tar.gz
add more tests and support sequencers in replace/insert KIKIMR-13368
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp1
-rw-r--r--ydb/core/kqp/opt/kqp_opt_kql.cpp144
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp11
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.h1
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp6
-rw-r--r--ydb/core/kqp/runtime/kqp_sequencer_actor.cpp12
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp176
8 files changed, 277 insertions, 75 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 6fb416cb35b..18b0f0b2ba9 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -405,6 +405,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableSequentialReads = serviceConfig.GetEnableSequentialReads();
kqpConfig.EnableKqpImmediateEffects = serviceConfig.GetEnableKqpImmediateEffects();
kqpConfig.EnablePreparedDdl = serviceConfig.GetEnablePreparedDdl();
+ kqpConfig.EnableSequences = serviceConfig.GetEnableSequences();
kqpConfig.BindingsMode = RemapBindingsMode(serviceConfig.GetBindingsMode());
}
diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp
index aa755e193ff..2ac4bc8ae81 100644
--- a/ydb/core/kqp/opt/kqp_opt_kql.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp
@@ -245,27 +245,40 @@ TCoAtomList BuildUpsertInputColumns(const TCoAtomList& inputColumns,
.Done();
}
-TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputColumns,
- const TCoAtomList& autoincrement,
- const TKikimrTableDescription& tableData, TExprContext& ctx)
+std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, const TKikimrTableDescription& table,
+ const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement,
+ TPositionHandle pos, TExprContext& ctx)
{
auto input = write.Input();
+ const bool isWriteReplace = (GetTableOp(write) == TYdbOperation::Replace);
- TCoAtomList inputCols = BuildUpsertInputColumns(inputColumns, autoincrement, write.Pos(), ctx);
+ TCoAtomList inputCols = BuildUpsertInputColumns(inputColumns, autoIncrement, pos, ctx);
- if (autoincrement.Ref().ChildrenSize() > 0) {
- input = BuildKqlSequencer(input, tableData, inputCols, autoincrement, write.Pos(), ctx);
+ if (autoIncrement.Ref().ChildrenSize() > 0) {
+ input = BuildKqlSequencer(input, table, inputCols, autoIncrement, pos, ctx);
}
- auto baseInput = Build<TKqpWriteConstraint>(ctx, write.Pos())
+ if (isWriteReplace) {
+ std::tie(input, inputCols) = CreateRowsToReplace(input, inputColumns, table, write.Pos(), ctx);
+ }
+
+ auto baseInput = Build<TKqpWriteConstraint>(ctx, pos)
.Input(input)
- .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx))
+ .Columns(GetPgNotNullColumns(table, pos, ctx))
.Done();
+ return {baseInput, inputCols};
+}
+
+TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputColumns,
+ const TCoAtomList& autoincrement,
+ const TKikimrTableDescription& table, TExprContext& ctx)
+{
+ const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
auto effect = Build<TKqlUpsertRows>(ctx, write.Pos())
- .Table(BuildTableMeta(tableData, write.Pos(), ctx))
- .Input(baseInput.Ptr())
- .Columns(inputCols.Ptr())
+ .Table(BuildTableMeta(table, write.Pos(), ctx))
+ .Input(input.Ptr())
+ .Columns(columns.Ptr())
.Done();
return effect;
@@ -273,64 +286,52 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC
TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomList& inputColumns,
const TCoAtomList& autoincrement,
- const TKikimrTableDescription& tableData, TExprContext& ctx)
+ const TKikimrTableDescription& table, TExprContext& ctx)
{
- // todo: support later.
- Y_UNUSED(autoincrement);
+ const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
auto effect = Build<TKqlUpsertRowsIndex>(ctx, write.Pos())
- .Table(BuildTableMeta(tableData, write.Pos(), ctx))
- .Input<TKqpWriteConstraint>()
- .Input(write.Input())
- .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx))
- .Build()
- .Columns(inputColumns)
+ .Table(BuildTableMeta(table, write.Pos(), ctx))
+ .Input(input.Ptr())
+ .Columns(columns.Ptr())
.Done();
-
return effect;
}
TExprBase BuildReplaceTable(const TKiWriteTable& write, const TCoAtomList& inputColumns,
- const TKikimrTableDescription& tableData, TExprContext& ctx)
+ const TCoAtomList& autoincrement,
+ const TKikimrTableDescription& table, TExprContext& ctx)
{
- const auto [data, columns] = CreateRowsToReplace(write.Input(), inputColumns, tableData, write.Pos(), ctx);
-
+ const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
return Build<TKqlUpsertRows>(ctx, write.Pos())
- .Table(BuildTableMeta(tableData, write.Pos(), ctx))
- .Input<TKqpWriteConstraint>()
- .Input(data)
- .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx))
- .Build()
+ .Table(BuildTableMeta(table, write.Pos(), ctx))
+ .Input(input.Ptr())
.Columns(columns)
.Done();
}
TExprBase BuildReplaceTableWithIndex(const TKiWriteTable& write, const TCoAtomList& inputColumns,
- const TKikimrTableDescription& tableData, TExprContext& ctx)
+ const TCoAtomList& autoincrement,
+ const TKikimrTableDescription& table, TExprContext& ctx)
{
- const auto [data, columns] = CreateRowsToReplace(write.Input(), inputColumns, tableData, write.Pos(), ctx);
-
+ const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
auto effect = Build<TKqlUpsertRowsIndex>(ctx, write.Pos())
- .Table(BuildTableMeta(tableData, write.Pos(), ctx))
- .Input<TKqpWriteConstraint>()
- .Input(data)
- .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx))
- .Build()
- .Columns(columns)
+ .Table(BuildTableMeta(table, write.Pos(), ctx))
+ .Input(input.Ptr())
+ .Columns(columns.Ptr())
.Done();
return effect;
}
TExprBase BuildInsertTable(const TKiWriteTable& write, bool abort, const TCoAtomList& inputColumns,
- const TKikimrTableDescription& tableData, TExprContext& ctx)
+ const TCoAtomList& autoincrement,
+ const TKikimrTableDescription& table, TExprContext& ctx)
{
+ const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
auto effect = Build<TKqlInsertRows>(ctx, write.Pos())
- .Table(BuildTableMeta(tableData, write.Pos(), ctx))
- .Input<TKqpWriteConstraint>()
- .Input(write.Input())
- .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx))
- .Build()
- .Columns(inputColumns)
+ .Table(BuildTableMeta(table, write.Pos(), ctx))
+ .Input(input.Ptr())
+ .Columns(columns)
.OnConflict()
.Value(abort ? "abort"sv : "revert"sv)
.Build()
@@ -340,15 +341,14 @@ TExprBase BuildInsertTable(const TKiWriteTable& write, bool abort, const TCoAtom
}
TExprBase BuildInsertTableWithIndex(const TKiWriteTable& write, bool abort, const TCoAtomList& inputColumns,
- const TKikimrTableDescription& tableData, TExprContext& ctx)
+ const TCoAtomList& autoincrement,
+ const TKikimrTableDescription& table, TExprContext& ctx)
{
+ const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
auto effect = Build<TKqlInsertRowsIndex>(ctx, write.Pos())
- .Table(BuildTableMeta(tableData, write.Pos(), ctx))
- .Input<TKqpWriteConstraint>()
- .Input(write.Input())
- .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx))
- .Build()
- .Columns(inputColumns)
+ .Table(BuildTableMeta(table, write.Pos(), ctx))
+ .Input(input.Ptr())
+ .Columns(columns.Ptr())
.OnConflict()
.Value(abort ? "abort"sv : "revert"sv)
.Build()
@@ -719,10 +719,10 @@ TExprBase WriteTableSimple(const TKiWriteTable& write, const TCoAtomList& inputC
case TYdbOperation::Upsert:
return BuildUpsertTable(write, inputColumns, autoincrement, tableData, ctx);
case TYdbOperation::Replace:
- return BuildReplaceTable(write, inputColumns, tableData, ctx);
+ return BuildReplaceTable(write, inputColumns, autoincrement, tableData, ctx);
case TYdbOperation::InsertAbort:
case TYdbOperation::InsertRevert:
- return BuildInsertTable(write, op == TYdbOperation::InsertAbort, inputColumns, tableData, ctx);
+ return BuildInsertTable(write, op == TYdbOperation::InsertAbort, inputColumns, autoincrement, tableData, ctx);
case TYdbOperation::UpdateOn:
return BuildUpdateOnTable(write, inputColumns, tableData, ctx);
case TYdbOperation::Delete:
@@ -743,10 +743,10 @@ TExprBase WriteTableWithIndexUpdate(const TKiWriteTable& write, const TCoAtomLis
case TYdbOperation::Upsert:
return BuildUpsertTableWithIndex(write, inputColumns, autoincrement, tableData, ctx);
case TYdbOperation::Replace:
- return BuildReplaceTableWithIndex(write, inputColumns, tableData, ctx);
+ return BuildReplaceTableWithIndex(write, inputColumns, autoincrement, tableData, ctx);
case TYdbOperation::InsertAbort:
case TYdbOperation::InsertRevert:
- return BuildInsertTableWithIndex(write, op == TYdbOperation::InsertAbort, inputColumns, tableData, ctx);
+ return BuildInsertTableWithIndex(write, op == TYdbOperation::InsertAbort, inputColumns, autoincrement, tableData, ctx);
case TYdbOperation::UpdateOn:
return BuildUpdateOnTableWithIndex(write, inputColumns, tableData, ctx);
case TYdbOperation::DeleteOn:
@@ -758,7 +758,9 @@ TExprBase WriteTableWithIndexUpdate(const TKiWriteTable& write, const TCoAtomLis
Y_UNREACHABLE();
}
-TExprBase HandleWriteTable(const TKiWriteTable& write, TExprContext& ctx, const TKikimrTablesData& tablesData) {
+TVector<TExprBase> HandleWriteTable(const TKiWriteTable& write, TExprContext& ctx, const TKikimrTablesData& tablesData,
+ const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx)
+{
auto& tableData = GetTableData(tablesData, write.DataSink().Cluster(), write.Table().Value());
auto inputColumnsSetting = GetSetting(write.Settings().Ref(), "input_columns");
@@ -769,10 +771,25 @@ TExprBase HandleWriteTable(const TKiWriteTable& write, TExprContext& ctx, const
YQL_ENSURE(autoincrementColumnsSetting);
auto autoincrementColumns = TCoNameValueTuple(autoincrementColumnsSetting).Value().Cast<TCoAtomList>();
+ if (autoincrementColumns.Ref().ChildrenSize() > 0 && !kqpCtx->Config->EnableSequences) {
+ const TString err = "Sequences are not supported.";
+ ctx.AddError(YqlIssue(ctx.GetPosition(write.Pos()), TIssuesIds::KIKIMR_BAD_REQUEST, err));
+ return {};
+ }
+
+ auto op = GetTableOp(write);
+ if (autoincrementColumns.Ref().ChildrenSize() > 0) {
+ if (op == TYdbOperation::UpdateOn || op == TYdbOperation::DeleteOn) {
+ const TString err = "Key columns are not specified.";
+ ctx.AddError(YqlIssue(ctx.GetPosition(write.Pos()), TIssuesIds::KIKIMR_BAD_REQUEST, err));
+ return {};
+ }
+ }
+
if (HasIndexesToWrite(tableData)) {
- return WriteTableWithIndexUpdate(write, inputColumns, autoincrementColumns, tableData, ctx);
+ return { WriteTableWithIndexUpdate(write, inputColumns, autoincrementColumns, tableData, ctx) } ;
} else {
- return WriteTableSimple(write, inputColumns, autoincrementColumns, tableData, ctx);
+ return { WriteTableSimple(write, inputColumns, autoincrementColumns, tableData, ctx) } ;
}
}
@@ -823,13 +840,16 @@ TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK
{
TVector<TKqlQuery> queryBlocks;
queryBlocks.reserve(dataQueryBlocks.ArgCount());
-
for (const auto& block : dataQueryBlocks) {
TVector <TExprBase> kqlEffects;
for (const auto& effect : block.Effects()) {
if (auto maybeWrite = effect.Maybe<TKiWriteTable>()) {
- auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData);
- kqlEffects.push_back(result);
+ auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData, kqpCtx);
+ if (result.empty()) {
+ return {};
+ }
+
+ kqlEffects.insert(kqlEffects.end(), result.begin(), result.end());
}
if (auto maybeUpdate = effect.Maybe<TKiUpdateTable>()) {
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 93c77adc98c..e75b949b5a7 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -495,6 +495,17 @@ NYql::NNodes::TExprBase KqpBuildSequencerStages(NYql::NNodes::TExprBase node, NY
.AutoIncrementColumns(sequencer.AutoIncrementColumns())
.InputItemType(sequencer.InputItemType())
.Done();
+ } else if (sequencer.Input().Maybe<TDqCnUnionAll>()) {
+ auto output = sequencer.Input().Cast<TDqCnUnionAll>().Output();
+
+ cnSequencer = Build<TKqpCnSequencer>(ctx, sequencer.Pos())
+ .Output(output)
+ .Table(sequencer.Table())
+ .Columns(sequencer.Columns())
+ .AutoIncrementColumns(sequencer.AutoIncrementColumns())
+ .InputItemType(sequencer.InputItemType())
+ .Done();
+
} else {
return node;
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index 8acf67c89cd..91f1135a075 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -144,6 +144,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableKqpImmediateEffects = false;
bool EnableSequentialReads = false;
bool EnablePreparedDdl = false;
+ bool EnableSequences = false;
NSQLTranslation::EBindingsMode BindingsMode = NSQLTranslation::EBindingsMode::ENABLED;
};
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 051ea75b2d0..d4104c7f020 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -1027,9 +1027,9 @@ private:
sequencerProto.AddAutoIncrementColumns(column.StringValue());
}
- auto resultColumns = sequencer.Columns();
- for(const auto& column: resultColumns) {
- sequencerProto.AddColumns(column.StringValue());
+ YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Struct);
+ for(const auto* column: resultItemType->Cast<TStructExprType>()->GetItems()) {
+ sequencerProto.AddColumns(TString(column->GetName()));
}
return;
diff --git a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
index 21baf45f2a6..b68af64f9fb 100644
--- a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
@@ -38,7 +38,7 @@ class TKqpSequencerActor : public NActors::TActorBootstrapped<TKqpSequencerActor
explicit TColumnSequenceInfo(const ::NKikimrKqp::TKqpColumnMetadataProto& proto)
: DefaultFromSequence(proto.GetDefaultFromSequence())
, TypeInfo(static_cast<NScheme::TTypeId>(proto.GetTypeId()))
- {
+ {
}
bool IsAutoIncrement() const {
@@ -167,17 +167,19 @@ private:
NUdf::TUnboxedValue* rowItems = nullptr;
auto newValue = HolderFactory.CreateDirectArrayHolder(Settings.GetColumns().size(), rowItems);
-
+ int inputColIdx = 0;
for(int columnIdx = 0; columnIdx < Settings.GetColumns().size(); ++columnIdx) {
auto& columnInfo = ColumnSequenceInfo[columnIdx];
if (columnInfo.IsAutoIncrement()) {
- *rowItems++ = NUdf::TUnboxedValuePod(columnInfo.AcquireNextVal());
+ i64 nextVal = columnInfo.AcquireNextVal();
+ *rowItems++ = NUdf::TUnboxedValuePod(nextVal);
rowSize += sizeof(NUdf::TUnboxedValuePod);
hasSequences &= columnInfo.HasValues();
} else {
- *rowItems++ = currentValue.GetElement(columnIdx);
- rowSize += NMiniKQL::GetUnboxedValueSize(currentValue.GetElement(columnIdx), columnInfo.TypeInfo).AllocatedBytes;
+ *rowItems++ = currentValue.GetElement(inputColIdx);
+ rowSize += NMiniKQL::GetUnboxedValueSize(currentValue.GetElement(inputColIdx), columnInfo.TypeInfo).AllocatedBytes;
+ ++inputColIdx;
}
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 3ceb02ef2bc..2b952a6b630 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1309,6 +1309,7 @@ message TTableServiceConfig {
optional bool EnableKqpImmediateEffects = 38 [default = true];
optional bool EnableSequentialReads = 39 [default = false];
optional bool EnablePreparedDdl = 42 [default = false];
+ optional bool EnableSequences = 43 [default = true];
enum EBindingsMode {
BM_ENABLED = 0;
diff --git a/ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp b/ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp
index a56e7b5607b..f517bc15206 100644
--- a/ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp
+++ b/ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp
@@ -11,9 +11,12 @@ using namespace Tests;
Y_UNIT_TEST_SUITE(TSequence) {
Y_UNIT_TEST(CreateTableWithDefaultFromSequence) {
TPortManager pm;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableSequences(true);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
- .SetUseRealThreads(false);
+ .SetUseRealThreads(false)
+ .SetAppConfig(appConfig);
Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
@@ -29,7 +32,7 @@ Y_UNIT_TEST_SUITE(TSequence) {
TShardedTableOptions()
.Sequences(true)
.Columns({
- {"key", "Uint32", true, false, "default", "myseq"},
+ {"key", "Int64", true, false, "default", "myseq"},
{"value", "Uint32", true, false},
}));
@@ -39,13 +42,176 @@ Y_UNIT_TEST_SUITE(TSequence) {
}
{
+ TString result = KqpSimpleExec(runtime, "INSERT INTO `/Root/table-1` (value) VALUES (4), (5), (6);");
+ UNIT_ASSERT_VALUES_EQUAL(result, "<empty>");
+ }
+
+ {
+ TString result = KqpSimpleExec(runtime, "REPLACE INTO `/Root/table-1` (value) VALUES (7), (8), (9);");
+ UNIT_ASSERT_VALUES_EQUAL(result, "<empty>");
+ }
+
+ {
TString result = KqpSimpleExec(runtime, "SELECT * FROM `/Root/table-1`;");
UNIT_ASSERT_VALUES_EQUAL(
result,
- "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
- "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
- "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
+ "{ items { int64_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { int64_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { int64_value: 3 } items { uint32_value: 3 } }, "
+ "{ items { int64_value: 4 } items { uint32_value: 4 } }, "
+ "{ items { int64_value: 5 } items { uint32_value: 5 } }, "
+ "{ items { int64_value: 6 } items { uint32_value: 6 } }, "
+ "{ items { int64_value: 7 } items { uint32_value: 7 } }, "
+ "{ items { int64_value: 8 } items { uint32_value: 8 } }, "
+ "{ items { int64_value: 9 } items { uint32_value: 9 } }");
+ }
+ }
+
+ Y_UNIT_TEST(CreateTableWithDefaultFromSequenceFromSelect) {
+ TPortManager pm;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableSequences(true);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetAppConfig(appConfig);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ // runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ // runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.GetAppData().AllowReadTableImmediate = true;
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-4",
+ TShardedTableOptions()
+ .Sequences(true)
+ .Columns({
+ {"key", "Int64", true, false, "default", "myseq"},
+ {"value", "Uint32", true, false},
+ }));
+
+ {
+ TString result = KqpSimpleExec(runtime, "UPSERT INTO `/Root/table-4` (value) VALUES (303);");
+ UNIT_ASSERT_VALUES_EQUAL(result, "<empty>");
+ }
+
+ {
+ TString result = KqpSimpleExec(runtime, "SELECT key, value FROM `/Root/table-4`;");
+ UNIT_ASSERT_VALUES_EQUAL(
+ result,
+ "{ items { int64_value: 1 } items { uint32_value: 303 } }");
+ }
+
+ {
+ TString result = KqpSimpleExec(runtime, "UPSERT INTO `/Root/table-4` SELECT value FROM `/Root/table-4`;");
+ UNIT_ASSERT_VALUES_EQUAL(result, "<empty>");
+ }
+
+ {
+ TString result = KqpSimpleExec(runtime, "SELECT * FROM `/Root/table-4` ORDER BY key;");
+ UNIT_ASSERT_VALUES_EQUAL(
+ result,
+ "{ items { int64_value: 1 } items { uint32_value: 303 } }, "
+ "{ items { int64_value: 2 } items { uint32_value: 303 } }");
+ }
+
+ {
+ TString result = KqpSimpleExec(runtime, "UPSERT INTO `/Root/table-4` SELECT value FROM `/Root/table-4` where key = 1;");
+ UNIT_ASSERT_VALUES_EQUAL(result, "<empty>");
+ }
+
+ {
+ TString result = KqpSimpleExec(runtime, "SELECT * FROM `/Root/table-4` ORDER BY key;");
+ UNIT_ASSERT_VALUES_EQUAL(
+ result,
+ "{ items { int64_value: 1 } items { uint32_value: 303 } }, "
+ "{ items { int64_value: 2 } items { uint32_value: 303 } }, "
+ "{ items { int64_value: 3 } items { uint32_value: 303 } }");
+ }
+ }
+
+ Y_UNIT_TEST(CreateTableWithDefaultFromSequenceNotEnabled) {
+ TPortManager pm;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableSequences(false);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetAppConfig(appConfig);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ // runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ // runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.GetAppData().AllowReadTableImmediate = true;
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-2",
+ TShardedTableOptions()
+ .Sequences(true)
+ .Columns({
+ {"key", "Int64", true, false, "default", "myseq"},
+ {"value", "Uint32", true, false},
+ }));
+
+ {
+ TString result = KqpSimpleExec(runtime, "UPSERT INTO `/Root/table-2` (value) VALUES (1), (2), (3);");
+ UNIT_ASSERT_VALUES_EQUAL(result, "ERROR: BAD_REQUEST");
+ }
+ }
+
+ Y_UNIT_TEST(CreateTableWithDefaultFromSequenceBadRequest) {
+ TPortManager pm;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableSequences(true);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetAppConfig(appConfig);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ // runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ // runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.GetAppData().AllowReadTableImmediate = true;
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-3",
+ TShardedTableOptions()
+ .Sequences(true)
+ .Columns({
+ {"key", "Int64", true, false, "default", "myseq"},
+ {"value", "Uint32", true, false},
+ }));
+
+ {
+ TString result = KqpSimpleExec(
+ runtime,
+ "$to_update = AsList( "
+ " AsStruct(CAST(12 as Uint32) as value)); "
+ "UPDATE `/Root/table-3` ON SELECT * FROM AS_TABLE($to_update)");
+ UNIT_ASSERT_VALUES_EQUAL(result, "ERROR: BAD_REQUEST");
+ }
+
+ {
+ TString result = KqpSimpleExec(
+ runtime,
+ "$to_update = AsList( "
+ " AsStruct(CAST(12 as Uint32) as value)); "
+ "DELETE FROM `/Root/table-3` ON SELECT * FROM AS_TABLE($to_update)");
+ UNIT_ASSERT_VALUES_EQUAL(result, "ERROR: BAD_REQUEST");
}
+
}
} // Y_UNIT_TEST_SUITE(TSequence)