diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-04-06 19:04:42 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-04-06 19:04:42 +0300 |
commit | eabc63496b5b31fddc07c6169fb53f4dc007dd36 (patch) | |
tree | 044d52fac6d775412764cd1f6ee36e384400b7bc | |
parent | 266ea0cb71ae74107fce6507a81992b33596dffd (diff) | |
download | ydb-eabc63496b5b31fddc07c6169fb53f4dc007dd36.tar.gz |
fix functional tests
ref:64b0caeb029eb129495367d3540bdc14a65b46db
-rw-r--r-- | ydb/tests/functional/api/test_public_api.py | 3 | ||||
-rw-r--r-- | ydb/tests/functional/sqs/test_acl.py | 54 |
2 files changed, 26 insertions, 31 deletions
diff --git a/ydb/tests/functional/api/test_public_api.py b/ydb/tests/functional/api/test_public_api.py index 072ee35de38..24ad8766fb1 100644 --- a/ydb/tests/functional/api/test_public_api.py +++ b/ydb/tests/functional/api/test_public_api.py @@ -907,7 +907,8 @@ actions { ) response = self.driver.scheme_client.describe_path(directory_name) - assert_that(response.effective_permissions[0].subject, equal_to('gvit@staff')) + subjects = set([permission.subject for permission in response.effective_permissions]) + assert "gvit@staff" in subjects def test_too_many_pending_transactions(self): driver = ydb.Driver(self.connection_params) diff --git a/ydb/tests/functional/sqs/test_acl.py b/ydb/tests/functional/sqs/test_acl.py index e5221f0f43b..e587249f97d 100644 --- a/ydb/tests/functional/sqs/test_acl.py +++ b/ydb/tests/functional/sqs/test_acl.py @@ -4,11 +4,11 @@ import logging import time import pytest -from hamcrest import assert_that, equal_to, none, is_not, is_, raises +from hamcrest import assert_that, none, is_not, is_, raises import ydb.tests.library.common.yatest_common as yatest_common -from sqs_test_base import KikimrSqsTestBase, get_sqs_client_path, get_test_with_sqs_installation_by_path, get_test_with_sqs_tenant_installation, to_bytes +from sqs_test_base import KikimrSqsTestBase, get_sqs_client_path, get_test_with_sqs_installation_by_path, get_test_with_sqs_tenant_installation class SqsACLTest(KikimrSqsTestBase): @@ -57,26 +57,24 @@ class SqsACLTest(KikimrSqsTestBase): return execute.std_out raise RuntimeError("Failed to list permissions") - def __generate_expected_simplified_permissions_response(self, sid=None, permission_names=[]): - resp = 'AccountPermissions {' - if not permission_names: - resp += '}' - else: - resp += 'Permissions {{ Subject: \"{}\"'.format(sid) - for permission_name in permission_names: - resp += 'PermissionNames: \"{}\"'.format(permission_name) - resp += '}} EffectivePermissions {{ Subject: \"{}\"'.format(sid) - for permission_name in permission_names: - resp += 'PermissionNames: \"{}\"'.format(permission_name) - resp += '}}' + def _extract_permissions_for(self, sid, message): + permissions = set() + for part in message.decode('utf-8').split('Subject'): + if sid not in part: + continue - return resp + for probably_name in part.split('PermissionNames:'): + if 'EffectivePermissions' in probably_name: + continue - def __clean_and_compare(self, s1, s2): - s1 = to_bytes(s1)[to_bytes(s1).find(to_bytes('AccountPermissions')):].replace(to_bytes('\n'), to_bytes('')).replace(to_bytes(' '), to_bytes('')) - s2 = to_bytes(s2)[to_bytes(s2).find(to_bytes('AccountPermissions')):].replace(to_bytes('\n'), to_bytes('')).replace(to_bytes(' '), to_bytes('')) + if sid in probably_name: + continue - assert_that(s1, equal_to(s2)) + probably_name = "".join(list(filter(lambda x: x.isalnum(), probably_name))) + if probably_name: + permissions.add(probably_name) + + return list(permissions) def test_modify_permissions(self): queue_url = self._create_queue_and_assert(self.queue_name, False, True) @@ -89,9 +87,8 @@ class SqsACLTest(KikimrSqsTestBase): send_message_permission = 'SendMessage' # no permissions expected - description = self._list_permissions(self._username) - expected_description = self.__generate_expected_simplified_permissions_response() - self.__clean_and_compare(description, expected_description) + description = self._extract_permissions_for(alkonavt_sid, self._list_permissions(self._username)) + assert description == [] # two permissions expected self._modify_permissions( @@ -100,29 +97,26 @@ class SqsACLTest(KikimrSqsTestBase): alkonavt_sid + ':' + ','.join([create_queue_permission, send_message_permission]) ) description = self._list_permissions(self._username) - expected_description = self.__generate_expected_simplified_permissions_response(alkonavt_sid, [create_queue_permission, send_message_permission]) - self.__clean_and_compare(description, expected_description) + assert self._extract_permissions_for(alkonavt_sid, description) == [create_queue_permission, send_message_permission] # single permission expected self._modify_permissions(self._username, 'revoke', alkonavt_sid + ':' + create_queue_permission) description = self._list_permissions(self._username) - expected_description = self.__generate_expected_simplified_permissions_response(alkonavt_sid, [send_message_permission]) - self.__clean_and_compare(description, expected_description) + assert self._extract_permissions_for(alkonavt_sid, description) == [send_message_permission] receive_message_permission = 'ReceiveMessage' # other single permission expected self._modify_permissions(self._username, 'set', alkonavt_sid + ':' + receive_message_permission) description = self._list_permissions(self._username) - expected_description = self.__generate_expected_simplified_permissions_response(alkonavt_sid, [receive_message_permission]) - self.__clean_and_compare(description, expected_description) + assert self._extract_permissions_for(alkonavt_sid, description) == [receive_message_permission] # clear all permissions self._modify_permissions(self._username, 'set', berkanavt_sid + ':' + receive_message_permission) self._modify_permissions(self._username, 'revoke', alkonavt_sid + ':' + create_queue_permission, True) description = self._list_permissions(self._username) - expected_description = self.__generate_expected_simplified_permissions_response() - self.__clean_and_compare(description, expected_description) + assert self._extract_permissions_for(alkonavt_sid, description) == [] + assert self._extract_permissions_for(berkanavt_sid, description) == [] def test_apply_permissions(self): queue_url = self._create_queue_and_assert(self.queue_name, False, True) |