aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-02-28 02:18:16 +0500
committerGitHub <noreply@github.com>2025-02-28 00:18:16 +0300
commitce94a364f46bc962c4ff261a631ccea8f717ce4d (patch)
tree2c9a233c5d7c348d2fb7fced1dfcc1ec3be4ee63
parente8002d3a737240cf8751a577f634e7b73b68ada1 (diff)
downloadydb-ce94a364f46bc962c4ff261a631ccea8f717ce4d.tar.gz
Alter transfer from a topic to a table (#15024)
-rw-r--r--ydb/core/tx/replication/controller/dst_alterer.cpp11
-rw-r--r--ydb/core/tx/replication/controller/event_util.cpp2
-rw-r--r--ydb/core/tx/replication/controller/replication.cpp9
-rw-r--r--ydb/core/tx/replication/controller/replication.h6
-rw-r--r--ydb/core/tx/replication/controller/schema.h3
-rw-r--r--ydb/core/tx/replication/controller/target_base.cpp6
-rw-r--r--ydb/core/tx/replication/controller/target_base.h6
-rw-r--r--ydb/core/tx/replication/controller/target_discoverer_ut.cpp2
-rw-r--r--ydb/core/tx/replication/controller/target_table.cpp8
-rw-r--r--ydb/core/tx/replication/controller/target_table.h1
-rw-r--r--ydb/core/tx/replication/controller/tx_alter_dst_result.cpp40
-rw-r--r--ydb/core/tx/replication/controller/tx_alter_replication.cpp41
-rw-r--r--ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp2
-rw-r--r--ydb/core/tx/replication/controller/tx_init.cpp2
-rw-r--r--ydb/tests/functional/transfer/main.cpp107
15 files changed, 205 insertions, 41 deletions
diff --git a/ydb/core/tx/replication/controller/dst_alterer.cpp b/ydb/core/tx/replication/controller/dst_alterer.cpp
index 7a197f0c80..1bdfb49923 100644
--- a/ydb/core/tx/replication/controller/dst_alterer.cpp
+++ b/ydb/core/tx/replication/controller/dst_alterer.cpp
@@ -43,12 +43,13 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {
switch (Kind) {
case TReplication::ETargetKind::Table:
case TReplication::ETargetKind::IndexTable:
- case TReplication::ETargetKind::Transfer:
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
DstPathId.ToProto(tx.MutableAlterTable()->MutablePathId());
tx.MutableAlterTable()->MutableReplicationConfig()->SetMode(
NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE);
break;
+ case TReplication::ETargetKind::Transfer:
+ break;
}
Send(PipeCache, new TEvPipeCache::TEvForward(ev.Release(), SchemeShardId, true));
@@ -153,7 +154,13 @@ public:
if (!DstPathId) {
Success();
} else {
- AllocateTxId();
+ switch (Kind) {
+ case TReplication::ETargetKind::Table:
+ case TReplication::ETargetKind::IndexTable:
+ return AllocateTxId();
+ case TReplication::ETargetKind::Transfer:
+ return Success();
+ }
}
}
diff --git a/ydb/core/tx/replication/controller/event_util.cpp b/ydb/core/tx/replication/controller/event_util.cpp
index 4b9d766b36..4882593286 100644
--- a/ydb/core/tx/replication/controller/event_util.cpp
+++ b/ydb/core/tx/replication/controller/event_util.cpp
@@ -51,7 +51,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
break;
}
case TReplication::ETargetKind::Transfer: {
- auto p = std::dynamic_pointer_cast<TTargetTransfer::TTransferConfig>(config);
+ auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(config);
auto& writerSettings = *record.MutableCommand()->MutableTransferWriter();
dstPathId.ToProto(writerSettings.MutablePathId());
writerSettings.SetTransformLambda(p->GetTransformLambda());
diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp
index 2e235267a6..b0417987f8 100644
--- a/ydb/core/tx/replication/controller/replication.cpp
+++ b/ydb/core/tx/replication/controller/replication.cpp
@@ -228,6 +228,7 @@ private:
NKikimrReplication::TReplicationConfig Config;
EState State = EState::Ready;
TString Issue;
+ EState DesiredState = EState::Ready;
ui64 NextTargetId = 1;
THashMap<ui64, TTarget> Targets;
THashSet<ui64> PendingAlterTargets;
@@ -329,6 +330,14 @@ const TString& TReplication::GetIssue() const {
return Impl->Issue;
}
+TReplication::EState TReplication::GetDesiredState() const {
+ return Impl->DesiredState;
+}
+
+void TReplication::SetDesiredState(EState state) {
+ Impl->DesiredState = state;
+}
+
void TReplication::SetNextTargetId(ui64 value) {
Impl->NextTargetId = value;
}
diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h
index 22252c82e1..c95222d382 100644
--- a/ydb/core/tx/replication/controller/replication.h
+++ b/ydb/core/tx/replication/controller/replication.h
@@ -56,7 +56,7 @@ public:
class ITarget {
public:
struct IConfig {
- using TPtr = std::shared_ptr<IConfig>;
+ using TPtr = std::shared_ptr<const IConfig>;
virtual ~IConfig() = default;
@@ -98,6 +98,8 @@ public:
virtual void Progress(const TActorContext& ctx) = 0;
virtual void Shutdown(const TActorContext& ctx) = 0;
+ virtual void UpdateConfig(const NKikimrReplication::TReplicationConfig&) = 0;
+
protected:
virtual IActor* CreateWorkerRegistar(const TActorContext& ctx) const = 0;
};
@@ -135,6 +137,8 @@ public:
const NKikimrReplication::TReplicationConfig& GetConfig() const;
void SetState(EState state, TString issue = {});
EState GetState() const;
+ EState GetDesiredState() const;
+ void SetDesiredState(EState state);
const TString& GetIssue() const;
const TMaybe<TDuration> GetLag() const;
diff --git a/ydb/core/tx/replication/controller/schema.h b/ydb/core/tx/replication/controller/schema.h
index 861e7e61a0..b30ab4a839 100644
--- a/ydb/core/tx/replication/controller/schema.h
+++ b/ydb/core/tx/replication/controller/schema.h
@@ -25,9 +25,10 @@ struct TControllerSchema: NIceDb::Schema {
struct State: Column<5, NScheme::NTypeIds::Uint8> { using Type = TReplication::EState; };
struct Issue: Column<6, NScheme::NTypeIds::Utf8> {};
struct NextTargetId: Column<7, NScheme::NTypeIds::Uint64> { static constexpr Type Default = 1; };
+ struct DesiredState: Column<8, NScheme::NTypeIds::Uint8> { using Type = TReplication::EState; };
using TKey = TableKey<Id>;
- using TColumns = TableColumns<Id, PathOwnerId, PathLocalId, Config, State, Issue, NextTargetId>;
+ using TColumns = TableColumns<Id, PathOwnerId, PathLocalId, Config, State, Issue, NextTargetId, DesiredState>;
};
struct Targets: Table<3> {
diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp
index 881e033ac5..59ff79a38b 100644
--- a/ydb/core/tx/replication/controller/target_base.cpp
+++ b/ydb/core/tx/replication/controller/target_base.cpp
@@ -73,6 +73,9 @@ void TTargetBase::SetDstState(const EDstState value) {
return Replication->AddPendingAlterTarget(Id);
case EDstState::Done:
return Replication->RemovePendingAlterTarget(Id);
+ case EDstState::Ready:
+ PendingRemoveWorkers = false;
+ break;
default:
break;
}
@@ -196,4 +199,7 @@ void TTargetBase::Shutdown(const TActorContext& ctx) {
}
}
+void TTargetBase::UpdateConfig(const NKikimrReplication::TReplicationConfig&) {
+}
+
}
diff --git a/ydb/core/tx/replication/controller/target_base.h b/ydb/core/tx/replication/controller/target_base.h
index c627afc410..f3a474752b 100644
--- a/ydb/core/tx/replication/controller/target_base.h
+++ b/ydb/core/tx/replication/controller/target_base.h
@@ -71,11 +71,12 @@ public:
void Progress(const TActorContext& ctx) override;
void Shutdown(const TActorContext& ctx) override;
+ void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;
+
private:
TReplication* const Replication;
const ui64 Id;
const ETargetKind Kind;
- const IConfig::TPtr Config;
EDstState DstState = EDstState::Creating;
TPathId DstPathId;
@@ -90,6 +91,9 @@ private:
THashMap<ui64, TWorker> Workers;
bool PendingRemoveWorkers = false;
+protected:
+ IConfig::TPtr Config;
+
}; // TTargetBase
}
diff --git a/ydb/core/tx/replication/controller/target_discoverer_ut.cpp b/ydb/core/tx/replication/controller/target_discoverer_ut.cpp
index 025941ba02..72e4a43c32 100644
--- a/ydb/core/tx/replication/controller/target_discoverer_ut.cpp
+++ b/ydb/core/tx/replication/controller/target_discoverer_ut.cpp
@@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(TargetDiscoverer) {
UNIT_ASSERT_VALUES_EQUAL(toAdd.at(0).Config->GetSrcPath(), "/Root/Topic");
UNIT_ASSERT_VALUES_EQUAL(toAdd.at(0).Config->GetDstPath(), "/Root/Replicated/Table");
UNIT_ASSERT_VALUES_EQUAL(toAdd.at(0).Kind, TReplication::ETargetKind::Transfer);
- auto p = std::dynamic_pointer_cast<TTargetTransfer::TTransferConfig>(toAdd.at(0).Config);
+ auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(toAdd.at(0).Config);
UNIT_ASSERT(p);
UNIT_ASSERT_VALUES_EQUAL(p->GetTransformLambda(), "lambda body");
}
diff --git a/ydb/core/tx/replication/controller/target_table.cpp b/ydb/core/tx/replication/controller/target_table.cpp
index b9c4a773c1..e60b71693e 100644
--- a/ydb/core/tx/replication/controller/target_table.cpp
+++ b/ydb/core/tx/replication/controller/target_table.cpp
@@ -141,6 +141,14 @@ TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConf
{
}
+void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) {
+ auto& t = cfg.GetTransferSpecific().GetTargets(0);
+ Config = std::make_shared<TTargetTransfer::TTransferConfig>(
+ GetConfig()->GetSrcPath(),
+ GetConfig()->GetDstPath(),
+ t.GetTransformLambda());
+}
+
TString TTargetTransfer::BuildStreamPath() const {
return CanonizePath(GetSrcPath());
}
diff --git a/ydb/core/tx/replication/controller/target_table.h b/ydb/core/tx/replication/controller/target_table.h
index 1ca7aa1ca0..6b43fd2fc1 100644
--- a/ydb/core/tx/replication/controller/target_table.h
+++ b/ydb/core/tx/replication/controller/target_table.h
@@ -66,6 +66,7 @@ public:
explicit TTargetTransfer(TReplication* replication,
ui64 id, const IConfig::TPtr& config);
+ void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;
protected:
TString BuildStreamPath() const override;
diff --git a/ydb/core/tx/replication/controller/tx_alter_dst_result.cpp b/ydb/core/tx/replication/controller/tx_alter_dst_result.cpp
index 940b6ba90b..4154b62792 100644
--- a/ydb/core/tx/replication/controller/tx_alter_dst_result.cpp
+++ b/ydb/core/tx/replication/controller/tx_alter_dst_result.cpp
@@ -4,6 +4,7 @@ namespace NKikimr::NReplication::NController {
class TController::TTxAlterDstResult: public TTxBase {
TEvPrivate::TEvAlterDstResult::TPtr Ev;
+ TReplication::TPtr Replication;
public:
explicit TTxAlterDstResult(TController* self, TEvPrivate::TEvAlterDstResult::TPtr& ev)
@@ -22,14 +23,14 @@ public:
const auto rid = Ev->Get()->ReplicationId;
const auto tid = Ev->Get()->TargetId;
- auto replication = Self->Find(rid);
- if (!replication) {
+ Replication = Self->Find(rid);
+ if (!Replication) {
CLOG_W(ctx, "Unknown replication"
<< ": rid# " << rid);
return true;
}
- auto* target = replication->FindTarget(tid);
+ auto* target = Replication->FindTarget(tid);
if (!target) {
CLOG_W(ctx, "Unknown target"
<< ": rid# " << rid
@@ -46,16 +47,20 @@ public:
}
if (Ev->Get()->IsSuccess()) {
- target->SetDstState(TReplication::EDstState::Done);
+ target->SetDstState(NextState(Replication->GetDesiredState()));
+ target->UpdateConfig(Replication->GetConfig());
CLOG_N(ctx, "Target dst altered"
<< ": rid# " << rid
<< ", tid# " << tid);
- if (replication->CheckAlterDone()) {
+ if (Replication->CheckAlterDone()) {
CLOG_N(ctx, "Replication altered"
<< ": rid# " << rid);
- replication->SetState(TReplication::EState::Done);
+ Replication->SetState(Replication->GetDesiredState());
+ if (Replication->GetState() != TReplication::EState::Ready) {
+ Replication.Reset();
+ }
}
} else {
target->SetDstState(TReplication::EDstState::Error);
@@ -63,7 +68,7 @@ public:
<< ": " << NKikimrScheme::EStatus_Name(Ev->Get()->Status)
<< ", " << Ev->Get()->Error);
- replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId()
+ Replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId()
<< ": " << target->GetIssue());
CLOG_E(ctx, "Alter dst error"
@@ -75,8 +80,8 @@ public:
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::Replications>().Key(rid).Update(
- NIceDb::TUpdate<Schema::Replications::State>(replication->GetState()),
- NIceDb::TUpdate<Schema::Replications::Issue>(replication->GetIssue())
+ NIceDb::TUpdate<Schema::Replications::State>(Replication->GetState()),
+ NIceDb::TUpdate<Schema::Replications::Issue>(Replication->GetIssue())
);
db.Table<Schema::Targets>().Key(rid, tid).Update(
NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState()),
@@ -86,8 +91,25 @@ public:
return true;
}
+ TReplication::EDstState NextState(TReplication::EState state) {
+ switch (state) {
+ case TReplication::EState::Done:
+ return TReplication::EDstState::Done;
+ case TReplication::EState::Ready:
+ return TReplication::EDstState::Ready;
+ case TReplication::EState::Error:
+ return TReplication::EDstState::Error;
+ case TReplication::EState::Removing:
+ return TReplication::EDstState::Removing;
+ }
+ }
+
void Complete(const TActorContext& ctx) override {
CLOG_D(ctx, "Complete");
+
+ if (Replication) {
+ Replication->Progress(ctx);
+ }
}
}; // TTxAlterDstResult
diff --git a/ydb/core/tx/replication/controller/tx_alter_replication.cpp b/ydb/core/tx/replication/controller/tx_alter_replication.cpp
index bf50c9548e..d1688184c0 100644
--- a/ydb/core/tx/replication/controller/tx_alter_replication.cpp
+++ b/ydb/core/tx/replication/controller/tx_alter_replication.cpp
@@ -37,27 +37,48 @@ public:
return true;
}
+ bool alter = false;
+
+ const auto& oldConfig = Replication->GetConfig();
+ const auto& newConfig = record.GetConfig();
+
+ if (oldConfig.HasTransferSpecific()) {
+ auto& oldLambda = oldConfig.GetTransferSpecific().GetTargets(0).GetTransformLambda();
+ auto& newLambda = newConfig.GetTransferSpecific().GetTargets(0).GetTransformLambda();
+
+ alter = oldLambda != newLambda;
+ }
+
+ auto desiredState = Replication->GetState();
+ if (record.HasSwitchState()) {
+ switch (record.GetSwitchState().GetStateCase()) {
+ case NKikimrReplication::TReplicationState::kDone:
+ desiredState = TReplication::EState::Done;
+ alter = true;
+ break;
+ default:
+ Y_ABORT("Invalid state");
+ }
+ }
+
+ if (alter) {
+ Replication->SetDesiredState(desiredState);
+ }
+
Replication->SetConfig(std::move(*record.MutableConfig()));
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::Replications>().Key(Replication->GetId()).Update(
- NIceDb::TUpdate<Schema::Replications::Config>(record.GetConfig().SerializeAsString())
+ NIceDb::TUpdate<Schema::Replications::Config>(record.GetConfig().SerializeAsString()),
+ NIceDb::TUpdate<Schema::Replications::DesiredState>(desiredState)
);
- if (!record.HasSwitchState()) {
+ if (!alter) {
Result->Record.SetStatus(NKikimrReplication::TEvAlterReplicationResult::SUCCESS);
return true;
}
- switch (record.GetSwitchState().GetStateCase()) {
- case NKikimrReplication::TReplicationState::kDone:
- break;
- default:
- Y_ABORT("Invalid state");
- }
-
Result->Record.SetStatus(NKikimrReplication::TEvAlterReplicationResult::SUCCESS);
- bool alter = false;
for (ui64 tid = 0; tid < Replication->GetNextTargetId(); ++tid) {
auto* target = Replication->FindTarget(tid);
if (!target) {
diff --git a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
index 8649dc0610..cd247e2b5d 100644
--- a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
+++ b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp
@@ -47,7 +47,7 @@ public:
const auto tid = Replication->AddTarget(target.Kind, target.Config);
TString transformLambda;
- if (auto p = std::dynamic_pointer_cast<TTargetTransfer::TTransferConfig>(target.Config)) {
+ if (auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(target.Config)) {
transformLambda = p->GetTransformLambda();
}
diff --git a/ydb/core/tx/replication/controller/tx_init.cpp b/ydb/core/tx/replication/controller/tx_init.cpp
index af05d350a1..99db3921c9 100644
--- a/ydb/core/tx/replication/controller/tx_init.cpp
+++ b/ydb/core/tx/replication/controller/tx_init.cpp
@@ -53,10 +53,12 @@ class TController::TTxInit: public TTxBase {
const auto state = rowset.GetValue<Schema::Replications::State>();
const auto issue = rowset.GetValue<Schema::Replications::Issue>();
const auto nextTid = rowset.GetValue<Schema::Replications::NextTargetId>();
+ const auto desiredState = rowset.GetValue<Schema::Replications::DesiredState>();
auto replication = Self->Add(rid, pathId, config);
replication->SetState(state, issue);
replication->SetNextTargetId(nextTid);
+ replication->SetDesiredState(desiredState);
if (!rowset.Next()) {
return false;
diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp
index 6a96711a1a..b24dc77ed9 100644
--- a/ydb/tests/functional/transfer/main.cpp
+++ b/ydb/tests/functional/transfer/main.cpp
@@ -114,10 +114,11 @@ TMessage _withMessageGroupId(const TString& messageGroupId) {
}
struct TConfig {
- const char* TableDDL;
- const char* Lambda;
+ TString TableDDL;
+ const TString Lambda;
const TVector<TMessage> Messages;
TVector<std::pair<TString, std::shared_ptr<IChecker>>> Expectations;
+ const TVector<TString> AlterLambdas;
};
@@ -142,7 +143,7 @@ struct MainTestCase {
auto topicClient = TTopicClient(driver);
{
- auto tableDDL = Sprintf(Config.TableDDL, TableName.data());
+ auto tableDDL = Sprintf(Config.TableDDL.data(), TableName.data());
auto res = session.ExecuteQuery(tableDDL, TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
@@ -157,18 +158,39 @@ struct MainTestCase {
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
- {
- auto res = session.ExecuteQuery(Sprintf(R"(
- %s;
+ TVector<TString> lambdas;
+ lambdas.insert(lambdas.end(), Config.AlterLambdas.begin(), Config.AlterLambdas.end());
+ lambdas.push_back(Config.Lambda);
+
+ for (size_t i = 0; i < lambdas.size(); ++i) {
+ auto lambda = lambdas[i];
+ if (!i) {
+ auto res = session.ExecuteQuery(Sprintf(R"(
+ %s;
+
+ CREATE TRANSFER `%s`
+ FROM `%s` TO `%s` USING $l
+ WITH (
+ CONNECTION_STRING = 'grpc://%s'
+ -- , TOKEN = 'user@builtin'
+ );
+ )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ } else {
+ Sleep(TDuration::Seconds(1));
- CREATE TRANSFER `%s`
- FROM `%s` TO `%s` USING $l
- WITH (
- CONNECTION_STRING = 'grpc://%s'
- -- , TOKEN = 'user@builtin'
- );
- )", Config.Lambda, TransferName.data(), TopicName.data(), TableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ auto res = session.ExecuteQuery(Sprintf(R"(
+ %s;
+
+ ALTER TRANSFER `%s`
+ SET USING $l;
+ )", lambda.data(), TransferName.data()), TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+
+ if (i == lambdas.size() - 1) {
+ Sleep(TDuration::Seconds(1));
+ }
+ }
}
{
@@ -665,5 +687,62 @@ Y_UNIT_TEST_SUITE(Transfer)
}).Run();
}
+ Y_UNIT_TEST(AlterLambda)
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Utf8 NOT NULL,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message:CAST($x._data || " new lambda" AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Messages = {{"Message-1"}},
+
+ .Expectations = {
+ _C("Message", TString("Message-1 new lambda")),
+ },
+
+ .AlterLambdas = {
+ R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message:CAST($x._data || " 1 lambda" AS Utf8)
+ |>
+ ];
+ };
+ )",
+ R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message:CAST($x._data || " 2 lambda" AS Utf8)
+ |>
+ ];
+ };
+ )",
+ }
+
+ }).Run();
+ }
+
+
}