diff options
author | gvit <gvit@ydb.tech> | 2023-07-11 18:54:29 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-07-11 18:54:29 +0300 |
commit | 59d997e1ed77db8e28b9cf50be9f0800e7d240cf (patch) | |
tree | d9f5f978768472cd3d2e585d49471595e5fa748a | |
parent | 7a4d87f10048032c868fe4a69695f65d12b78743 (diff) | |
download | ydb-59d997e1ed77db8e28b9cf50be9f0800e7d240cf.tar.gz |
add more tests and support sequencers in replace/insert KIKIMR-13368
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_kql.cpp | 144 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_sequencer_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp | 176 |
8 files changed, 277 insertions, 75 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 6fb416cb35b..18b0f0b2ba9 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -405,6 +405,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.EnableSequentialReads = serviceConfig.GetEnableSequentialReads(); kqpConfig.EnableKqpImmediateEffects = serviceConfig.GetEnableKqpImmediateEffects(); kqpConfig.EnablePreparedDdl = serviceConfig.GetEnablePreparedDdl(); + kqpConfig.EnableSequences = serviceConfig.GetEnableSequences(); kqpConfig.BindingsMode = RemapBindingsMode(serviceConfig.GetBindingsMode()); } diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index aa755e193ff..2ac4bc8ae81 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -245,27 +245,40 @@ TCoAtomList BuildUpsertInputColumns(const TCoAtomList& inputColumns, .Done(); } -TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputColumns, - const TCoAtomList& autoincrement, - const TKikimrTableDescription& tableData, TExprContext& ctx) +std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, const TKikimrTableDescription& table, + const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement, + TPositionHandle pos, TExprContext& ctx) { auto input = write.Input(); + const bool isWriteReplace = (GetTableOp(write) == TYdbOperation::Replace); - TCoAtomList inputCols = BuildUpsertInputColumns(inputColumns, autoincrement, write.Pos(), ctx); + TCoAtomList inputCols = BuildUpsertInputColumns(inputColumns, autoIncrement, pos, ctx); - if (autoincrement.Ref().ChildrenSize() > 0) { - input = BuildKqlSequencer(input, tableData, inputCols, autoincrement, write.Pos(), ctx); + if (autoIncrement.Ref().ChildrenSize() > 0) { + input = BuildKqlSequencer(input, table, inputCols, autoIncrement, pos, ctx); } - auto baseInput = Build<TKqpWriteConstraint>(ctx, write.Pos()) + if (isWriteReplace) { + std::tie(input, inputCols) = CreateRowsToReplace(input, inputColumns, table, write.Pos(), ctx); + } + + auto baseInput = Build<TKqpWriteConstraint>(ctx, pos) .Input(input) - .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx)) + .Columns(GetPgNotNullColumns(table, pos, ctx)) .Done(); + return {baseInput, inputCols}; +} + +TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputColumns, + const TCoAtomList& autoincrement, + const TKikimrTableDescription& table, TExprContext& ctx) +{ + const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); auto effect = Build<TKqlUpsertRows>(ctx, write.Pos()) - .Table(BuildTableMeta(tableData, write.Pos(), ctx)) - .Input(baseInput.Ptr()) - .Columns(inputCols.Ptr()) + .Table(BuildTableMeta(table, write.Pos(), ctx)) + .Input(input.Ptr()) + .Columns(columns.Ptr()) .Done(); return effect; @@ -273,64 +286,52 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomList& inputColumns, const TCoAtomList& autoincrement, - const TKikimrTableDescription& tableData, TExprContext& ctx) + const TKikimrTableDescription& table, TExprContext& ctx) { - // todo: support later. - Y_UNUSED(autoincrement); + const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); auto effect = Build<TKqlUpsertRowsIndex>(ctx, write.Pos()) - .Table(BuildTableMeta(tableData, write.Pos(), ctx)) - .Input<TKqpWriteConstraint>() - .Input(write.Input()) - .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx)) - .Build() - .Columns(inputColumns) + .Table(BuildTableMeta(table, write.Pos(), ctx)) + .Input(input.Ptr()) + .Columns(columns.Ptr()) .Done(); - return effect; } TExprBase BuildReplaceTable(const TKiWriteTable& write, const TCoAtomList& inputColumns, - const TKikimrTableDescription& tableData, TExprContext& ctx) + const TCoAtomList& autoincrement, + const TKikimrTableDescription& table, TExprContext& ctx) { - const auto [data, columns] = CreateRowsToReplace(write.Input(), inputColumns, tableData, write.Pos(), ctx); - + const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); return Build<TKqlUpsertRows>(ctx, write.Pos()) - .Table(BuildTableMeta(tableData, write.Pos(), ctx)) - .Input<TKqpWriteConstraint>() - .Input(data) - .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx)) - .Build() + .Table(BuildTableMeta(table, write.Pos(), ctx)) + .Input(input.Ptr()) .Columns(columns) .Done(); } TExprBase BuildReplaceTableWithIndex(const TKiWriteTable& write, const TCoAtomList& inputColumns, - const TKikimrTableDescription& tableData, TExprContext& ctx) + const TCoAtomList& autoincrement, + const TKikimrTableDescription& table, TExprContext& ctx) { - const auto [data, columns] = CreateRowsToReplace(write.Input(), inputColumns, tableData, write.Pos(), ctx); - + const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); auto effect = Build<TKqlUpsertRowsIndex>(ctx, write.Pos()) - .Table(BuildTableMeta(tableData, write.Pos(), ctx)) - .Input<TKqpWriteConstraint>() - .Input(data) - .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx)) - .Build() - .Columns(columns) + .Table(BuildTableMeta(table, write.Pos(), ctx)) + .Input(input.Ptr()) + .Columns(columns.Ptr()) .Done(); return effect; } TExprBase BuildInsertTable(const TKiWriteTable& write, bool abort, const TCoAtomList& inputColumns, - const TKikimrTableDescription& tableData, TExprContext& ctx) + const TCoAtomList& autoincrement, + const TKikimrTableDescription& table, TExprContext& ctx) { + const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); auto effect = Build<TKqlInsertRows>(ctx, write.Pos()) - .Table(BuildTableMeta(tableData, write.Pos(), ctx)) - .Input<TKqpWriteConstraint>() - .Input(write.Input()) - .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx)) - .Build() - .Columns(inputColumns) + .Table(BuildTableMeta(table, write.Pos(), ctx)) + .Input(input.Ptr()) + .Columns(columns) .OnConflict() .Value(abort ? "abort"sv : "revert"sv) .Build() @@ -340,15 +341,14 @@ TExprBase BuildInsertTable(const TKiWriteTable& write, bool abort, const TCoAtom } TExprBase BuildInsertTableWithIndex(const TKiWriteTable& write, bool abort, const TCoAtomList& inputColumns, - const TKikimrTableDescription& tableData, TExprContext& ctx) + const TCoAtomList& autoincrement, + const TKikimrTableDescription& table, TExprContext& ctx) { + const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); auto effect = Build<TKqlInsertRowsIndex>(ctx, write.Pos()) - .Table(BuildTableMeta(tableData, write.Pos(), ctx)) - .Input<TKqpWriteConstraint>() - .Input(write.Input()) - .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx)) - .Build() - .Columns(inputColumns) + .Table(BuildTableMeta(table, write.Pos(), ctx)) + .Input(input.Ptr()) + .Columns(columns.Ptr()) .OnConflict() .Value(abort ? "abort"sv : "revert"sv) .Build() @@ -719,10 +719,10 @@ TExprBase WriteTableSimple(const TKiWriteTable& write, const TCoAtomList& inputC case TYdbOperation::Upsert: return BuildUpsertTable(write, inputColumns, autoincrement, tableData, ctx); case TYdbOperation::Replace: - return BuildReplaceTable(write, inputColumns, tableData, ctx); + return BuildReplaceTable(write, inputColumns, autoincrement, tableData, ctx); case TYdbOperation::InsertAbort: case TYdbOperation::InsertRevert: - return BuildInsertTable(write, op == TYdbOperation::InsertAbort, inputColumns, tableData, ctx); + return BuildInsertTable(write, op == TYdbOperation::InsertAbort, inputColumns, autoincrement, tableData, ctx); case TYdbOperation::UpdateOn: return BuildUpdateOnTable(write, inputColumns, tableData, ctx); case TYdbOperation::Delete: @@ -743,10 +743,10 @@ TExprBase WriteTableWithIndexUpdate(const TKiWriteTable& write, const TCoAtomLis case TYdbOperation::Upsert: return BuildUpsertTableWithIndex(write, inputColumns, autoincrement, tableData, ctx); case TYdbOperation::Replace: - return BuildReplaceTableWithIndex(write, inputColumns, tableData, ctx); + return BuildReplaceTableWithIndex(write, inputColumns, autoincrement, tableData, ctx); case TYdbOperation::InsertAbort: case TYdbOperation::InsertRevert: - return BuildInsertTableWithIndex(write, op == TYdbOperation::InsertAbort, inputColumns, tableData, ctx); + return BuildInsertTableWithIndex(write, op == TYdbOperation::InsertAbort, inputColumns, autoincrement, tableData, ctx); case TYdbOperation::UpdateOn: return BuildUpdateOnTableWithIndex(write, inputColumns, tableData, ctx); case TYdbOperation::DeleteOn: @@ -758,7 +758,9 @@ TExprBase WriteTableWithIndexUpdate(const TKiWriteTable& write, const TCoAtomLis Y_UNREACHABLE(); } -TExprBase HandleWriteTable(const TKiWriteTable& write, TExprContext& ctx, const TKikimrTablesData& tablesData) { +TVector<TExprBase> HandleWriteTable(const TKiWriteTable& write, TExprContext& ctx, const TKikimrTablesData& tablesData, + const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx) +{ auto& tableData = GetTableData(tablesData, write.DataSink().Cluster(), write.Table().Value()); auto inputColumnsSetting = GetSetting(write.Settings().Ref(), "input_columns"); @@ -769,10 +771,25 @@ TExprBase HandleWriteTable(const TKiWriteTable& write, TExprContext& ctx, const YQL_ENSURE(autoincrementColumnsSetting); auto autoincrementColumns = TCoNameValueTuple(autoincrementColumnsSetting).Value().Cast<TCoAtomList>(); + if (autoincrementColumns.Ref().ChildrenSize() > 0 && !kqpCtx->Config->EnableSequences) { + const TString err = "Sequences are not supported."; + ctx.AddError(YqlIssue(ctx.GetPosition(write.Pos()), TIssuesIds::KIKIMR_BAD_REQUEST, err)); + return {}; + } + + auto op = GetTableOp(write); + if (autoincrementColumns.Ref().ChildrenSize() > 0) { + if (op == TYdbOperation::UpdateOn || op == TYdbOperation::DeleteOn) { + const TString err = "Key columns are not specified."; + ctx.AddError(YqlIssue(ctx.GetPosition(write.Pos()), TIssuesIds::KIKIMR_BAD_REQUEST, err)); + return {}; + } + } + if (HasIndexesToWrite(tableData)) { - return WriteTableWithIndexUpdate(write, inputColumns, autoincrementColumns, tableData, ctx); + return { WriteTableWithIndexUpdate(write, inputColumns, autoincrementColumns, tableData, ctx) } ; } else { - return WriteTableSimple(write, inputColumns, autoincrementColumns, tableData, ctx); + return { WriteTableSimple(write, inputColumns, autoincrementColumns, tableData, ctx) } ; } } @@ -823,13 +840,16 @@ TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK { TVector<TKqlQuery> queryBlocks; queryBlocks.reserve(dataQueryBlocks.ArgCount()); - for (const auto& block : dataQueryBlocks) { TVector <TExprBase> kqlEffects; for (const auto& effect : block.Effects()) { if (auto maybeWrite = effect.Maybe<TKiWriteTable>()) { - auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData); - kqlEffects.push_back(result); + auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData, kqpCtx); + if (result.empty()) { + return {}; + } + + kqlEffects.insert(kqlEffects.end(), result.begin(), result.end()); } if (auto maybeUpdate = effect.Maybe<TKiUpdateTable>()) { diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 93c77adc98c..e75b949b5a7 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -495,6 +495,17 @@ NYql::NNodes::TExprBase KqpBuildSequencerStages(NYql::NNodes::TExprBase node, NY .AutoIncrementColumns(sequencer.AutoIncrementColumns()) .InputItemType(sequencer.InputItemType()) .Done(); + } else if (sequencer.Input().Maybe<TDqCnUnionAll>()) { + auto output = sequencer.Input().Cast<TDqCnUnionAll>().Output(); + + cnSequencer = Build<TKqpCnSequencer>(ctx, sequencer.Pos()) + .Output(output) + .Table(sequencer.Table()) + .Columns(sequencer.Columns()) + .AutoIncrementColumns(sequencer.AutoIncrementColumns()) + .InputItemType(sequencer.InputItemType()) + .Done(); + } else { return node; } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 8acf67c89cd..91f1135a075 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -144,6 +144,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableKqpImmediateEffects = false; bool EnableSequentialReads = false; bool EnablePreparedDdl = false; + bool EnableSequences = false; NSQLTranslation::EBindingsMode BindingsMode = NSQLTranslation::EBindingsMode::ENABLED; }; diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 051ea75b2d0..d4104c7f020 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -1027,9 +1027,9 @@ private: sequencerProto.AddAutoIncrementColumns(column.StringValue()); } - auto resultColumns = sequencer.Columns(); - for(const auto& column: resultColumns) { - sequencerProto.AddColumns(column.StringValue()); + YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Struct); + for(const auto* column: resultItemType->Cast<TStructExprType>()->GetItems()) { + sequencerProto.AddColumns(TString(column->GetName())); } return; diff --git a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp index 21baf45f2a6..b68af64f9fb 100644 --- a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp @@ -38,7 +38,7 @@ class TKqpSequencerActor : public NActors::TActorBootstrapped<TKqpSequencerActor explicit TColumnSequenceInfo(const ::NKikimrKqp::TKqpColumnMetadataProto& proto) : DefaultFromSequence(proto.GetDefaultFromSequence()) , TypeInfo(static_cast<NScheme::TTypeId>(proto.GetTypeId())) - { + { } bool IsAutoIncrement() const { @@ -167,17 +167,19 @@ private: NUdf::TUnboxedValue* rowItems = nullptr; auto newValue = HolderFactory.CreateDirectArrayHolder(Settings.GetColumns().size(), rowItems); - + int inputColIdx = 0; for(int columnIdx = 0; columnIdx < Settings.GetColumns().size(); ++columnIdx) { auto& columnInfo = ColumnSequenceInfo[columnIdx]; if (columnInfo.IsAutoIncrement()) { - *rowItems++ = NUdf::TUnboxedValuePod(columnInfo.AcquireNextVal()); + i64 nextVal = columnInfo.AcquireNextVal(); + *rowItems++ = NUdf::TUnboxedValuePod(nextVal); rowSize += sizeof(NUdf::TUnboxedValuePod); hasSequences &= columnInfo.HasValues(); } else { - *rowItems++ = currentValue.GetElement(columnIdx); - rowSize += NMiniKQL::GetUnboxedValueSize(currentValue.GetElement(columnIdx), columnInfo.TypeInfo).AllocatedBytes; + *rowItems++ = currentValue.GetElement(inputColIdx); + rowSize += NMiniKQL::GetUnboxedValueSize(currentValue.GetElement(inputColIdx), columnInfo.TypeInfo).AllocatedBytes; + ++inputColIdx; } } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 3ceb02ef2bc..2b952a6b630 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1309,6 +1309,7 @@ message TTableServiceConfig { optional bool EnableKqpImmediateEffects = 38 [default = true]; optional bool EnableSequentialReads = 39 [default = false]; optional bool EnablePreparedDdl = 42 [default = false]; + optional bool EnableSequences = 43 [default = true]; enum EBindingsMode { BM_ENABLED = 0; diff --git a/ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp b/ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp index a56e7b5607b..f517bc15206 100644 --- a/ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp +++ b/ydb/core/tx/datashard/ut_sequence/datashard_ut_sequence.cpp @@ -11,9 +11,12 @@ using namespace Tests; Y_UNIT_TEST_SUITE(TSequence) { Y_UNIT_TEST(CreateTableWithDefaultFromSequence) { TPortManager pm; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableSequences(true); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") - .SetUseRealThreads(false); + .SetUseRealThreads(false) + .SetAppConfig(appConfig); Tests::TServer::TPtr server = new TServer(serverSettings); auto &runtime = *server->GetRuntime(); @@ -29,7 +32,7 @@ Y_UNIT_TEST_SUITE(TSequence) { TShardedTableOptions() .Sequences(true) .Columns({ - {"key", "Uint32", true, false, "default", "myseq"}, + {"key", "Int64", true, false, "default", "myseq"}, {"value", "Uint32", true, false}, })); @@ -39,13 +42,176 @@ Y_UNIT_TEST_SUITE(TSequence) { } { + TString result = KqpSimpleExec(runtime, "INSERT INTO `/Root/table-1` (value) VALUES (4), (5), (6);"); + UNIT_ASSERT_VALUES_EQUAL(result, "<empty>"); + } + + { + TString result = KqpSimpleExec(runtime, "REPLACE INTO `/Root/table-1` (value) VALUES (7), (8), (9);"); + UNIT_ASSERT_VALUES_EQUAL(result, "<empty>"); + } + + { TString result = KqpSimpleExec(runtime, "SELECT * FROM `/Root/table-1`;"); UNIT_ASSERT_VALUES_EQUAL( result, - "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " - "{ items { uint32_value: 2 } items { uint32_value: 2 } }, " - "{ items { uint32_value: 3 } items { uint32_value: 3 } }"); + "{ items { int64_value: 1 } items { uint32_value: 1 } }, " + "{ items { int64_value: 2 } items { uint32_value: 2 } }, " + "{ items { int64_value: 3 } items { uint32_value: 3 } }, " + "{ items { int64_value: 4 } items { uint32_value: 4 } }, " + "{ items { int64_value: 5 } items { uint32_value: 5 } }, " + "{ items { int64_value: 6 } items { uint32_value: 6 } }, " + "{ items { int64_value: 7 } items { uint32_value: 7 } }, " + "{ items { int64_value: 8 } items { uint32_value: 8 } }, " + "{ items { int64_value: 9 } items { uint32_value: 9 } }"); + } + } + + Y_UNIT_TEST(CreateTableWithDefaultFromSequenceFromSelect) { + TPortManager pm; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableSequences(true); + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetAppConfig(appConfig); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + // runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + // runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.GetAppData().AllowReadTableImmediate = true; + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-4", + TShardedTableOptions() + .Sequences(true) + .Columns({ + {"key", "Int64", true, false, "default", "myseq"}, + {"value", "Uint32", true, false}, + })); + + { + TString result = KqpSimpleExec(runtime, "UPSERT INTO `/Root/table-4` (value) VALUES (303);"); + UNIT_ASSERT_VALUES_EQUAL(result, "<empty>"); + } + + { + TString result = KqpSimpleExec(runtime, "SELECT key, value FROM `/Root/table-4`;"); + UNIT_ASSERT_VALUES_EQUAL( + result, + "{ items { int64_value: 1 } items { uint32_value: 303 } }"); + } + + { + TString result = KqpSimpleExec(runtime, "UPSERT INTO `/Root/table-4` SELECT value FROM `/Root/table-4`;"); + UNIT_ASSERT_VALUES_EQUAL(result, "<empty>"); + } + + { + TString result = KqpSimpleExec(runtime, "SELECT * FROM `/Root/table-4` ORDER BY key;"); + UNIT_ASSERT_VALUES_EQUAL( + result, + "{ items { int64_value: 1 } items { uint32_value: 303 } }, " + "{ items { int64_value: 2 } items { uint32_value: 303 } }"); + } + + { + TString result = KqpSimpleExec(runtime, "UPSERT INTO `/Root/table-4` SELECT value FROM `/Root/table-4` where key = 1;"); + UNIT_ASSERT_VALUES_EQUAL(result, "<empty>"); + } + + { + TString result = KqpSimpleExec(runtime, "SELECT * FROM `/Root/table-4` ORDER BY key;"); + UNIT_ASSERT_VALUES_EQUAL( + result, + "{ items { int64_value: 1 } items { uint32_value: 303 } }, " + "{ items { int64_value: 2 } items { uint32_value: 303 } }, " + "{ items { int64_value: 3 } items { uint32_value: 303 } }"); + } + } + + Y_UNIT_TEST(CreateTableWithDefaultFromSequenceNotEnabled) { + TPortManager pm; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableSequences(false); + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetAppConfig(appConfig); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + // runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + // runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.GetAppData().AllowReadTableImmediate = true; + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-2", + TShardedTableOptions() + .Sequences(true) + .Columns({ + {"key", "Int64", true, false, "default", "myseq"}, + {"value", "Uint32", true, false}, + })); + + { + TString result = KqpSimpleExec(runtime, "UPSERT INTO `/Root/table-2` (value) VALUES (1), (2), (3);"); + UNIT_ASSERT_VALUES_EQUAL(result, "ERROR: BAD_REQUEST"); + } + } + + Y_UNIT_TEST(CreateTableWithDefaultFromSequenceBadRequest) { + TPortManager pm; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableSequences(true); + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetAppConfig(appConfig); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + // runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + // runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.GetAppData().AllowReadTableImmediate = true; + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-3", + TShardedTableOptions() + .Sequences(true) + .Columns({ + {"key", "Int64", true, false, "default", "myseq"}, + {"value", "Uint32", true, false}, + })); + + { + TString result = KqpSimpleExec( + runtime, + "$to_update = AsList( " + " AsStruct(CAST(12 as Uint32) as value)); " + "UPDATE `/Root/table-3` ON SELECT * FROM AS_TABLE($to_update)"); + UNIT_ASSERT_VALUES_EQUAL(result, "ERROR: BAD_REQUEST"); + } + + { + TString result = KqpSimpleExec( + runtime, + "$to_update = AsList( " + " AsStruct(CAST(12 as Uint32) as value)); " + "DELETE FROM `/Root/table-3` ON SELECT * FROM AS_TABLE($to_update)"); + UNIT_ASSERT_VALUES_EQUAL(result, "ERROR: BAD_REQUEST"); } + } } // Y_UNIT_TEST_SUITE(TSequence) |