aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-05-23 15:28:34 +0300
committerilnaz <ilnaz@ydb.tech>2023-05-23 15:28:34 +0300
commit3a5d4cb4da36d491f57782e316476a793654e52a (patch)
tree9ef7cd9e45d5ac8571436f6e46b17de533bf9ad9
parent61a58106f42a87d3d324e03e77642faf0a488071 (diff)
downloadydb-3a5d4cb4da36d491f57782e316476a793654e52a.tar.gz
Aws region in cluster config
-rw-r--r--ydb/core/base/appdata.h1
-rw-r--r--ydb/core/driver_lib/run/run.cpp4
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp2
-rw-r--r--ydb/core/protos/config.proto5
-rw-r--r--ydb/core/protos/console_config.proto1
-rw-r--r--ydb/core/testlib/actors/test_runtime.cpp1
-rw-r--r--ydb/core/testlib/basics/appdata.cpp6
-rw-r--r--ydb/core/testlib/basics/appdata.h2
-rw-r--r--ydb/core/testlib/test_client.cpp1
-rw-r--r--ydb/core/testlib/test_client.h2
-rw-r--r--ydb/core/tx/datashard/change_record.cpp2
-rw-r--r--ydb/core/tx/datashard/change_record.h4
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h6
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp50
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h1
17 files changed, 92 insertions, 11 deletions
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h
index 0b1c70780cc..dc2b02faa2f 100644
--- a/ydb/core/base/appdata.h
+++ b/ydb/core/base/appdata.h
@@ -148,6 +148,7 @@ struct TAppData {
NKikimrConfig::TCompactionConfig CompactionConfig;
NKikimrConfig::TDomainsConfig DomainsConfig;
NKikimrConfig::TBootstrap BootstrapConfig;
+ NKikimrConfig::TAwsCompatibilityConfig AwsCompatibilityConfig;
std::optional<NKikimrSharedCache::TSharedCacheConfig> SharedCacheConfig;
bool EnforceUserTokenRequirement = false;
bool AllowHugeKeyValueDeletes = true; // delete when all clients limit deletes per request
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 357498e4aca..a196f6c7fb7 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1114,6 +1114,10 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
AppData->SharedCacheConfig = runConfig.AppConfig.GetSharedCacheConfig();
}
+ if (runConfig.AppConfig.HasAwsCompatibilityConfig()) {
+ AppData->AwsCompatibilityConfig = runConfig.AppConfig.GetAwsCompatibilityConfig();
+ }
+
// setup resource profiles
AppData->ResourceProfiles = new TResourceProfiles;
if (runConfig.AppConfig.GetBootstrapConfig().ResourceProfilesSize())
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index d332e0f01ec..cbf27bdccf8 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -2888,7 +2888,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto query = R"(
--!syntax_v1
ALTER TABLE `/Root/table` ADD CHANGEFEED `feed` WITH (
- MODE = 'NEW_AND_OLD_IMAGES', FORMAT = 'DOCUMENT_TABLE_JSON', AWS_REGION = 'aws:region'
+ MODE = 'NEW_AND_OLD_IMAGES', FORMAT = 'DYNAMODB_STREAMS_JSON', AWS_REGION = 'aws:region'
);
)";
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 18fe7fde277..521799f151f 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1780,6 +1780,10 @@ message TLocalPgWireConfig {
optional string SslCertificate = 5;
}
+message TAwsCompatibilityConfig {
+ optional string AwsRegion = 1;
+}
+
message TLabel {
optional string Name = 1;
optional string Value = 2;
@@ -1848,6 +1852,7 @@ message TAppConfig {
optional TConveyorConfig ConveyorConfig = 65;
optional TColumnShardConfig ColumnShardConfig = 66;
optional TLocalPgWireConfig LocalPgWireConfig = 69;
+ optional TAwsCompatibilityConfig AwsCompatibilityConfig = 70;
repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto
index 5293ff2afd6..1d80f943c20 100644
--- a/ydb/core/protos/console_config.proto
+++ b/ydb/core/protos/console_config.proto
@@ -122,6 +122,7 @@ message TConfigItem {
YamlConfigEnabledItem = 64;
ConveyorConfigItem = 65;
ColumnShardConfigItem = 66;
+ AwsCompatibilityConfigItem = 70;
NamedConfigsItem = 100;
ClusterYamlConfigItem = 101;
diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp
index b8443e72063..553d915ce32 100644
--- a/ydb/core/testlib/actors/test_runtime.cpp
+++ b/ydb/core/testlib/actors/test_runtime.cpp
@@ -145,6 +145,7 @@ namespace NActors {
nodeAppData->DataShardConfig = app0->DataShardConfig;
nodeAppData->ColumnShardConfig = app0->ColumnShardConfig;
nodeAppData->MeteringConfig = app0->MeteringConfig;
+ nodeAppData->AwsCompatibilityConfig = app0->AwsCompatibilityConfig;
nodeAppData->EnableMvccSnapshotWithLegacyDomainRoot = app0->EnableMvccSnapshotWithLegacyDomainRoot;
nodeAppData->IoContextFactory = app0->IoContextFactory;
if (KeyConfigGenerator) {
diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp
index d710e466d0a..cd496db1efc 100644
--- a/ydb/core/testlib/basics/appdata.cpp
+++ b/ydb/core/testlib/basics/appdata.cpp
@@ -57,6 +57,7 @@ namespace NKikimr {
app->ColumnShardConfig = ColumnShardConfig;
app->SchemeShardConfig = SchemeShardConfig;
app->MeteringConfig = MeteringConfig;
+ app->AwsCompatibilityConfig = AwsCompatibilityConfig;
app->FeatureFlags = FeatureFlags;
// This is a special setting active in test runtime only
@@ -193,4 +194,9 @@ namespace NKikimr {
{
FeatureFlags.SetEnableDbCounters(value);
}
+
+ void TAppPrepare::SetAwsRegion(const TString& value)
+ {
+ AwsCompatibilityConfig.SetAwsRegion(value);
+ }
}
diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h
index 90e8ae859f7..b902ce89cb3 100644
--- a/ydb/core/testlib/basics/appdata.h
+++ b/ydb/core/testlib/basics/appdata.h
@@ -80,6 +80,7 @@ namespace NKikimr {
void SetEnableProtoSourceIdInfo(std::optional<bool> value);
void SetEnablePqBilling(std::optional<bool> value);
void SetEnableDbCounters(bool value);
+ void SetAwsRegion(const TString& value);
TIntrusivePtr<TChannelProfiles> Channels;
NKikimrBlobStorage::TNodeWardenServiceSet BSConf;
@@ -94,6 +95,7 @@ namespace NKikimr {
NKikimrConfig::TSchemeShardConfig SchemeShardConfig;
NKikimrConfig::TMeteringConfig MeteringConfig;
NKikimrPQ::TPQConfig PQConfig;
+ NKikimrConfig::TAwsCompatibilityConfig AwsCompatibilityConfig;
private:
TAutoPtr<TMine> Mine;
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 3a93133f463..fd070a1681e 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -173,6 +173,7 @@ namespace Tests {
app.SetKeepSnapshotTimeout(Settings->KeepSnapshotTimeout);
app.SetChangesQueueItemsLimit(Settings->ChangesQueueItemsLimit);
app.SetChangesQueueBytesLimit(Settings->ChangesQueueBytesLimit);
+ app.SetAwsRegion(Settings->AwsRegion);
app.CompactionConfig = Settings->CompactionConfig;
app.FeatureFlags = Settings->FeatureFlags;
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index d838b6aac74..91862d263a0 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -132,6 +132,7 @@ namespace Tests {
std::shared_ptr<NKikimr::NPQ::TPersQueueMirrorReaderFactory> PersQueueMirrorReaderFactory = std::make_shared<NKikimr::NPQ::TPersQueueMirrorReaderFactory>();
bool EnableMetering = false;
TString MeteringFilePath;
+ TString AwsRegion;
std::function<IActor*(const NKikimrProto::TAuthConfig&)> CreateTicketParser = NKikimr::CreateTicketParser;
std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory;
@@ -170,6 +171,7 @@ namespace Tests {
TServerSettings& SetChangesQueueItemsLimit(ui64 value) { ChangesQueueItemsLimit = value; return *this; }
TServerSettings& SetChangesQueueBytesLimit(ui64 value) { ChangesQueueBytesLimit = value; return *this; }
TServerSettings& SetMeteringFilePath(const TString& path) { EnableMetering = true; MeteringFilePath = path; return *this; }
+ TServerSettings& SetAwsRegion(const TString& value) { AwsRegion = value; return *this; }
TServerSettings& SetPersQueueGetReadSessionsInfoWorkerFactory(
std::shared_ptr<NKikimr::NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> factory
) {
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index 14b20389468..82ab36e514d 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -263,7 +263,7 @@ static void ToAttributeValues(TUserTable::TCPtr schema, NJson::TJsonValue& value
}
}
-void TChangeRecord::SerializeToDocApiJson(NJson::TJsonValue& json, const TDocApiJsonOptions& opts) const {
+void TChangeRecord::SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const {
Y_VERIFY(Kind == EKind::CdcDataChange);
Y_VERIFY(Schema);
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index 4668e2717c4..a9b74a65492 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -27,7 +27,7 @@ public:
CdcDataChange,
};
- struct TDocApiJsonOptions {
+ struct TAwsJsonOptions {
TString AwsRegion;
NKikimrSchemeOp::ECdcStreamMode StreamMode;
ui64 ShardId;
@@ -49,7 +49,7 @@ public:
void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const;
void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const;
- void SerializeToDocApiJson(NJson::TJsonValue& json, const TDocApiJsonOptions& opts) const;
+ void SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const;
TConstArrayRef<TCell> GetKey() const;
i64 GetSeqNo() const;
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 552e1901783..0aa7065c80c 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -82,6 +82,12 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
LOG_D("Handle " << ev->Get()->ToString());
NKikimrClient::TPersQueueRequest request;
+ const auto awsJsonOpts = TChangeRecord::TAwsJsonOptions{
+ .AwsRegion = Stream.AwsRegion.GetOrElse(AppData()->AwsCompatibilityConfig.GetAwsRegion()),
+ .StreamMode = Stream.Mode,
+ .ShardId = DataShard.TabletId,
+ };
+
for (const auto& record : ev->Get()->Records) {
if (record.GetSeqNo() <= MaxSeqNo) {
continue;
@@ -108,11 +114,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson: {
NJson::TJsonValue json;
if (Stream.Format == NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson) {
- record.SerializeToDocApiJson(json, TChangeRecord::TDocApiJsonOptions{
- .AwsRegion = Stream.AwsRegion,
- .StreamMode = Stream.Mode,
- .ShardId = DataShard.TabletId,
- });
+ record.SerializeToDynamoDBStreamsJson(json, awsJsonOpts);
} else {
record.SerializeToYdbJson(json, Stream.VirtualTimestamps);
}
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index 631e5209255..476d4214fbd 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -295,7 +295,7 @@ struct TUserTable : public TThrRefBase {
EFormat Format;
EState State;
bool VirtualTimestamps = false;
- TString AwsRegion;
+ TMaybe<TString> AwsRegion;
TCdcStream() = default;
@@ -305,8 +305,10 @@ struct TUserTable : public TThrRefBase {
, Format(streamDesc.GetFormat())
, State(streamDesc.GetState())
, VirtualTimestamps(streamDesc.GetVirtualTimestamps())
- , AwsRegion(streamDesc.GetAwsRegion())
{
+ if (const auto& awsRegion = streamDesc.GetAwsRegion()) {
+ AwsRegion = awsRegion;
+ }
}
};
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 4f5d7c1ddef..6e0e152990d 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -829,6 +829,11 @@ Y_UNIT_TEST_SUITE(Cdc) {
return streamDesc;
}
+ TCdcStream WithAwsRegion(const TString& awsRegion, TCdcStream streamDesc) {
+ streamDesc.AwsRegion = awsRegion;
+ return streamDesc;
+ }
+
TString CalcPartitionKey(const TString& data) {
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(data, &json));
@@ -2360,6 +2365,51 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(AwsRegion) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ .SetAwsRegion("defaultRegion")
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", DocApiTable());
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson, "Stream1")));
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ WithAwsRegion("customRegion", KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson, "Stream2"))));
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES (
+ 1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}')
+ );
+ )");
+
+ auto checkAwsRegion = [&](const TString& path, const char* awsRegion) {
+ while (true) {
+ const auto records = GetRecords(runtime, edgeActor, path, 0);
+ if (records.size() >= 1) {
+ for (const auto& [_, record] : records) {
+ UNIT_ASSERT_STRING_CONTAINS(record, Sprintf(R"("awsRegion":"%s")", awsRegion));
+ }
+
+ break;
+ }
+
+ SimulateSleep(server, TDuration::Seconds(1));
+ }
+ };
+
+ checkAwsRegion("/Root/Table/Stream1", "defaultRegion");
+ checkAwsRegion("/Root/Table/Stream2", "customRegion");
+ }
+
} // Cdc
} // NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index eafc085ebbd..bc446d86443 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1640,6 +1640,9 @@ ui64 AsyncAlterAddStream(
if (streamDesc.InitialState) {
desc.MutableStreamDescription()->SetState(*streamDesc.InitialState);
}
+ if (streamDesc.AwsRegion) {
+ desc.MutableStreamDescription()->SetAwsRegion(*streamDesc.AwsRegion);
+ }
return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 4b256e4f946..7b407f87e7a 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -436,6 +436,7 @@ struct TShardedTableOptions {
EFormat Format;
TMaybe<EState> InitialState;
bool VirtualTimestamps = false;
+ TMaybe<TString> AwsRegion;
};
using TAttributes = THashMap<TString, TString>;