diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-05-23 15:28:34 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-05-23 15:28:34 +0300 |
commit | 3a5d4cb4da36d491f57782e316476a793654e52a (patch) | |
tree | 9ef7cd9e45d5ac8571436f6e46b17de533bf9ad9 | |
parent | 61a58106f42a87d3d324e03e77642faf0a488071 (diff) | |
download | ydb-3a5d4cb4da36d491f57782e316476a793654e52a.tar.gz |
Aws region in cluster config
-rw-r--r-- | ydb/core/base/appdata.h | 1 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 5 | ||||
-rw-r--r-- | ydb/core/protos/console_config.proto | 1 | ||||
-rw-r--r-- | ydb/core/testlib/actors/test_runtime.cpp | 1 | ||||
-rw-r--r-- | ydb/core/testlib/basics/appdata.cpp | 6 | ||||
-rw-r--r-- | ydb/core/testlib/basics/appdata.h | 2 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 1 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_user_table.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 50 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 1 |
17 files changed, 92 insertions, 11 deletions
diff --git a/ydb/core/base/appdata.h b/ydb/core/base/appdata.h index 0b1c70780cc..dc2b02faa2f 100644 --- a/ydb/core/base/appdata.h +++ b/ydb/core/base/appdata.h @@ -148,6 +148,7 @@ struct TAppData { NKikimrConfig::TCompactionConfig CompactionConfig; NKikimrConfig::TDomainsConfig DomainsConfig; NKikimrConfig::TBootstrap BootstrapConfig; + NKikimrConfig::TAwsCompatibilityConfig AwsCompatibilityConfig; std::optional<NKikimrSharedCache::TSharedCacheConfig> SharedCacheConfig; bool EnforceUserTokenRequirement = false; bool AllowHugeKeyValueDeletes = true; // delete when all clients limit deletes per request diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 357498e4aca..a196f6c7fb7 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1114,6 +1114,10 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig) AppData->SharedCacheConfig = runConfig.AppConfig.GetSharedCacheConfig(); } + if (runConfig.AppConfig.HasAwsCompatibilityConfig()) { + AppData->AwsCompatibilityConfig = runConfig.AppConfig.GetAwsCompatibilityConfig(); + } + // setup resource profiles AppData->ResourceProfiles = new TResourceProfiles; if (runConfig.AppConfig.GetBootstrapConfig().ResourceProfilesSize()) diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index d332e0f01ec..cbf27bdccf8 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -2888,7 +2888,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { auto query = R"( --!syntax_v1 ALTER TABLE `/Root/table` ADD CHANGEFEED `feed` WITH ( - MODE = 'NEW_AND_OLD_IMAGES', FORMAT = 'DOCUMENT_TABLE_JSON', AWS_REGION = 'aws:region' + MODE = 'NEW_AND_OLD_IMAGES', FORMAT = 'DYNAMODB_STREAMS_JSON', AWS_REGION = 'aws:region' ); )"; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 18fe7fde277..521799f151f 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1780,6 +1780,10 @@ message TLocalPgWireConfig { optional string SslCertificate = 5; } +message TAwsCompatibilityConfig { + optional string AwsRegion = 1; +} + message TLabel { optional string Name = 1; optional string Value = 2; @@ -1848,6 +1852,7 @@ message TAppConfig { optional TConveyorConfig ConveyorConfig = 65; optional TColumnShardConfig ColumnShardConfig = 66; optional TLocalPgWireConfig LocalPgWireConfig = 69; + optional TAwsCompatibilityConfig AwsCompatibilityConfig = 70; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto index 5293ff2afd6..1d80f943c20 100644 --- a/ydb/core/protos/console_config.proto +++ b/ydb/core/protos/console_config.proto @@ -122,6 +122,7 @@ message TConfigItem { YamlConfigEnabledItem = 64; ConveyorConfigItem = 65; ColumnShardConfigItem = 66; + AwsCompatibilityConfigItem = 70; NamedConfigsItem = 100; ClusterYamlConfigItem = 101; diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index b8443e72063..553d915ce32 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -145,6 +145,7 @@ namespace NActors { nodeAppData->DataShardConfig = app0->DataShardConfig; nodeAppData->ColumnShardConfig = app0->ColumnShardConfig; nodeAppData->MeteringConfig = app0->MeteringConfig; + nodeAppData->AwsCompatibilityConfig = app0->AwsCompatibilityConfig; nodeAppData->EnableMvccSnapshotWithLegacyDomainRoot = app0->EnableMvccSnapshotWithLegacyDomainRoot; nodeAppData->IoContextFactory = app0->IoContextFactory; if (KeyConfigGenerator) { diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp index d710e466d0a..cd496db1efc 100644 --- a/ydb/core/testlib/basics/appdata.cpp +++ b/ydb/core/testlib/basics/appdata.cpp @@ -57,6 +57,7 @@ namespace NKikimr { app->ColumnShardConfig = ColumnShardConfig; app->SchemeShardConfig = SchemeShardConfig; app->MeteringConfig = MeteringConfig; + app->AwsCompatibilityConfig = AwsCompatibilityConfig; app->FeatureFlags = FeatureFlags; // This is a special setting active in test runtime only @@ -193,4 +194,9 @@ namespace NKikimr { { FeatureFlags.SetEnableDbCounters(value); } + + void TAppPrepare::SetAwsRegion(const TString& value) + { + AwsCompatibilityConfig.SetAwsRegion(value); + } } diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h index 90e8ae859f7..b902ce89cb3 100644 --- a/ydb/core/testlib/basics/appdata.h +++ b/ydb/core/testlib/basics/appdata.h @@ -80,6 +80,7 @@ namespace NKikimr { void SetEnableProtoSourceIdInfo(std::optional<bool> value); void SetEnablePqBilling(std::optional<bool> value); void SetEnableDbCounters(bool value); + void SetAwsRegion(const TString& value); TIntrusivePtr<TChannelProfiles> Channels; NKikimrBlobStorage::TNodeWardenServiceSet BSConf; @@ -94,6 +95,7 @@ namespace NKikimr { NKikimrConfig::TSchemeShardConfig SchemeShardConfig; NKikimrConfig::TMeteringConfig MeteringConfig; NKikimrPQ::TPQConfig PQConfig; + NKikimrConfig::TAwsCompatibilityConfig AwsCompatibilityConfig; private: TAutoPtr<TMine> Mine; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 3a93133f463..fd070a1681e 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -173,6 +173,7 @@ namespace Tests { app.SetKeepSnapshotTimeout(Settings->KeepSnapshotTimeout); app.SetChangesQueueItemsLimit(Settings->ChangesQueueItemsLimit); app.SetChangesQueueBytesLimit(Settings->ChangesQueueBytesLimit); + app.SetAwsRegion(Settings->AwsRegion); app.CompactionConfig = Settings->CompactionConfig; app.FeatureFlags = Settings->FeatureFlags; diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index d838b6aac74..91862d263a0 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -132,6 +132,7 @@ namespace Tests { std::shared_ptr<NKikimr::NPQ::TPersQueueMirrorReaderFactory> PersQueueMirrorReaderFactory = std::make_shared<NKikimr::NPQ::TPersQueueMirrorReaderFactory>(); bool EnableMetering = false; TString MeteringFilePath; + TString AwsRegion; std::function<IActor*(const NKikimrProto::TAuthConfig&)> CreateTicketParser = NKikimr::CreateTicketParser; std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory; @@ -170,6 +171,7 @@ namespace Tests { TServerSettings& SetChangesQueueItemsLimit(ui64 value) { ChangesQueueItemsLimit = value; return *this; } TServerSettings& SetChangesQueueBytesLimit(ui64 value) { ChangesQueueBytesLimit = value; return *this; } TServerSettings& SetMeteringFilePath(const TString& path) { EnableMetering = true; MeteringFilePath = path; return *this; } + TServerSettings& SetAwsRegion(const TString& value) { AwsRegion = value; return *this; } TServerSettings& SetPersQueueGetReadSessionsInfoWorkerFactory( std::shared_ptr<NKikimr::NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> factory ) { diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index 14b20389468..82ab36e514d 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -263,7 +263,7 @@ static void ToAttributeValues(TUserTable::TCPtr schema, NJson::TJsonValue& value } } -void TChangeRecord::SerializeToDocApiJson(NJson::TJsonValue& json, const TDocApiJsonOptions& opts) const { +void TChangeRecord::SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const { Y_VERIFY(Kind == EKind::CdcDataChange); Y_VERIFY(Schema); diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index 4668e2717c4..a9b74a65492 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -27,7 +27,7 @@ public: CdcDataChange, }; - struct TDocApiJsonOptions { + struct TAwsJsonOptions { TString AwsRegion; NKikimrSchemeOp::ECdcStreamMode StreamMode; ui64 ShardId; @@ -49,7 +49,7 @@ public: void SerializeToProto(NKikimrChangeExchange::TChangeRecord& record) const; void SerializeToYdbJson(NJson::TJsonValue& json, bool virtualTimestamps) const; - void SerializeToDocApiJson(NJson::TJsonValue& json, const TDocApiJsonOptions& opts) const; + void SerializeToDynamoDBStreamsJson(NJson::TJsonValue& json, const TAwsJsonOptions& opts) const; TConstArrayRef<TCell> GetKey() const; i64 GetSeqNo() const; diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 552e1901783..0aa7065c80c 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -82,6 +82,12 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti LOG_D("Handle " << ev->Get()->ToString()); NKikimrClient::TPersQueueRequest request; + const auto awsJsonOpts = TChangeRecord::TAwsJsonOptions{ + .AwsRegion = Stream.AwsRegion.GetOrElse(AppData()->AwsCompatibilityConfig.GetAwsRegion()), + .StreamMode = Stream.Mode, + .ShardId = DataShard.TabletId, + }; + for (const auto& record : ev->Get()->Records) { if (record.GetSeqNo() <= MaxSeqNo) { continue; @@ -108,11 +114,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti case NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson: { NJson::TJsonValue json; if (Stream.Format == NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson) { - record.SerializeToDocApiJson(json, TChangeRecord::TDocApiJsonOptions{ - .AwsRegion = Stream.AwsRegion, - .StreamMode = Stream.Mode, - .ShardId = DataShard.TabletId, - }); + record.SerializeToDynamoDBStreamsJson(json, awsJsonOpts); } else { record.SerializeToYdbJson(json, Stream.VirtualTimestamps); } diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 631e5209255..476d4214fbd 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -295,7 +295,7 @@ struct TUserTable : public TThrRefBase { EFormat Format; EState State; bool VirtualTimestamps = false; - TString AwsRegion; + TMaybe<TString> AwsRegion; TCdcStream() = default; @@ -305,8 +305,10 @@ struct TUserTable : public TThrRefBase { , Format(streamDesc.GetFormat()) , State(streamDesc.GetState()) , VirtualTimestamps(streamDesc.GetVirtualTimestamps()) - , AwsRegion(streamDesc.GetAwsRegion()) { + if (const auto& awsRegion = streamDesc.GetAwsRegion()) { + AwsRegion = awsRegion; + } } }; diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 4f5d7c1ddef..6e0e152990d 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -829,6 +829,11 @@ Y_UNIT_TEST_SUITE(Cdc) { return streamDesc; } + TCdcStream WithAwsRegion(const TString& awsRegion, TCdcStream streamDesc) { + streamDesc.AwsRegion = awsRegion; + return streamDesc; + } + TString CalcPartitionKey(const TString& data) { NJson::TJsonValue json; UNIT_ASSERT(NJson::ReadJsonTree(data, &json)); @@ -2360,6 +2365,51 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(AwsRegion) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetAwsRegion("defaultRegion") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", DocApiTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson, "Stream1"))); + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithAwsRegion("customRegion", KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson, "Stream2")))); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES ( + 1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}') + ); + )"); + + auto checkAwsRegion = [&](const TString& path, const char* awsRegion) { + while (true) { + const auto records = GetRecords(runtime, edgeActor, path, 0); + if (records.size() >= 1) { + for (const auto& [_, record] : records) { + UNIT_ASSERT_STRING_CONTAINS(record, Sprintf(R"("awsRegion":"%s")", awsRegion)); + } + + break; + } + + SimulateSleep(server, TDuration::Seconds(1)); + } + }; + + checkAwsRegion("/Root/Table/Stream1", "defaultRegion"); + checkAwsRegion("/Root/Table/Stream2", "customRegion"); + } + } // Cdc } // NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index eafc085ebbd..bc446d86443 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1640,6 +1640,9 @@ ui64 AsyncAlterAddStream( if (streamDesc.InitialState) { desc.MutableStreamDescription()->SetState(*streamDesc.InitialState); } + if (streamDesc.AwsRegion) { + desc.MutableStreamDescription()->SetAwsRegion(*streamDesc.AwsRegion); + } return RunSchemeTx(*server->GetRuntime(), std::move(request)); } diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 4b256e4f946..7b407f87e7a 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -436,6 +436,7 @@ struct TShardedTableOptions { EFormat Format; TMaybe<EState> InitialState; bool VirtualTimestamps = false; + TMaybe<TString> AwsRegion; }; using TAttributes = THashMap<TString, TString>; |