aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-06-30 19:03:49 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-06-30 19:03:49 +0300
commitfd58b11067041cacf6feca44fe47f46bc01422dc (patch)
treeb43874aa28eb9adb801623260afe5e89fc3e8646
parent81fd9ea2d075516cb0725f85118dcca474cd42e3 (diff)
downloadydb-fd58b11067041cacf6feca44fe47f46bc01422dc.tar.gz
DC awareness
ref:cf26d330761ea0ba13530ba17766b91269d2ab35
-rw-r--r--library/cpp/actors/core/interconnect.cpp7
-rw-r--r--library/cpp/actors/core/interconnect.h2
-rw-r--r--ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp6
-rw-r--r--ydb/core/yq/libs/actors/nodes_manager.cpp44
-rw-r--r--ydb/core/yq/libs/actors/nodes_manager.h1
-rw-r--r--ydb/core/yq/libs/config/protos/nodes_manager.proto1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp13
-rw-r--r--ydb/core/yq/libs/control_plane_storage/schema.h1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp1
-rw-r--r--ydb/core/yq/libs/init/init.cpp1
-rw-r--r--ydb/core/yq/libs/protos/yq_private.proto1
-rw-r--r--ydb/tests/library/harness/kikimr_config.py2
-rw-r--r--ydb/tests/library/harness/kikimr_runner.py13
13 files changed, 77 insertions, 16 deletions
diff --git a/library/cpp/actors/core/interconnect.cpp b/library/cpp/actors/core/interconnect.cpp
index 9b377cbb00..192fb68769 100644
--- a/library/cpp/actors/core/interconnect.cpp
+++ b/library/cpp/actors/core/interconnect.cpp
@@ -76,6 +76,13 @@ namespace NActors {
: TNodeLocation(ParseLocation(s))
{}
+ TNodeLocation::TNodeLocation(const TString& DataCenter, const TString& Module, const TString& Rack, const TString& Unit) {
+ if (DataCenter) Items.emplace_back(TKeys::DataCenter, DataCenter);
+ if (Module) Items.emplace_back(TKeys::Module, Module);
+ if (Rack) Items.emplace_back(TKeys::Rack, Rack);
+ if (Unit) Items.emplace_back(TKeys::Unit, Unit);
+ }
+
NActorsInterconnect::TNodeLocation TNodeLocation::ParseLocation(const TString& s) {
NActorsInterconnect::TNodeLocation res;
const bool success = res.ParseFromString(s);
diff --git a/library/cpp/actors/core/interconnect.h b/library/cpp/actors/core/interconnect.h
index 5ecac1deec..37fd7bff4d 100644
--- a/library/cpp/actors/core/interconnect.h
+++ b/library/cpp/actors/core/interconnect.h
@@ -57,6 +57,7 @@ namespace NActors {
TNodeLocation() = default;
TNodeLocation(const TNodeLocation&) = default;
TNodeLocation(TNodeLocation&&) = default;
+ TNodeLocation(const TString& DataCenter, const TString& Module = "", const TString& Rack = "", const TString& Unit = "");
// protobuf-parser ctor
explicit TNodeLocation(const NActorsInterconnect::TNodeLocation& location);
@@ -79,6 +80,7 @@ namespace NActors {
TString GetDataCenterId() const { return ToStringUpTo(TKeys::DataCenter); }
TString GetModuleId() const { return ToStringUpTo(TKeys::Module); }
TString GetRackId() const { return ToStringUpTo(TKeys::Rack); }
+ TString GetUnitId() const { return ToStringUpTo(TKeys::Unit); }
TString ToString() const { return ToStringUpTo(TKeys::E(Max<int>())); }
TString ToStringUpTo(TKeys::E upToKey) const;
diff --git a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp
index bddc8b499b..41b39def48 100644
--- a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp
+++ b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp
@@ -643,6 +643,12 @@ protected:
}
}
+ if (config.ParseResult->Has("data-center")) {
+ if (AppConfig.HasYandexQueryConfig()) {
+ AppConfig.MutableYandexQueryConfig()->MutableNodesManager()->SetDataCenter(to_lower(DataCenter));
+ }
+ }
+
// MessageBus options.
if (!AppConfig.HasMessageBusConfig()) {
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp
index b8bbe99f75..0e930e518d 100644
--- a/ydb/core/yq/libs/actors/nodes_manager.cpp
+++ b/ydb/core/yq/libs/actors/nodes_manager.cpp
@@ -42,6 +42,7 @@ public:
const ::NYql::NCommon::TServiceCounters& serviceCounters,
const NConfig::TPrivateApiConfig& privateApiConfig,
const ui32& icPort,
+ const TString& dataCenter,
const TString& tenant,
ui64 mkqlInitialMemoryLimit)
: WorkerManagerCounters(workerManagerCounters)
@@ -53,6 +54,7 @@ public:
, MkqlInitialMemoryLimit(mkqlInitialMemoryLimit)
, YqSharedResources(yqSharedResources)
, IcPort(icPort)
+ , DataCenter(dataCenter)
, InternalServiceId(MakeInternalServiceActorId())
{
@@ -96,32 +98,39 @@ private:
ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
TVector<TPeer> nodes;
for (ui32 i = 0; i < count; ++i) {
- TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0};
+ TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
bool selfPlacement = true;
if (!Peers.empty()) {
auto FirstPeer = NextPeer;
while (true) {
- if (NextPeer >= Peers.size()) {
+ auto& nextNode = Peers[NextPeer];
+
+ if (++NextPeer >= Peers.size()) {
NextPeer = 0;
}
- auto& nextNode = Peers[NextPeer];
- ++NextPeer;
-
- if (NextPeer == FirstPeer // we closed loop w/o success, fallback to round robin then
- || nextNode.MemoryLimit == 0 // not limit defined for the node
- || nextNode.MemoryLimit > nextNode.MemoryAllocated + MkqlInitialMemoryLimit // memory is enough
+ if ( (DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match
+ && ( nextNode.MemoryLimit == 0 // memory is NOT limited
+ || nextNode.MemoryLimit >= nextNode.MemoryAllocated + MkqlInitialMemoryLimit) // or enough
) {
// adjust allocated size to place next tasks correctly, will be reset after next health check
nextNode.MemoryAllocated += MkqlInitialMemoryLimit;
+ if (nextNode.NodeId == SelfId().NodeId()) {
+ // eventually synced self allocation info
+ memoryAllocated += MkqlInitialMemoryLimit;
+ }
node = nextNode;
selfPlacement = false;
break;
}
+
+ if (NextPeer == FirstPeer) { // we closed loop w/o success, fallback to self placement then
+ break;
+ }
}
}
if (selfPlacement) {
- if (memoryLimit == 0 || memoryLimit > memoryAllocated + MkqlInitialMemoryLimit) {
+ if (memoryLimit == 0 || memoryLimit >= memoryAllocated + MkqlInitialMemoryLimit) {
memoryAllocated += MkqlInitialMemoryLimit;
} else {
placementFailure = true;
@@ -190,6 +199,7 @@ private:
node.set_memory_limit(AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic()));
node.set_memory_allocated(AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic()));
node.set_interconnect_port(IcPort);
+ node.set_data_center(DataCenter);
Send(InternalServiceId, new TEvInternalService::TEvHealthCheckRequest(request));
}
@@ -211,9 +221,16 @@ private:
nodesInfo.reserve(res.nodes().size());
Peers.clear();
+ std::set<ui32> nodeIds; // may be not unique
for (const auto& node : res.nodes()) {
+
+ if (nodeIds.contains(node.node_id())) {
+ continue;
+ }
+ nodeIds.insert(node.node_id());
+
Peers.push_back({node.node_id(), node.instance_id() + "," + node.hostname(),
- node.active_workers(), node.memory_limit(), node.memory_allocated()});
+ node.active_workers(), node.memory_limit(), node.memory_allocated(), node.data_center()});
if (node.interconnect_port()) {
nodesInfo.emplace_back(TEvInterconnect::TNodeInfo{
@@ -222,7 +239,7 @@ private:
node.hostname(), // host
node.hostname(), // resolveHost
static_cast<ui16>(node.interconnect_port()),
- /* NodeLocation = */{}});
+ TNodeLocation(node.data_center())});
}
}
@@ -251,6 +268,7 @@ private:
NYq::TYqSharedResources::TPtr YqSharedResources;
const ui32 IcPort; // Interconnect Port
+ TString DataCenter;
struct TPeer {
ui32 NodeId;
@@ -258,6 +276,7 @@ private:
ui64 ActiveWorkers;
ui64 MemoryLimit;
ui64 MemoryAllocated;
+ TString DataCenter;
};
TVector<TPeer> Peers;
ui32 ResourceIdPart = 0;
@@ -279,11 +298,12 @@ IActor* CreateNodesManager(
const NConfig::TPrivateApiConfig& privateApiConfig,
const NYq::TYqSharedResources::TPtr& yqSharedResources,
const ui32& icPort,
+ const TString& dataCenter,
const TString& tenant,
ui64 mkqlInitialMemoryLimit) {
return new TNodesManagerActor(yqSharedResources, workerManagerCounters,
timeProvider, randomProvider,
- serviceCounters, privateApiConfig, icPort, tenant, mkqlInitialMemoryLimit);
+ serviceCounters, privateApiConfig, icPort, dataCenter, tenant, mkqlInitialMemoryLimit);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/actors/nodes_manager.h b/ydb/core/yq/libs/actors/nodes_manager.h
index 111181c2b8..4a9b47f8b9 100644
--- a/ydb/core/yq/libs/actors/nodes_manager.h
+++ b/ydb/core/yq/libs/actors/nodes_manager.h
@@ -28,6 +28,7 @@ IActor* CreateNodesManager(
const NConfig::TPrivateApiConfig& privateApiConfig,
const NYq::TYqSharedResources::TPtr& yqSharedResources,
const ui32& icPort,
+ const TString& dataCenter = "",
const TString& tenant = "",
ui64 mkqlInitialMemoryLimit = 0);
diff --git a/ydb/core/yq/libs/config/protos/nodes_manager.proto b/ydb/core/yq/libs/config/protos/nodes_manager.proto
index cfc1073cae..5b390d3ba0 100644
--- a/ydb/core/yq/libs/config/protos/nodes_manager.proto
+++ b/ydb/core/yq/libs/config/protos/nodes_manager.proto
@@ -10,4 +10,5 @@ message TNodesManagerConfig {
bool Enabled = 1;
uint32 Port = 2;
string Host = 3;
+ string DataCenter = 4;
}
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp
index 733828438d..5fd2f1fdf3 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp
@@ -22,6 +22,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
const ui64 memoryLimit = node.memory_limit();
const ui64 memoryAllocated = node.memory_allocated();
const ui32 icPort = node.interconnect_port();
+ const TString dataCenter = node.data_center();
const TString nodeAddress = node.node_address();
const auto ttl = TDuration::Seconds(5);
const auto deadline = startTime + ttl * 3;
@@ -47,6 +48,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
node->set_memory_limit(memoryLimit);
node->set_memory_allocated(memoryAllocated);
node->set_node_address(nodeAddress);
+ node->set_data_center(dataCenter);
}
TSqlQueryBuilder readQueryBuilder(YdbConnection->TablePathPrefix, "NodesHealthCheck(read)");
@@ -54,7 +56,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
readQueryBuilder.AddString("tenant", tenant);
readQueryBuilder.AddText(
"SELECT `" NODE_ID_COLUMN_NAME "`, `" INSTANCE_ID_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME"`, `" MEMORY_LIMIT_COLUMN_NAME"`, "
- "`" MEMORY_ALLOCATED_COLUMN_NAME"`, `" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME "` FROM `" NODES_TABLE_NAME "`\n"
+ "`" MEMORY_ALLOCATED_COLUMN_NAME"`, `" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME"`, `" DATA_CENTER_COLUMN_NAME "` FROM `" NODES_TABLE_NAME "`\n"
"WHERE `" TENANT_COLUMN_NAME"` = $tenant AND `" EXPIRE_AT_COLUMN_NAME "` >= $now;\n"
);
@@ -73,6 +75,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
node->set_memory_allocated(*parser.ColumnParser(MEMORY_ALLOCATED_COLUMN_NAME).GetOptionalUint64());
node->set_interconnect_port(parser.ColumnParser(INTERCONNECT_PORT_COLUMN_NAME).GetOptionalUint32().GetOrElse(0));
node->set_node_address(*parser.ColumnParser(NODE_ADDRESS_COLUMN_NAME).GetOptionalString());
+ node->set_data_center(*parser.ColumnParser(DATA_CENTER_COLUMN_NAME).GetOptionalString());
}
}
}
@@ -88,11 +91,15 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
writeQueryBuilder.AddUint64("memory_allocated", memoryAllocated);
writeQueryBuilder.AddUint32("ic_port", icPort);
writeQueryBuilder.AddString("node_address", nodeAddress);
+ writeQueryBuilder.AddString("data_center", dataCenter);
writeQueryBuilder.AddText(
"UPSERT INTO `" NODES_TABLE_NAME "`\n"
"(`" TENANT_COLUMN_NAME "`, `" NODE_ID_COLUMN_NAME "`, `" INSTANCE_ID_COLUMN_NAME "`,\n"
- "`" HOST_NAME_COLUMN_NAME "`, `" EXPIRE_AT_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME"`, `" MEMORY_LIMIT_COLUMN_NAME"`, `" MEMORY_ALLOCATED_COLUMN_NAME "`, `" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME "`)\n"
- "VALUES ($tenant ,$node_id, $instance_id, $hostname, $deadline, $active_workers, $memory_limit, $memory_allocated, $ic_port, $node_address);\n"
+ "`" HOST_NAME_COLUMN_NAME "`, `" EXPIRE_AT_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME"`,\n"
+ "`" MEMORY_LIMIT_COLUMN_NAME "`, `" MEMORY_ALLOCATED_COLUMN_NAME "`, `" INTERCONNECT_PORT_COLUMN_NAME "`,\n"
+ "`" NODE_ADDRESS_COLUMN_NAME "`, `" DATA_CENTER_COLUMN_NAME"`)\n"
+ "VALUES ($tenant ,$node_id, $instance_id, $hostname, $deadline, $active_workers, $memory_limit,\n"
+ "$memory_allocated, $ic_port, $node_address, $data_center);\n"
);
const auto writeQuery = writeQueryBuilder.Build();
return std::make_pair(writeQuery.Sql, writeQuery.Params);
diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h
index f4f2fea21c..344be98e12 100644
--- a/ydb/core/yq/libs/control_plane_storage/schema.h
+++ b/ydb/core/yq/libs/control_plane_storage/schema.h
@@ -63,6 +63,7 @@ namespace NYq {
#define MEMORY_ALLOCATED_COLUMN_NAME "memory_allocated"
#define INTERCONNECT_PORT_COLUMN_NAME "interconnect_port"
#define NODE_ADDRESS_COLUMN_NAME "node_address"
+#define DATA_CENTER_COLUMN_NAME "data_center"
#define HOST_NAME_COLUMN_NAME "hostname"
#define OWNER_COLUMN_NAME "owner"
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index da842c919a..72189f65f3 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -169,6 +169,7 @@ void TYdbControlPlaneStorageActor::CreateNodesTable()
.AddNullableColumn(EXPIRE_AT_COLUMN_NAME, EPrimitiveType::Timestamp)
.AddNullableColumn(INTERCONNECT_PORT_COLUMN_NAME, EPrimitiveType::Uint32)
.AddNullableColumn(NODE_ADDRESS_COLUMN_NAME, EPrimitiveType::String)
+ .AddNullableColumn(DATA_CENTER_COLUMN_NAME, EPrimitiveType::String)
.SetTtlSettings(EXPIRE_AT_COLUMN_NAME)
.SetPrimaryKeyColumns({TENANT_COLUMN_NAME, NODE_ID_COLUMN_NAME})
.Build();
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 4826f78de9..15a0524d12 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -201,6 +201,7 @@ void Init(
protoConfig.GetPrivateApi(),
yqSharedResources,
icPort,
+ protoConfig.GetNodesManager().GetDataCenter(),
tenant,
mkqlInitialMemoryLimit);
diff --git a/ydb/core/yq/libs/protos/yq_private.proto b/ydb/core/yq/libs/protos/yq_private.proto
index 0af8d46c22..a67c0db9ab 100644
--- a/ydb/core/yq/libs/protos/yq_private.proto
+++ b/ydb/core/yq/libs/protos/yq_private.proto
@@ -150,6 +150,7 @@ message NodeInfo {
uint64 memory_allocated = 6;
uint32 interconnect_port = 7;
string node_address = 8;
+ string data_center = 9;
}
message NodesHealthCheckRequest {
diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py
index 12ff9e154f..02f92a9c44 100644
--- a/ydb/tests/library/harness/kikimr_config.py
+++ b/ydb/tests/library/harness/kikimr_config.py
@@ -136,6 +136,7 @@ class KikimrConfigGenerator(object):
bs_cache_file_path=None,
yq_tenant=None,
use_legacy_pq=False,
+ dc_mapping={},
):
self._version = version
self.use_log_files = use_log_files
@@ -197,6 +198,7 @@ class KikimrConfigGenerator(object):
self.__output_path = output_path or yatest_common.output_path()
self.node_kind = node_kind
self.yq_tenant = yq_tenant
+ self.dc_mapping = dc_mapping
self.__bs_cache_file_path = bs_cache_file_path
diff --git a/ydb/tests/library/harness/kikimr_runner.py b/ydb/tests/library/harness/kikimr_runner.py
index fcb15f49cd..d7c43575de 100644
--- a/ydb/tests/library/harness/kikimr_runner.py
+++ b/ydb/tests/library/harness/kikimr_runner.py
@@ -48,10 +48,11 @@ def join(a, b):
class KiKiMRNode(daemon.Daemon, kikimr_node_interface.NodeInterface):
def __init__(self, node_idx, config_path, port_allocator, cluster_name, configurator,
udfs_dir=None, role='node', node_broker_port=None, tenant_affiliation=None, encryption_key=None,
- binary_path=None):
+ binary_path=None, data_center=None):
super(kikimr_node_interface.NodeInterface, self).__init__()
self.node_id = node_idx
+ self.data_center = data_center
self.__cwd = None
self.__config_path = config_path
self.__cluster_name = cluster_name
@@ -180,6 +181,11 @@ class KiKiMRNode(daemon.Daemon, kikimr_node_interface.NodeInterface):
if self.sqs_port is not None:
command.extend(["--sqs-port=%d" % self.sqs_port])
+ if self.data_center is not None:
+ command.append(
+ "--data-center=%s" % self.data_center
+ )
+
logger.info('CFG_DIR_PATH="%s"', self.__config_path)
logger.info("Final command: %s", ' '.join(command).replace(self.__config_path, '$CFG_DIR_PATH'))
return command
@@ -357,6 +363,10 @@ class KiKiMR(kikimr_cluster_interface.KiKiMRClusterInterface):
def __register_node(self):
node_index = next(self._node_index_allocator)
+ data_center = None
+ if isinstance(self.__configurator.dc_mapping, dict):
+ if node_index in self.__configurator.dc_mapping:
+ data_center = self.__configurator.dc_mapping[node_index]
self._nodes[node_index] = KiKiMRNode(
node_index,
self.config_path,
@@ -365,6 +375,7 @@ class KiKiMR(kikimr_cluster_interface.KiKiMRClusterInterface):
configurator=self.__configurator,
udfs_dir=self.__common_udfs_dir,
tenant_affiliation=self.__configurator.yq_tenant,
+ data_center=data_center,
)
return self._nodes[node_index]