aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrewproni <andrewproni@yandex-team.com>2023-08-02 21:21:49 +0300
committerandrewproni <andrewproni@yandex-team.com>2023-08-02 21:21:49 +0300
commite1858b92f55a7b35864002810b6f0b84ff5c6001 (patch)
tree772bba5b23105698c688e064eb1998c26bfc401e
parentd00a68736059a6b7c8b1ba9b2efa34fd1aad3332 (diff)
downloadydb-e1858b92f55a7b35864002810b6f0b84ff5c6001.tar.gz
KIKIMR-18689: automaic table update for QueryService
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp243
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp185
-rw-r--r--ydb/core/kqp/proxy_service/kqp_table_creator.cpp381
-rw-r--r--ydb/core/kqp/proxy_service/kqp_table_creator.h12
-rw-r--r--ydb/core/kqp/proxy_service/ya.make1
-rw-r--r--ydb/tests/functional/script_execution/test_update_script_tables.py74
-rw-r--r--ydb/tests/functional/script_execution/ya.make32
-rw-r--r--ydb/tests/functional/ya.make1
12 files changed, 694 insertions, 239 deletions
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
index e962a6a7b5..e204bf29d8 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
@@ -42,4 +42,5 @@ target_sources(core-kqp-proxy_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_proxy_peer_stats_calculator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_table_creator.cpp
)
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
index ae463c9f90..2c471136bb 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
@@ -43,4 +43,5 @@ target_sources(core-kqp-proxy_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_proxy_peer_stats_calculator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_table_creator.cpp
)
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
index ae463c9f90..2c471136bb 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
@@ -43,4 +43,5 @@ target_sources(core-kqp-proxy_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_proxy_peer_stats_calculator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_table_creator.cpp
)
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
index e962a6a7b5..e204bf29d8 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
@@ -42,4 +42,5 @@ target_sources(core-kqp-proxy_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_proxy_peer_stats_calculator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/proxy_service/kqp_table_creator.cpp
)
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index 66693d4d92..d00666403f 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -1,22 +1,17 @@
#include "kqp_script_executions.h"
#include "kqp_script_executions_impl.h"
+#include "kqp_table_creator.h"
-#include <ydb/core/base/path.h>
-#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/kqp/common/kqp_script_executions.h>
#include <ydb/core/kqp/run_script_actor/kqp_run_script_actor.h>
#include <ydb/library/services/services.pb.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <ydb/core/tx/schemeshard/schemeshard.h>
-#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/query_actor/query_actor.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/public/api/protos/ydb_issue_message.pb.h>
#include <ydb/public/api/protos/ydb_operation.pb.h>
#include <ydb/public/lib/operation_id/operation_id.h>
-#include <ydb/public/lib/scheme_types/scheme_type_id.h>
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
@@ -30,7 +25,6 @@
#include <util/generic/guid.h>
#include <util/generic/utility.h>
-#include <util/random/random.h>
namespace NKikimr::NKqp {
@@ -84,235 +78,6 @@ public:
};
-class TTableCreator : public NActors::TActorBootstrapped<TTableCreator> {
-public:
- TTableCreator(TVector<TString> pathComponents, TVector<NKikimrSchemeOp::TColumnDescription> columns, TVector<TString> keyColumns,
- TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings = Nothing())
- : PathComponents(std::move(pathComponents))
- , Columns(std::move(columns))
- , KeyColumns(std::move(keyColumns))
- , TtlSettings(std::move(ttlSettings))
- {
- Y_VERIFY(!PathComponents.empty());
- Y_VERIFY(!Columns.empty());
- }
-
- void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override {
- NActors::TActorBootstrapped<TTableCreator>::Registered(sys, owner);
- Owner = owner;
- }
-
- STRICT_STFUNC(StateFuncCheck,
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- sFunc(NActors::TEvents::TEvWakeup, CheckTableExistence);
- )
-
- STRICT_STFUNC(StateFuncCreate,
- hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle);
- hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle);
- sFunc(NActors::TEvents::TEvWakeup, RunCreateTableRequest);
- hFunc(TEvTabletPipe::TEvClientConnected, Handle);
- hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
- hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered, Handle);
- )
-
- void Bootstrap() {
- Become(&TTableCreator::StateFuncCheck);
- CheckTableExistence();
- }
-
- void CheckTableExistence() {
- auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
- auto pathComponents = SplitPath(AppData()->TenantName);
- request->DatabaseName = CanonizePath(pathComponents);
- auto& entry = request->ResultSet.emplace_back();
- entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
- pathComponents.insert(pathComponents.end(), PathComponents.begin(), PathComponents.end());
- entry.Path = pathComponents;
- entry.ShowPrivatePath = true;
- entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath;
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
- }
-
- void RunCreateTableRequest() {
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- NKikimrSchemeOp::TModifyScheme& modifyScheme = *request->Record.MutableTransaction()->MutableModifyScheme();
- auto pathComponents = SplitPath(AppData()->TenantName);
- for (size_t i = 0; i < PathComponents.size() - 1; ++i) {
- pathComponents.emplace_back(PathComponents[i]);
- }
- modifyScheme.SetWorkingDir(CanonizePath(pathComponents));
- modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
- modifyScheme.SetInternal(true);
- modifyScheme.SetAllowAccessToPrivatePaths(true);
- NKikimrSchemeOp::TTableDescription& tableDesc = *modifyScheme.MutableCreateTable();
- tableDesc.SetName(TableName());
- for (const TString& k : KeyColumns) {
- tableDesc.AddKeyColumnNames(k);
- }
- for (const NKikimrSchemeOp::TColumnDescription& col : Columns) {
- *tableDesc.AddColumns() = col;
- }
- if (TtlSettings) {
- tableDesc.MutableTTLSettings()->CopyFrom(*TtlSettings);
- }
- Send(MakeTxProxyID(), std::move(request));
- }
-
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
- const NSchemeCache::TSchemeCacheNavigate& request = *ev->Get()->Request;
- Y_VERIFY(request.ResultSet.size() == 1);
- const NSchemeCache::TSchemeCacheNavigate::TEntry& result = request.ResultSet[0];
- if (result.Status != EStatus::Ok) {
- KQP_PROXY_LOG_D("Describe table " << TableName() << " result: " << result.Status);
- }
-
- switch (result.Status) {
- case EStatus::Unknown:
- [[fallthrough]];
- case EStatus::PathNotTable:
- [[fallthrough]];
- case EStatus::PathNotPath:
- [[fallthrough]];
- case EStatus::RedirectLookupError:
- Fail(result.Status);
- break;
- case EStatus::RootUnknown:
- [[fallthrough]];
- case EStatus::PathErrorUnknown:
- Become(&TTableCreator::StateFuncCreate);
- RunCreateTableRequest();
- break;
- case EStatus::LookupError:
- [[fallthrough]];
- case EStatus::TableCreationNotComplete:
- Retry();
- break;
- case EStatus::Ok:
- Success();
- break;
- }
- }
-
- void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
- KQP_PROXY_LOG_D("TEvProposeTransactionStatus " << TableName() << ": " << ev->Get()->Record);
- const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus();
- switch (ev->Get()->Status()) {
- case NTxProxy::TResultStatus::ExecComplete:
- [[fallthrough]];
- case NTxProxy::TResultStatus::ExecAlready:
- if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) {
- Success(ev);
- } else {
- Fail(ev);
- }
- break;
- case NTxProxy::TResultStatus::ProxyShardNotAvailable:
- Retry();
- break;
- case NTxProxy::TResultStatus::ExecError:
- if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) {
- SubscribeOnTransaction(ev);
- // In the process of creating a database, errors of the form may occur -
- // database doesn't have storage pools at all to create tablet
- // channels to storage pool binding by profile id
- } else if (ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) {
- Retry();
- } else {
- Fail(ev);
- }
- break;
- case NTxProxy::TResultStatus::ExecInProgress:
- SubscribeOnTransaction(ev);
- break;
- default:
- Fail(ev);
- }
- }
-
- void Retry() {
- Schedule(TDuration::MilliSeconds(50 + RandomNumber<ui64>(30)), new NActors::TEvents::TEvWakeup());
- }
-
- void SubscribeOnTransaction(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
- NActors::IActor* pipeActor = NTabletPipe::CreateClient(SelfId(), ev->Get()->Record.GetSchemeShardTabletId());
- Y_VERIFY(pipeActor);
- SchemePipeActorId = Register(pipeActor);
- auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
- const ui64 txId = ev->Get()->Status() == NTxProxy::TResultStatus::ExecInProgress ? ev->Get()->Record.GetTxId() : ev->Get()->Record.GetPathCreateTxId();
- request->Record.SetTxId(txId);
- NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request));
- KQP_PROXY_LOG_D("Subscribe on create table " << TableName() << " tx: " << txId);
- }
-
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
- if (ev->Get()->Status != NKikimrProto::OK) {
- KQP_PROXY_LOG_E("Create table " << TableName() << ". Tablet to pipe not conected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status) << ", retry");
- NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
- SchemePipeActorId = {};
- Retry();
- }
- }
-
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
- KQP_PROXY_LOG_E("Create table " << TableName() << ". Tablet to pipe destroyed, retry");
- SchemePipeActorId = {};
- }
-
- void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr&) {
- }
-
- void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
- KQP_PROXY_LOG_D("Create table " << TableName() << ". Transaction completed: " << ev->Get()->Record.GetTxId());
- Success(ev);
- }
-
- void Fail(NSchemeCache::TSchemeCacheNavigate::EStatus status) {
- KQP_PROXY_LOG_E("Failed to create table " << TableName() << ": " << status);
- Reply();
- }
-
- void Fail(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
- KQP_PROXY_LOG_E("Failed to create table " << TableName() << ": " << ev->Get()->Status() << ". Response: " << ev->Get()->Record);
- Reply();
- }
-
- void Success() {
- Reply();
- }
-
- void Success(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
- KQP_PROXY_LOG_I("Successfully created table " << TableName() << ": " << ev->Get()->Status());
- Reply();
- }
-
- void Success(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
- KQP_PROXY_LOG_I("Successfully created table " << TableName() << ". TxId: " << ev->Get()->Record.GetTxId());
- Reply();
- }
-
- void Reply() {
- Send(Owner, new TEvPrivate::TEvCreateTableResponse());
- if (SchemePipeActorId) {
- NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
- }
- PassAway();
- }
-
- const TString& TableName() const {
- return PathComponents.back();
- }
-
-private:
- const TVector<TString> PathComponents;
- const TVector<NKikimrSchemeOp::TColumnDescription> Columns;
- const TVector<TString> KeyColumns;
- const TMaybe<NKikimrSchemeOp::TTTLSettings> TtlSettings;
- NActors::TActorId Owner;
- NActors::TActorId SchemePipeActorId;
-};
-
class TScriptExecutionsTablesCreator : public TActorBootstrapped<TScriptExecutionsTablesCreator> {
public:
explicit TScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent)
@@ -355,7 +120,7 @@ private:
void RunCreateScriptExecutions() {
TablesCreating++;
Register(
- new TTableCreator(
+ CreateTableCreator(
{ ".metadata", "script_executions" },
{
Col("database", NScheme::NTypeIds::Text),
@@ -386,7 +151,7 @@ private:
void RunCreateScriptExecutionLeases() {
TablesCreating++;
Register(
- new TTableCreator(
+ CreateTableCreator(
{ ".metadata", "script_execution_leases" },
{
Col("database", NScheme::NTypeIds::Text),
@@ -402,7 +167,7 @@ private:
void RunCreateScriptResultSets() {
TablesCreating++;
Register(
- new TTableCreator(
+ CreateTableCreator(
{ ".metadata", "result_sets" },
{
Col("database", NScheme::NTypeIds::Text),
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
index 1adc650798..e4ba0d1a73 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
@@ -1,5 +1,6 @@
#include "kqp_script_executions.h"
#include "kqp_script_executions_impl.h"
+#include "kqp_table_creator.h"
#include <ydb/core/testlib/test_client.h>
#include <ydb/core/testlib/basics/appdata.h>
@@ -21,6 +22,44 @@ constexpr TDuration TestOperationTtl = TDuration::Minutes(1);
constexpr TDuration TestResultsTtl = TDuration::Minutes(1);
const TString TestDatabase = "test_db";
+NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, const char* columnType) {
+ NKikimrSchemeOp::TColumnDescription desc;
+ desc.SetName(columnName);
+ desc.SetType(columnType);
+ return desc;
+}
+
+NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, NScheme::TTypeId columnType) {
+ return Col(columnName, NScheme::TypeName(columnType));
+}
+
+[[maybe_unused]] NKikimrSchemeOp::TTTLSettings TtlCol(const TString& columnName) {
+ NKikimrSchemeOp::TTTLSettings settings;
+ settings.MutableEnabled()->SetExpireAfterSeconds(TDuration::Minutes(20).Seconds());
+ settings.MutableEnabled()->SetColumnName(columnName);
+ settings.MutableEnabled()->MutableSysSettings()->SetRunInterval(TDuration::Minutes(60).MicroSeconds());
+ return settings;
+}
+
+const TVector<NKikimrSchemeOp::TColumnDescription> DEFAULT_COLUMNS = {
+ Col("col1", NScheme::NTypeIds::Int32),
+ Col("col2", NScheme::NTypeIds::Int32),
+ Col("col3", NScheme::NTypeIds::String)
+};
+
+const TVector<NKikimrSchemeOp::TColumnDescription> EXTENDED_COLUMNS = {
+ Col("col1", NScheme::NTypeIds::Int32),
+ Col("col2", NScheme::NTypeIds::Int32),
+ Col("col3", NScheme::NTypeIds::String),
+ Col("col4", NScheme::NTypeIds::JsonDocument),
+ Col("col5", NScheme::NTypeIds::Interval)
+};
+
+const TVector<TString> TEST_TABLE_PATH = { ".test", "test_table" };
+
+const TVector<TString> TEST_KEY_COLUMNS = {"col1"};
+
+
struct TScriptExecutionsYdbSetup {
TScriptExecutionsYdbSetup() {
Init();
@@ -100,6 +139,53 @@ struct TScriptExecutionsYdbSetup {
return reply->Get()->ExecutionId;
}
+ void CreateTableInDbSync(TVector<NKikimrSchemeOp::TColumnDescription> columns = DEFAULT_COLUMNS, i32 numberOfRequests = 1, TVector<TString> pathComponents = TEST_TABLE_PATH, TVector<TString> keyColumns = TEST_KEY_COLUMNS,
+ TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings = Nothing()) {
+
+ TVector<TActorId> edgeActors;
+ for (i32 i = 0; i < numberOfRequests; ++i) {
+ edgeActors.push_back(CreateTableInDbAsync(columns, pathComponents, keyColumns, ttlSettings));
+ }
+ WaitTableCreation(std::move(edgeActors));
+ }
+
+ TActorId CreateTableInDbAsync(TVector<NKikimrSchemeOp::TColumnDescription> columns = DEFAULT_COLUMNS, TVector<TString> pathComponents = TEST_TABLE_PATH, TVector<TString> keyColumns = TEST_KEY_COLUMNS,
+ TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings = Nothing()) {
+ const ui32 node = 0;
+ TActorId edgeActor = GetRuntime()->AllocateEdgeActor(node);
+ GetRuntime()->Register(CreateTableCreator(std::move(pathComponents), std::move(columns), std::move(keyColumns), std::move(ttlSettings)), 0, 0, TMailboxType::Simple, 0, edgeActor);
+ return edgeActor;
+ }
+
+ void WaitTableCreation(TVector<TActorId> edgeActors) {
+ for (const auto& actor: edgeActors) {
+ GetRuntime()->GrabEdgeEvent<NPrivate::TEvPrivate::TEvCreateTableResponse>(actor);
+ }
+ }
+
+ void VerifyColumnsList( TVector<TString> pathComponents = TEST_TABLE_PATH, TVector<NKikimrSchemeOp::TColumnDescription> columns = DEFAULT_COLUMNS) {
+ TStringBuilder path;
+ path << "/dc-1/";
+ for (size_t i = 0; i < pathComponents.size() - 1; ++i) {
+ path << pathComponents[i] << "/";
+ }
+ path << pathComponents.back();
+
+ auto result = TableClientSession->DescribeTable(path).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ const auto& existingColumns = result.GetTableDescription().GetColumns();
+ UNIT_ASSERT_C(existingColumns.size() == columns.size(), "Expected size: " << columns.size() << ", actual size: " << existingColumns.size());
+
+ THashSet<TString> existingNames;
+ for (const auto& col: existingColumns) {
+ existingNames.emplace(col.Name);
+ }
+
+ for (const auto& col: columns) {
+ UNIT_ASSERT_C(existingNames.contains(col.GetName()), "Column \"" << col.GetName() << "\" is not present" );
+ }
+ }
+
NPrivate::TEvPrivate::TEvLeaseCheckResult::TPtr CheckLeaseStatus(const TString& executionId) {
const ui32 node = 0;
TActorId edgeActor = GetRuntime()->AllocateEdgeActor(node);
@@ -232,4 +318,103 @@ Y_UNIT_TEST_SUITE(ScriptExecutionsTest) {
UNIT_ASSERT(!updateResponse->ExecutionEntryExists);
}
}
+
+Y_UNIT_TEST_SUITE(TableCreation) {
+
+ Y_UNIT_TEST(SimpleTableCreation) {
+ TScriptExecutionsYdbSetup ydb;
+
+ ydb.CreateTableInDbSync();
+ ydb.VerifyColumnsList();
+ }
+
+ Y_UNIT_TEST(ConcurrentTableCreation) {
+ TScriptExecutionsYdbSetup ydb;
+
+ constexpr i32 requests = 20;
+
+ ydb.CreateTableInDbSync(DEFAULT_COLUMNS, requests);
+ ydb.VerifyColumnsList();
+ }
+
+ Y_UNIT_TEST(MultipleTablesCreation) {
+ TScriptExecutionsYdbSetup ydb;
+
+ constexpr i32 requests = 2;
+
+ TVector<TActorId> edgeActors;
+ for (i32 i = 0; i < requests; ++i) {
+ edgeActors.push_back(ydb.CreateTableInDbAsync(DEFAULT_COLUMNS, { ".test", "test_table" + ToString(i)}));
+ }
+
+ ydb.WaitTableCreation(std::move(edgeActors));
+
+ for(i32 i = 0; i < requests; i++) {
+ ydb.VerifyColumnsList({".test", "test_table" + ToString(i)});
+ }
+ }
+
+ Y_UNIT_TEST(ConcurrentMultipleTablesCreation) {
+ TScriptExecutionsYdbSetup ydb;
+
+ constexpr i32 tables = 2;
+ constexpr i32 requests = 20;
+
+ TVector<TActorId> edgeActors;
+ for (i32 i = 0; i < tables; ++i) {
+ for (i32 j = 0; j < requests; ++j){
+ edgeActors.push_back(ydb.CreateTableInDbAsync(DEFAULT_COLUMNS, { ".test", "test_table" + ToString(i)}));
+ }
+ }
+
+ ydb.WaitTableCreation(std::move(edgeActors));
+
+ for(i32 i = 0; i < tables; i++) {
+ ydb.VerifyColumnsList({".test", "test_table" + ToString(i)});
+ }
+ }
+
+ Y_UNIT_TEST(ConcurrentTableCreationWithDifferentVersions) {
+ TScriptExecutionsYdbSetup ydb;
+
+ constexpr i32 requests = 10;
+
+ TVector<TActorId> edgeActors;
+ for (i32 i = 0; i < requests; ++i) {
+ edgeActors.push_back(ydb.CreateTableInDbAsync(i % 2 ? EXTENDED_COLUMNS : DEFAULT_COLUMNS));
+
+ }
+
+ ydb.WaitTableCreation(edgeActors);
+ ydb.VerifyColumnsList(TEST_TABLE_PATH, EXTENDED_COLUMNS);
+ }
+
+ Y_UNIT_TEST(SimpleUpdateTable) {
+ TScriptExecutionsYdbSetup ydb;
+
+ ydb.CreateTableInDbSync(DEFAULT_COLUMNS);
+ ydb.CreateTableInDbSync(EXTENDED_COLUMNS);
+ ydb.VerifyColumnsList(TEST_TABLE_PATH, EXTENDED_COLUMNS);
+ }
+
+ Y_UNIT_TEST(ConcurrentUpdateTable) {
+ TScriptExecutionsYdbSetup ydb;
+
+ constexpr i32 requests = 10;
+
+ ydb.CreateTableInDbSync(DEFAULT_COLUMNS);
+ ydb.CreateTableInDbSync(EXTENDED_COLUMNS, requests);
+
+ ydb.VerifyColumnsList(TEST_TABLE_PATH, EXTENDED_COLUMNS);
+ }
+
+ Y_UNIT_TEST(CreateOldTable) {
+ TScriptExecutionsYdbSetup ydb;
+
+ ydb.CreateTableInDbSync(EXTENDED_COLUMNS);
+ ydb.CreateTableInDbSync(DEFAULT_COLUMNS);
+ ydb.VerifyColumnsList(TEST_TABLE_PATH, EXTENDED_COLUMNS);
+ }
+
+}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/proxy_service/kqp_table_creator.cpp b/ydb/core/kqp/proxy_service/kqp_table_creator.cpp
new file mode 100644
index 0000000000..32b7686bd4
--- /dev/null
+++ b/ydb/core/kqp/proxy_service/kqp_table_creator.cpp
@@ -0,0 +1,381 @@
+#include "kqp_script_executions.h"
+#include "kqp_script_executions_impl.h"
+#include "kqp_table_creator.h"
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/library/services/services.pb.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/schemeshard/schemeshard_path.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/public/api/protos/ydb_issue_message.pb.h>
+#include <ydb/public/lib/scheme_types/scheme_type_id.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/retry/retry_policy.h>
+
+#include <util/generic/guid.h>
+#include <util/generic/utility.h>
+#include <util/random/random.h>
+
+namespace NKikimr::NKqp {
+
+using namespace NKikimr::NKqp::NPrivate;
+
+namespace {
+
+#define KQP_PROXY_LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream)
+#define KQP_PROXY_LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream)
+#define KQP_PROXY_LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream)
+#define KQP_PROXY_LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream)
+#define KQP_PROXY_LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream)
+#define KQP_PROXY_LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream)
+#define KQP_PROXY_LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream)
+
+class TTableCreator : public NActors::TActorBootstrapped<TTableCreator> {
+
+using TTableCreatorRetryPolicy = IRetryPolicy<bool>;
+
+public:
+ TTableCreator(TVector<TString> pathComponents, TVector<NKikimrSchemeOp::TColumnDescription> columns, TVector<TString> keyColumns,
+ TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings = Nothing())
+ : PathComponents(std::move(pathComponents))
+ , Columns(std::move(columns))
+ , KeyColumns(std::move(keyColumns))
+ , TtlSettings(std::move(ttlSettings))
+ , LogPrefix("Table " + TableName() + " updater. ")
+ {
+ Y_VERIFY(!PathComponents.empty());
+ Y_VERIFY(!Columns.empty());
+ }
+
+ void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override {
+ NActors::TActorBootstrapped<TTableCreator>::Registered(sys, owner);
+ Owner = owner;
+ }
+
+ STRICT_STFUNC(StateFuncCheck,
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ sFunc(NActors::TEvents::TEvWakeup, CheckTableExistence);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ )
+
+ STRICT_STFUNC(StateFuncUpgrade,
+ hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle);
+ hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle);
+ sFunc(NActors::TEvents::TEvWakeup, RunTableRequest);
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered, Handle);
+ )
+
+ void Bootstrap() {
+ Become(&TTableCreator::StateFuncCheck);
+ CheckTableExistence();
+ }
+
+ void CheckTableExistence() {
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ auto pathComponents = SplitPath(AppData()->TenantName);
+ request->DatabaseName = CanonizePath(pathComponents);
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
+ pathComponents.insert(pathComponents.end(), PathComponents.begin(), PathComponents.end());
+ entry.Path = pathComponents;
+ entry.ShowPrivatePath = true;
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath;
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
+ }
+
+ void RunTableRequest() {
+ auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
+ NKikimrSchemeOp::TModifyScheme& modifyScheme = *request->Record.MutableTransaction()->MutableModifyScheme();
+ auto pathComponents = SplitPath(AppData()->TenantName);
+ for (size_t i = 0; i < PathComponents.size() - 1; ++i) {
+ pathComponents.emplace_back(PathComponents[i]);
+ }
+ modifyScheme.SetWorkingDir(CanonizePath(pathComponents));
+ KQP_PROXY_LOG_D(LogPrefix << "Full table path:" << modifyScheme.GetWorkingDir() << "/" << TableName());
+ modifyScheme.SetOperationType(OperationType);
+ modifyScheme.SetInternal(true);
+ modifyScheme.SetAllowAccessToPrivatePaths(true);
+ NKikimrSchemeOp::TTableDescription* tableDesc;
+ if (OperationType == NKikimrSchemeOp::ESchemeOpCreateTable) {
+ tableDesc = modifyScheme.MutableCreateTable();
+ for (const TString& k : KeyColumns) {
+ tableDesc->AddKeyColumnNames(k);
+ }
+ } else {
+ Y_VERIFY_DEBUG(OperationType == NKikimrSchemeOp::ESchemeOpAlterTable);
+ tableDesc = modifyScheme.MutableAlterTable();
+ }
+ tableDesc->SetName(TableName());
+ for (const NKikimrSchemeOp::TColumnDescription& col : Columns) {
+ *tableDesc->AddColumns() = col;
+ }
+ if (TtlSettings) {
+ tableDesc->MutableTTLSettings()->CopyFrom(*TtlSettings);
+ }
+ Send(MakeTxProxyID(), std::move(request));
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
+ const NSchemeCache::TSchemeCacheNavigate& request = *ev->Get()->Request;
+ Y_VERIFY(request.ResultSet.size() == 1);
+ const NSchemeCache::TSchemeCacheNavigate::TEntry& result = request.ResultSet[0];
+ if (result.Status != EStatus::Ok) {
+ KQP_PROXY_LOG_D(LogPrefix << "Describe result: " << result.Status);
+ }
+
+ switch (result.Status) {
+ case EStatus::Unknown:
+ [[fallthrough]];
+ case EStatus::PathNotTable:
+ [[fallthrough]];
+ case EStatus::PathNotPath:
+ [[fallthrough]];
+ case EStatus::RedirectLookupError:
+ Fail(result.Status);
+ break;
+ case EStatus::RootUnknown:
+ [[fallthrough]];
+ case EStatus::PathErrorUnknown:
+ Become(&TTableCreator::StateFuncUpgrade);
+ OperationType = NKikimrSchemeOp::ESchemeOpCreateTable;
+ KQP_PROXY_LOG_N(LogPrefix << "Creating table");
+ RunTableRequest();
+ break;
+ case EStatus::LookupError:
+ [[fallthrough]];
+ case EStatus::TableCreationNotComplete:
+ Retry();
+ break;
+ case EStatus::Ok:
+ ExcludeExistingColumns(result.Columns);
+ if (!Columns.empty()) {
+ OperationType = NKikimrSchemeOp::ESchemeOpAlterTable;
+ Become(&TTableCreator::StateFuncUpgrade);
+ RunTableRequest();
+ } else {
+ Success();
+ }
+ break;
+ }
+ }
+
+ void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
+ KQP_PROXY_LOG_D(LogPrefix << "TEvProposeTransactionStatus: " << ev->Get()->Record);
+ const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus();
+ switch (ev->Get()->Status()) {
+ case NTxProxy::TResultStatus::ExecComplete:
+ [[fallthrough]];
+ case NTxProxy::TResultStatus::ExecAlready:
+ if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) {
+ Success(ev);
+ } else {
+ Fail(ev);
+ }
+ break;
+ case NTxProxy::TResultStatus::ProxyShardNotAvailable:
+ Retry();
+ break;
+ case NTxProxy::TResultStatus::ExecError:
+ if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) {
+ SubscribeOnTransactionOrFallback(ev);
+ // In the process of creating a database, errors of the form may occur -
+ // database doesn't have storage pools at all to create tablet
+ // channels to storage pool binding by profile id
+ // Also, this status is returned when column types mismatch -
+ // need to fallback to rebuild column diff
+ } else if (ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) {
+ FallBack(true /* long delay */);
+ } else {
+ Fail(ev);
+ }
+ break;
+ case NTxProxy::TResultStatus::ExecInProgress:
+ SubscribeOnTransactionOrFallback(ev);
+ break;
+ default:
+ Fail(ev);
+ }
+ }
+
+ void Retry(bool longDelay = false) {
+ auto delay = GetRetryDelay(longDelay);
+ if (delay) {
+ Schedule(*delay, new NActors::TEvents::TEvWakeup());
+ } else {
+ Fail();
+ }
+
+ }
+
+ void FallBack(bool longDelay = false) {
+ if (SchemePipeActorId){
+ PipeClientClosedByUs = true;
+ NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
+ }
+ Become(&TTableCreator::StateFuncCheck);
+ Retry(longDelay);
+ }
+
+ void SubscribeOnTransactionOrFallback(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
+ const ui64 txId = ev->Get()->Status() == NTxProxy::TResultStatus::ExecInProgress ? ev->Get()->Record.GetTxId() : ev->Get()->Record.GetPathCreateTxId();
+ if (txId == 0) {
+ KQP_PROXY_LOG_D(LogPrefix << "Unable to subscribe to concurrent transaction, falling back");
+ FallBack();
+ return;
+ }
+ PipeClientClosedByUs = false;
+ NActors::IActor* pipeActor = NTabletPipe::CreateClient(SelfId(), ev->Get()->Record.GetSchemeShardTabletId());
+ Y_VERIFY(pipeActor);
+ SchemePipeActorId = Register(pipeActor);
+ auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
+ request->Record.SetTxId(txId);
+ NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request));
+ KQP_PROXY_LOG_D(LogPrefix << "Subscribe on create table tx: " << txId);
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
+ KQP_PROXY_LOG_E(LogPrefix << "Request: " << GetOperationType() << ". Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status) << ", retry");
+ PipeClientClosedByUs = true;
+ NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
+ SchemePipeActorId = {};
+ Retry();
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
+ SchemePipeActorId = {};
+ if (!PipeClientClosedByUs) {
+ KQP_PROXY_LOG_E(LogPrefix << "Request: " << GetOperationType() << ". Tablet to pipe destroyed, retry");
+ Retry();
+ }
+ PipeClientClosedByUs = false;
+ }
+
+ void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered::TPtr&) {
+ }
+
+ void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
+ KQP_PROXY_LOG_D(LogPrefix << "Request: " << GetOperationType() << ". Transaction completed: " << ev->Get()->Record.GetTxId() << ". Doublechecking...");
+ FallBack();
+ }
+
+ void Fail(NSchemeCache::TSchemeCacheNavigate::EStatus status) {
+ KQP_PROXY_LOG_E(LogPrefix << "Failed to upgrade table: " << status);
+ Reply();
+ }
+
+ void Fail(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
+ KQP_PROXY_LOG_E(LogPrefix << "Failed " << GetOperationType() << " request: " << ev->Get()->Status() << ". Response: " << ev->Get()->Record);
+ Reply();
+ }
+
+ void Fail() {
+ KQP_PROXY_LOG_E(LogPrefix << "Retry limit exceeded");
+ Reply();
+ }
+
+ void Success() {
+ Reply();
+ }
+
+ void Success(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
+ KQP_PROXY_LOG_I(LogPrefix << "Successful " << GetOperationType() << " request: " << ev->Get()->Status());
+ Reply();
+ }
+
+ void Reply() {
+ Send(Owner, new TEvPrivate::TEvCreateTableResponse());
+ if (SchemePipeActorId) {
+ NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
+ }
+ PassAway();
+ }
+
+ const TString& TableName() const {
+ return PathComponents.back();
+ }
+
+private:
+ void ExcludeExistingColumns(const THashMap<ui32, TSysTables::TTableColumnInfo>& existingColumns) {
+ THashSet<TString> existingNames;
+ TStringBuilder columns;
+ for (const auto& [_, colInfo] : existingColumns) {
+ existingNames.insert(colInfo.Name);
+ if (columns) {
+ columns << ", ";
+ }
+ columns << colInfo.Name;
+ }
+
+ TVector<NKikimrSchemeOp::TColumnDescription> filteredColumns;
+ TStringBuilder filtered;
+ for (auto& col : Columns) {
+ if (!existingNames.contains(col.GetName())) {
+ if (filtered) {
+ filtered << ", ";
+ }
+ filtered << col.GetName();
+ filteredColumns.emplace_back(std::move(col));
+ }
+ }
+ if (filteredColumns.empty()) {
+ KQP_PROXY_LOG_D(LogPrefix << "Column diff is empty, finishing");
+ } else {
+ KQP_PROXY_LOG_N(LogPrefix << "Adding columns. New columns: " << filtered << ". Existing columns: " << columns);
+ }
+
+
+ Columns = std::move(filteredColumns);
+ }
+
+ TStringBuf GetOperationType() const {
+ return OperationType == NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable ? "create" : "alter";
+ }
+
+ TMaybe<TDuration> GetRetryDelay(bool longDelay = false) {
+ if (!RetryState) {
+ RetryState = CreateRetryState();
+ }
+ return RetryState->GetNextRetryDelay(longDelay);
+ }
+
+ static TTableCreatorRetryPolicy::IRetryState::TPtr CreateRetryState() {
+ return TTableCreatorRetryPolicy::GetFixedIntervalPolicy(
+ [](bool longDelay){return longDelay ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;}
+ , TDuration::MilliSeconds(100)
+ , TDuration::MilliSeconds(300)
+ , 100
+ )->CreateRetryState();
+ }
+
+ const TVector<TString> PathComponents;
+ TVector<NKikimrSchemeOp::TColumnDescription> Columns;
+ const TVector<TString> KeyColumns;
+ const TMaybe<NKikimrSchemeOp::TTTLSettings> TtlSettings;
+ NKikimrSchemeOp::EOperationType OperationType = NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable;
+ NActors::TActorId Owner;
+ NActors::TActorId SchemePipeActorId;
+ bool PipeClientClosedByUs = false;
+ const TString LogPrefix;
+ TTableCreatorRetryPolicy::IRetryState::TPtr RetryState;
+};
+
+} // namespace
+
+NActors::IActor* CreateTableCreator(TVector<TString> pathComponents, TVector<NKikimrSchemeOp::TColumnDescription> columns, TVector<TString> keyColumns,
+ TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings) {
+ return new TTableCreator(std::move(pathComponents), std::move(columns), std::move(keyColumns), std::move(ttlSettings));
+}
+
+} // namespace NKikimr::NKqp \ No newline at end of file
diff --git a/ydb/core/kqp/proxy_service/kqp_table_creator.h b/ydb/core/kqp/proxy_service/kqp_table_creator.h
new file mode 100644
index 0000000000..2e1e6afd21
--- /dev/null
+++ b/ydb/core/kqp/proxy_service/kqp_table_creator.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NKikimr::NKqp {
+
+NActors::IActor* CreateTableCreator(TVector<TString> pathComponents, TVector<NKikimrSchemeOp::TColumnDescription> columns, TVector<TString> keyColumns,
+ TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings = Nothing());
+
+} // namespace NKikimr::NKqp \ No newline at end of file
diff --git a/ydb/core/kqp/proxy_service/ya.make b/ydb/core/kqp/proxy_service/ya.make
index eba98d2c09..a919719387 100644
--- a/ydb/core/kqp/proxy_service/ya.make
+++ b/ydb/core/kqp/proxy_service/ya.make
@@ -4,6 +4,7 @@ SRCS(
kqp_proxy_service.cpp
kqp_proxy_peer_stats_calculator.cpp
kqp_script_executions.cpp
+ kqp_table_creator.cpp
)
PEERDIR(
diff --git a/ydb/tests/functional/script_execution/test_update_script_tables.py b/ydb/tests/functional/script_execution/test_update_script_tables.py
new file mode 100644
index 0000000000..babbb9b415
--- /dev/null
+++ b/ydb/tests/functional/script_execution/test_update_script_tables.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+
+from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory
+from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
+
+from ydb.tests.oss.ydb_sdk_import import ydb
+
+
+def _list_operations_request(kind):
+ return ydb._apis.ydb_operation.ListOperationsRequest(kind=kind)
+
+
+def list_operations(driver):
+ return driver(_list_operations_request("scriptexec"),
+ ydb._apis.OperationService.Stub,
+ "ListOperations")
+
+
+columns = ["syntax", "ast", "stats"]
+
+
+class TestUpdateScriptTablesYdb(object):
+ @classmethod
+ def setup_class(cls):
+ cls.config = KikimrConfigGenerator(extra_feature_flags=["enable_script_execution_operations"])
+ cls.cluster = kikimr_cluster_factory(configurator=cls.config)
+ cls.cluster.start()
+ cls.driver = ydb.Driver(ydb.DriverConfig(
+ database="/Root",
+ endpoint="%s:%d" % (cls.cluster.nodes[1].host, cls.cluster.nodes[1].port)))
+ cls.driver.wait()
+
+ @classmethod
+ def teardown_class(cls):
+ if hasattr(cls, 'cluster'):
+ cls.cluster.stop()
+
+ def check_table_existance(self, table_name):
+ with self.session.transaction() as tx:
+ tx.execute(
+ "SELECT * FROM `.metadata/script_executions`"
+ )
+ tx.commit()
+
+ drop_columns = ["ALTER TABLE {} " + ', '.join(["DROP COLUMN {}".format(col) for col in columns[:i + 1]]) for i in range(len(columns))]
+
+ @pytest.mark.parametrize("table_name", ["`.metadata/script_executions`"])
+ @pytest.mark.parametrize("operation", ["DROP TABLE {}", *drop_columns])
+ def test_recreate_tables(self, operation, table_name):
+ self.session = ydb.retry_operation_sync(lambda: self.driver.table_client.session().create())
+
+ res = list_operations(self.driver)
+ assert "SUCCESS" in str(res)
+
+ self.check_table_existance(table_name)
+
+ self.session.execute_scheme(
+ operation.format(table_name)
+ )
+
+ res = list_operations(self.driver)
+ assert "ERROR" in str(res)
+
+ self.cluster.stop()
+ self.cluster.start()
+ self.session = ydb.retry_operation_sync(lambda: self.driver.table_client.session().create())
+
+ res = list_operations(self.driver)
+ assert "SUCCESS" in str(res)
+
+ self.check_table_existance(table_name)
diff --git a/ydb/tests/functional/script_execution/ya.make b/ydb/tests/functional/script_execution/ya.make
new file mode 100644
index 0000000000..ab22c161ec
--- /dev/null
+++ b/ydb/tests/functional/script_execution/ya.make
@@ -0,0 +1,32 @@
+PY3TEST()
+
+ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
+
+PEERDIR(
+ ydb/public/api/protos
+ ydb/public/sdk/python
+ ydb/public/api/grpc
+ ydb/tests/library
+ ydb/tests/oss/ydb_sdk_import
+)
+
+DEPENDS(
+ ydb/apps/ydbd
+)
+
+TEST_SRCS(
+ test_update_script_tables.py
+)
+
+IF (SANITIZER_TYPE)
+ TIMEOUT(2400)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+FORK_SUBTESTS()
+
+END() \ No newline at end of file
diff --git a/ydb/tests/functional/ya.make b/ydb/tests/functional/ya.make
index 851335c7f4..99ea5059b5 100644
--- a/ydb/tests/functional/ya.make
+++ b/ydb/tests/functional/ya.make
@@ -26,4 +26,5 @@ RECURSE(
ttl
ydb_cli
query_cache
+ script_execution
)