diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-12-03 18:51:13 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-03 15:51:13 +0000 |
commit | 5c8e3e634e7436f1f9958439b48283cddf6e17e2 (patch) | |
tree | 6d534a313d1465b2fb9ced947207f3dd36cfe6c1 | |
parent | 14a54ab15843ef7b15c3c45bf424294b2444607a (diff) | |
download | ydb-5c8e3e634e7436f1f9958439b48283cddf6e17e2.tar.gz |
Consistency option (kqp part) (#12111)
-rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 29 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 27 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 31 |
6 files changed, 99 insertions, 0 deletions
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 1f47b88897..2c02d7a7fe 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -2388,6 +2388,12 @@ public: if (const auto& staticCreds = settings.Settings.StaticCredentials) { staticCreds->Serialize(*params.MutableStaticCredentials()); } + if (settings.Settings.WeakConsistency) { + config.MutableWeakConsistency(); + } + if (const auto& consistency = settings.Settings.StrongConsistency) { + consistency->Serialize(*config.MutableStrongConsistency()); + } auto& targets = *config.MutableSpecific(); for (const auto& [src, dst] : settings.Targets) { diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index b2b205be6e..f71b3eb414 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -688,6 +688,30 @@ namespace { } else if (name == "password_secret_name") { dstSettings.EnsureStaticCredentials().PasswordSecretName = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value(); + } else if (name == "consistency_mode") { + auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()); + if (to_lower(value) == "strong") { + dstSettings.EnsureStrongConsistency(); + } else if (to_lower(value) == "weak") { + dstSettings.EnsureWeakConsistency(); + } else { + ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), + TStringBuilder() << "Unknown consistency mode: " << value)); + return false; + } + } else if (name == "commit_interval") { + YQL_ENSURE(setting.Value().Maybe<TCoInterval>()); + const auto value = FromString<i64>( + setting.Value().Cast<TCoInterval>().Literal().Value() + ); + + if (value <= 0) { + ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), + TStringBuilder() << name << " must be positive")); + return false; + } + + dstSettings.EnsureStrongConsistency().CommitInterval = TDuration::FromValue(value); } else if (name == "state") { auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()); if (to_lower(value) == "done") { @@ -735,6 +759,11 @@ namespace { return false; } + if (dstSettings.WeakConsistency && dstSettings.StrongConsistency) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Ambiguous consistency mode")); + return false; + } + return true; } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway.cpp index 233058ee4a..a45fdc5996 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.cpp @@ -76,6 +76,10 @@ void TReplicationSettings::TStaticCredentials::Serialize(NKikimrReplication::TSt } } +void TReplicationSettings::TStrongConsistency::Serialize(NKikimrReplication::TReplicationConfig_TStrongConsistency& proto) const { + proto.SetCommitIntervalMilliSeconds(CommitInterval.MilliSeconds()); +} + TFuture<IKikimrGateway::TGenericResult> IKikimrGateway::CreatePath(const TString& path, TCreateDirFunc createDir) { auto partsHolder = std::make_shared<TVector<TString>>(NKikimr::SplitPath(path)); auto& parts = *partsHolder; diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 86e304fb04..7774ce7f50 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -44,6 +44,7 @@ namespace NKikimr { namespace NKikimrReplication { class TOAuthToken; class TStaticCredentials; + class TReplicationConfig_TStrongConsistency; } namespace NYql { @@ -792,11 +793,21 @@ struct TReplicationSettings { void Serialize(NKikimrReplication::TStaticCredentials& proto) const; }; + struct TWeakConsistency {}; + + struct TStrongConsistency { + TDuration CommitInterval; + + void Serialize(NKikimrReplication::TReplicationConfig_TStrongConsistency& proto) const; + }; + TMaybe<TString> ConnectionString; TMaybe<TString> Endpoint; TMaybe<TString> Database; TMaybe<TOAuthToken> OAuthToken; TMaybe<TStaticCredentials> StaticCredentials; + TMaybe<TWeakConsistency> WeakConsistency; + TMaybe<TStrongConsistency> StrongConsistency; TMaybe<TStateDone> StateDone; TOAuthToken& EnsureOAuthToken() { @@ -815,6 +826,22 @@ struct TReplicationSettings { return *StaticCredentials; } + TWeakConsistency& EnsureWeakConsistency() { + if (!WeakConsistency) { + WeakConsistency = TWeakConsistency(); + } + + return *WeakConsistency; + } + + TStrongConsistency& EnsureStrongConsistency() { + if (!StrongConsistency) { + StrongConsistency = TStrongConsistency(); + } + + return *StrongConsistency; + } + using EFailoverMode = TStateDone::EFailoverMode; TStateDone& EnsureStateDone(EFailoverMode mode = EFailoverMode::Consistent) { if (!StateDone) { diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 6131498239..f02d512440 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -1771,6 +1771,8 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over "user", "password", "password_secret_name", + "consistency_mode", + "commit_interval", }; if (!CheckReplicationSettings(node.ReplicationSettings(), supportedSettings, ctx)) { diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 6a5f00a860..eb93d82d58 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -6408,6 +6408,37 @@ Y_UNIT_TEST_SUITE(KqpScheme) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "STATE is not supported in CREATE"); } + { + auto query = R"( + --!syntax_v1 + CREATE ASYNC REPLICATION `/Root/replication` FOR + `/Root/table` AS `/Root/replica` + WITH ( + CONNECTION_STRING = "grpc://localhost:2135/?database=/Root", + CONSISTENCY_MODE = "FOO" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "Unknown consistency mode"); + } + { + auto query = R"( + --!syntax_v1 + CREATE ASYNC REPLICATION `/Root/replication` FOR + `/Root/table` AS `/Root/replica` + WITH ( + CONNECTION_STRING = "grpc://localhost:2135/?database=/Root", + CONSISTENCY_MODE = "WEAK", + COMMIT_INTERVAL = Interval("PT10S") + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "Ambiguous consistency mode"); + } // positive { |