aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-12-03 18:51:13 +0300
committerGitHub <noreply@github.com>2024-12-03 15:51:13 +0000
commit5c8e3e634e7436f1f9958439b48283cddf6e17e2 (patch)
tree6d534a313d1465b2fb9ced947207f3dd36cfe6c1
parent14a54ab15843ef7b15c3c45bf424294b2444607a (diff)
downloadydb-5c8e3e634e7436f1f9958439b48283cddf6e17e2.tar.gz
Consistency option (kqp part) (#12111)
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp6
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp29
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.cpp4
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h27
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_type_ann.cpp2
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp31
6 files changed, 99 insertions, 0 deletions
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 1f47b88897..2c02d7a7fe 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -2388,6 +2388,12 @@ public:
if (const auto& staticCreds = settings.Settings.StaticCredentials) {
staticCreds->Serialize(*params.MutableStaticCredentials());
}
+ if (settings.Settings.WeakConsistency) {
+ config.MutableWeakConsistency();
+ }
+ if (const auto& consistency = settings.Settings.StrongConsistency) {
+ consistency->Serialize(*config.MutableStrongConsistency());
+ }
auto& targets = *config.MutableSpecific();
for (const auto& [src, dst] : settings.Targets) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index b2b205be6e..f71b3eb414 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -688,6 +688,30 @@ namespace {
} else if (name == "password_secret_name") {
dstSettings.EnsureStaticCredentials().PasswordSecretName =
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
+ } else if (name == "consistency_mode") {
+ auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
+ if (to_lower(value) == "strong") {
+ dstSettings.EnsureStrongConsistency();
+ } else if (to_lower(value) == "weak") {
+ dstSettings.EnsureWeakConsistency();
+ } else {
+ ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
+ TStringBuilder() << "Unknown consistency mode: " << value));
+ return false;
+ }
+ } else if (name == "commit_interval") {
+ YQL_ENSURE(setting.Value().Maybe<TCoInterval>());
+ const auto value = FromString<i64>(
+ setting.Value().Cast<TCoInterval>().Literal().Value()
+ );
+
+ if (value <= 0) {
+ ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
+ TStringBuilder() << name << " must be positive"));
+ return false;
+ }
+
+ dstSettings.EnsureStrongConsistency().CommitInterval = TDuration::FromValue(value);
} else if (name == "state") {
auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
if (to_lower(value) == "done") {
@@ -735,6 +759,11 @@ namespace {
return false;
}
+ if (dstSettings.WeakConsistency && dstSettings.StrongConsistency) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos), "Ambiguous consistency mode"));
+ return false;
+ }
+
return true;
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway.cpp
index 233058ee4a..a45fdc5996 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.cpp
@@ -76,6 +76,10 @@ void TReplicationSettings::TStaticCredentials::Serialize(NKikimrReplication::TSt
}
}
+void TReplicationSettings::TStrongConsistency::Serialize(NKikimrReplication::TReplicationConfig_TStrongConsistency& proto) const {
+ proto.SetCommitIntervalMilliSeconds(CommitInterval.MilliSeconds());
+}
+
TFuture<IKikimrGateway::TGenericResult> IKikimrGateway::CreatePath(const TString& path, TCreateDirFunc createDir) {
auto partsHolder = std::make_shared<TVector<TString>>(NKikimr::SplitPath(path));
auto& parts = *partsHolder;
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 86e304fb04..7774ce7f50 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -44,6 +44,7 @@ namespace NKikimr {
namespace NKikimrReplication {
class TOAuthToken;
class TStaticCredentials;
+ class TReplicationConfig_TStrongConsistency;
}
namespace NYql {
@@ -792,11 +793,21 @@ struct TReplicationSettings {
void Serialize(NKikimrReplication::TStaticCredentials& proto) const;
};
+ struct TWeakConsistency {};
+
+ struct TStrongConsistency {
+ TDuration CommitInterval;
+
+ void Serialize(NKikimrReplication::TReplicationConfig_TStrongConsistency& proto) const;
+ };
+
TMaybe<TString> ConnectionString;
TMaybe<TString> Endpoint;
TMaybe<TString> Database;
TMaybe<TOAuthToken> OAuthToken;
TMaybe<TStaticCredentials> StaticCredentials;
+ TMaybe<TWeakConsistency> WeakConsistency;
+ TMaybe<TStrongConsistency> StrongConsistency;
TMaybe<TStateDone> StateDone;
TOAuthToken& EnsureOAuthToken() {
@@ -815,6 +826,22 @@ struct TReplicationSettings {
return *StaticCredentials;
}
+ TWeakConsistency& EnsureWeakConsistency() {
+ if (!WeakConsistency) {
+ WeakConsistency = TWeakConsistency();
+ }
+
+ return *WeakConsistency;
+ }
+
+ TStrongConsistency& EnsureStrongConsistency() {
+ if (!StrongConsistency) {
+ StrongConsistency = TStrongConsistency();
+ }
+
+ return *StrongConsistency;
+ }
+
using EFailoverMode = TStateDone::EFailoverMode;
TStateDone& EnsureStateDone(EFailoverMode mode = EFailoverMode::Consistent) {
if (!StateDone) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index 6131498239..f02d512440 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -1771,6 +1771,8 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
"user",
"password",
"password_secret_name",
+ "consistency_mode",
+ "commit_interval",
};
if (!CheckReplicationSettings(node.ReplicationSettings(), supportedSettings, ctx)) {
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 6a5f00a860..eb93d82d58 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -6408,6 +6408,37 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "STATE is not supported in CREATE");
}
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE ASYNC REPLICATION `/Root/replication` FOR
+ `/Root/table` AS `/Root/replica`
+ WITH (
+ CONNECTION_STRING = "grpc://localhost:2135/?database=/Root",
+ CONSISTENCY_MODE = "FOO"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "Unknown consistency mode");
+ }
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE ASYNC REPLICATION `/Root/replication` FOR
+ `/Root/table` AS `/Root/replica`
+ WITH (
+ CONNECTION_STRING = "grpc://localhost:2135/?database=/Root",
+ CONSISTENCY_MODE = "WEAK",
+ COMMIT_INTERVAL = Interval("PT10S")
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "Ambiguous consistency mode");
+ }
// positive
{