aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvporyadke <zalyalov@ydb.tech>2024-11-28 11:58:52 +0300
committerGitHub <noreply@github.com>2024-11-28 09:58:52 +0100
commit5c44bcafe0448a4655aab7a1f65a3a4530d987be (patch)
tree0499cc0f4ded9dc05bbd8503c8d36a58aa8ae94f
parent34a26dd332227aa4537d135c19c5a711044ffcfb (diff)
downloadydb-5c44bcafe0448a4655aab7a1f65a3a4530d987be.tar.gz
test followers with different ydb versions + fixes based on it (#12024)
-rw-r--r--ydb/core/mind/hive/hive_impl.cpp5
-rw-r--r--ydb/core/mind/hive/node_info.cpp12
-rw-r--r--ydb/core/mind/hive/tx__sync_tablets.cpp24
-rw-r--r--ydb/tests/functional/compatibility/test_followers.py154
-rw-r--r--ydb/tests/functional/compatibility/ya.make1
-rw-r--r--ydb/tests/library/clients/kikimr_http_client.py2
6 files changed, 180 insertions, 18 deletions
diff --git a/ydb/core/mind/hive/hive_impl.cpp b/ydb/core/mind/hive/hive_impl.cpp
index b752d31572..94c4671295 100644
--- a/ydb/core/mind/hive/hive_impl.cpp
+++ b/ydb/core/mind/hive/hive_impl.cpp
@@ -1609,6 +1609,11 @@ void THive::DeleteTablet(TTabletId tabletId) {
}
Y_ENSURE_LOG(nt->second.LockedTablets.count(&tablet) == 0, " Deleting tablet found on node " << nt->first << " in locked set");
}
+ for (const auto& followerGroup : tablet.FollowerGroups) {
+ for (auto& [_, dataCenter] : DataCenters) {
+ dataCenter.Followers.erase({tabletId, followerGroup.Id});
+ }
+ }
const i64 tabletsTotalDiff = -1 - (tablet.Followers.size());
UpdateCounterTabletsTotal(tabletsTotalDiff);
UpdateDomainTabletsTotal(tablet.ObjectDomain, tabletsTotalDiff);
diff --git a/ydb/core/mind/hive/node_info.cpp b/ydb/core/mind/hive/node_info.cpp
index ae5d1c4c31..5b12263baa 100644
--- a/ydb/core/mind/hive/node_info.cpp
+++ b/ydb/core/mind/hive/node_info.cpp
@@ -110,13 +110,11 @@ bool TNodeInfo::MatchesFilter(const TNodeFilter& filter, TTabletDebugState* debu
bool result = false;
for (const auto& candidate : effectiveAllowedDomains) {
- if (Hive.DomainHasNodes(candidate)) {
- result = std::find(ServicedDomains.begin(),
- ServicedDomains.end(),
- candidate) != ServicedDomains.end();
- if (result) {
- break;
- }
+ result = std::find(ServicedDomains.begin(),
+ ServicedDomains.end(),
+ candidate) != ServicedDomains.end();
+ if (result) {
+ break;
}
}
diff --git a/ydb/core/mind/hive/tx__sync_tablets.cpp b/ydb/core/mind/hive/tx__sync_tablets.cpp
index 33c84e313f..334a8ae84f 100644
--- a/ydb/core/mind/hive/tx__sync_tablets.cpp
+++ b/ydb/core/mind/hive/tx__sync_tablets.cpp
@@ -49,6 +49,18 @@ public:
tabletsToStop.insert(tablet->GetFullTabletId());
}
}
+ auto foundTablet = [&](TTabletInfo* tablet, const TString& state) {
+ auto tabletId = tablet->GetFullTabletId();
+ if (node.MatchesFilter(tablet->NodeFilter)) {
+ BLOG_TRACE("THive::TTxSyncTablets(" << Local << ") confirmed " << state << " tablet " << tabletId);
+ tabletsToStop.erase(tabletId);
+ } else {
+ BLOG_TRACE("THive::TTxSyncTablets(" << Local << ") confirmed " << state << " tablet " << tabletId << ", but it's not allowed to run on this node");
+ }
+ if (tablet->GetLeader().IsBootingSuppressed()) {
+ tablet->InitiateStop(SideEffects);
+ }
+ };
for (const NKikimrLocal::TEvSyncTablets_TTabletInfo& ti : SyncTablets.GetInbootTablets()) {
auto tabletId = std::pair<TTabletId, TFollowerId>(ti.GetTabletId(), ti.GetFollowerId());
TTabletInfo* tablet = Self->FindTablet(tabletId);
@@ -58,11 +70,7 @@ public:
tablet->GetLeader().KnownGeneration = ti.GetGeneration();
}
tablet->BecomeStarting(node.Id);
- BLOG_TRACE("THive::TTxSyncTablets(" << Local << ") confirmed starting tablet " << tabletId);
- tabletsToStop.erase(tabletId);
- if (tablet->GetLeader().IsBootingSuppressed()) {
- tablet->InitiateStop(SideEffects);
- }
+ foundTablet(tablet, "starting");
continue;
}
} else {
@@ -90,11 +98,7 @@ public:
NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(tablet->NodeId));
}
}
- BLOG_TRACE("THive::TTxSyncTablets(" << Local << ") confirmed running tablet " << tabletId);
- tabletsToStop.erase(tabletId);
- if (tablet->GetLeader().IsBootingSuppressed()) {
- tablet->InitiateStop(SideEffects);
- }
+ foundTablet(tablet, "running");
continue;
} else if (ti.GetBootMode() == NKikimrLocal::EBootMode::BOOT_MODE_FOLLOWER) {
SideEffects.Send(Local, new TEvLocal::TEvStopTablet(tabletId)); // the tablet is running somewhere else
diff --git a/ydb/tests/functional/compatibility/test_followers.py b/ydb/tests/functional/compatibility/test_followers.py
new file mode 100644
index 0000000000..7b931eee29
--- /dev/null
+++ b/ydb/tests/functional/compatibility/test_followers.py
@@ -0,0 +1,154 @@
+# -*- coding: utf-8 -*-
+import logging
+import time
+import yatest
+from ydb.tests.library.harness.kikimr_runner import KiKiMR
+from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
+from ydb.tests.library.harness.param_constants import kikimr_driver_path
+from ydb.tests.library.harness.util import LogLevels
+from ydb.tests.library.clients.kikimr_http_client import SwaggerClient
+from ydb.tests.library.common.types import Erasure, TabletStates, TabletTypes
+from ydb.tests.oss.ydb_sdk_import import ydb
+
+logger = logging.getLogger(__name__)
+
+
+class TestFollowersCompatibility(object):
+ @classmethod
+ def setup_class(cls):
+ last_stable_path = yatest.common.binary_path("ydb/tests/library/compatibility/ydbd-last-stable")
+ binary_paths = [kikimr_driver_path(), last_stable_path]
+ cls.datacenters = [1, 2, 3]
+ cls.dc_map = {i : cls.datacenters[(i - 1) % 3] for i in range(1, 10)}
+ cls.dc_map[0] = "NO DC"
+ cls.cfg = KikimrConfigGenerator(erasure=Erasure.MIRROR_3_DC,
+ binary_paths=binary_paths,
+ dc_mapping=cls.dc_map,
+ additional_log_configs={'HIVE': LogLevels.DEBUG},
+ use_in_memory_pdisks=False)
+ cls.cluster = KiKiMR(cls.cfg)
+ cls.cluster.start()
+ cls.endpoint = "%s:%s" % (
+ cls.cluster.nodes[1].host, cls.cluster.nodes[1].port
+ )
+ cls.driver = ydb.Driver(
+ ydb.DriverConfig(
+ database='/Root',
+ endpoint=cls.endpoint
+ )
+ )
+ cls.driver.wait()
+
+ @classmethod
+ def teardown_class(cls):
+ if hasattr(cls, 'driver'):
+ cls.driver.stop()
+
+ if hasattr(cls, 'cluster'):
+ cls.cluster.stop(kill=True)
+
+ def check_followers(self, node_idx=1):
+ client = SwaggerClient(self.cluster.nodes[node_idx].host, self.cluster.nodes[node_idx].mon_port)
+ try:
+ data = client.tablet_info()
+ except Exception:
+ return False, "could not connect"
+ if not data:
+ return False, "no answer from server"
+ tablet_info = data['TabletStateInfo']
+ tablet_to_dc = dict()
+ hive_node = 0
+ for tablet in tablet_info:
+ logger.debug(f"tablet_info: {tablet}")
+ if tablet.get('FollowerId', 0) == 0:
+ if tablet['Type'] == int(TabletTypes.FLAT_HIVE):
+ hive_node = tablet['NodeId']
+ if tablet['State'] != TabletStates.Active:
+ return False, "hive is down"
+ continue
+ if tablet['State'] != TabletStates.Active:
+ continue
+ tablet_id = tablet['TabletId']
+ if tablet_id not in tablet_to_dc:
+ tablet_to_dc[tablet_id] = []
+ tablet_to_dc[tablet_id].append(self.dc_map[tablet['NodeId']])
+ if hive_node == 0:
+ return False, "hive is down"
+ for tablet_id, data_centers in tablet_to_dc.items():
+ if self.cfg.get_binary_path(hive_node) == kikimr_driver_path():
+ if len(set(data_centers)) != len(self.datacenters) or len(data_centers) != len(self.datacenters):
+ msg = f"datacenters for tablet {tablet_id} are {data_centers}, hive on node {hive_node} - new version"
+ logger.info(msg)
+ return False, msg
+ else: # A very relaxed check for old hive version
+ if len(data_centers) > len(self.datacenters):
+ msg = f"datacenters for tablet {tablet_id} are {data_centers}, hive on node {hive_node} - old version"
+ logger.info(msg)
+ return False, msg
+ return True, "ok"
+
+ def test_followers_compatability(self):
+ session = ydb.retry_operation_sync(lambda: self.driver.table_client.session().create())
+
+ with ydb.SessionPool(self.driver, size=1) as pool:
+ with pool.checkout() as session:
+ session.execute_scheme(
+ """create table `sample_table` (
+ id Uint64,
+ value Uint64,
+ payload Utf8,
+ PRIMARY KEY(id)
+ )
+ WITH (
+ AUTO_PARTITIONING_BY_SIZE = ENABLED,
+ AUTO_PARTITIONING_PARTITION_SIZE_MB = 1,
+ READ_REPLICAS_SETTINGS = \"PER_AZ:1\"
+ );"""
+ )
+ id_ = 0
+
+ upsert_count = 4 # per iteration
+ iteration_count = 20
+ # Simulate some load with dc outages, so that:
+ # - Hive restarts and runs on different ydb versions
+ # - Tablets are splitting
+ # - Number of followers is changing
+ for i in range(iteration_count):
+ for node_id, node in self.cluster.nodes.items():
+ if node.data_center == self.datacenters[i % len(self.datacenters)]:
+ node.stop()
+ rows = []
+ for j in range(upsert_count):
+ row = {}
+ row["id"] = id_
+ row["value"] = 1
+ row["payload"] = "DEADBEEF" * 1024 * 256
+ rows.append(row)
+ id_ += 1
+
+ column_types = ydb.BulkUpsertColumns()
+ column_types.add_column("id", ydb.PrimitiveType.Uint64)
+ column_types.add_column("value", ydb.PrimitiveType.Uint64)
+ column_types.add_column("payload", ydb.PrimitiveType.Utf8)
+ try:
+ self.driver.table_client.bulk_upsert(
+ "Root/sample_table", rows, column_types
+ )
+ except Exception as e:
+ logger.error(e)
+
+ for node_id, node in self.cluster.nodes.items():
+ if node.data_center == self.datacenters[i % len(self.datacenters)]:
+ node.start()
+ retry_count = 0
+ backoff = .1
+ while True:
+ retry_count += 1
+ logger.info(f"check_followers: iteration {i}, try {retry_count}")
+ ok, msg = self.check_followers()
+ if retry_count == 5:
+ assert ok, msg
+ if ok:
+ break
+ time.sleep(backoff)
+ backoff *= 2
diff --git a/ydb/tests/functional/compatibility/ya.make b/ydb/tests/functional/compatibility/ya.make
index 2721872e5f..c7e2bf34b3 100644
--- a/ydb/tests/functional/compatibility/ya.make
+++ b/ydb/tests/functional/compatibility/ya.make
@@ -2,6 +2,7 @@ PY3TEST()
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
TEST_SRCS(
+ test_followers.py
test_compatibility.py
)
diff --git a/ydb/tests/library/clients/kikimr_http_client.py b/ydb/tests/library/clients/kikimr_http_client.py
index 68bbfe7375..b7a42e1926 100644
--- a/ydb/tests/library/clients/kikimr_http_client.py
+++ b/ydb/tests/library/clients/kikimr_http_client.py
@@ -146,7 +146,7 @@ class SwaggerClient(object):
return self.__hive_info(tablet_type=int(tablet_type))
def hive_info_all(self):
- return self.__hive_info()
+ return self.__hive_info(followers="true")
def __hive_info(self, **kwargs):
return self.__http_get_and_parse_json(