aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-04-06 19:04:42 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-04-06 19:04:42 +0300
commiteabc63496b5b31fddc07c6169fb53f4dc007dd36 (patch)
tree044d52fac6d775412764cd1f6ee36e384400b7bc
parent266ea0cb71ae74107fce6507a81992b33596dffd (diff)
downloadydb-eabc63496b5b31fddc07c6169fb53f4dc007dd36.tar.gz
fix functional tests
ref:64b0caeb029eb129495367d3540bdc14a65b46db
-rw-r--r--ydb/tests/functional/api/test_public_api.py3
-rw-r--r--ydb/tests/functional/sqs/test_acl.py54
2 files changed, 26 insertions, 31 deletions
diff --git a/ydb/tests/functional/api/test_public_api.py b/ydb/tests/functional/api/test_public_api.py
index 072ee35de38..24ad8766fb1 100644
--- a/ydb/tests/functional/api/test_public_api.py
+++ b/ydb/tests/functional/api/test_public_api.py
@@ -907,7 +907,8 @@ actions {
)
response = self.driver.scheme_client.describe_path(directory_name)
- assert_that(response.effective_permissions[0].subject, equal_to('gvit@staff'))
+ subjects = set([permission.subject for permission in response.effective_permissions])
+ assert "gvit@staff" in subjects
def test_too_many_pending_transactions(self):
driver = ydb.Driver(self.connection_params)
diff --git a/ydb/tests/functional/sqs/test_acl.py b/ydb/tests/functional/sqs/test_acl.py
index e5221f0f43b..e587249f97d 100644
--- a/ydb/tests/functional/sqs/test_acl.py
+++ b/ydb/tests/functional/sqs/test_acl.py
@@ -4,11 +4,11 @@ import logging
import time
import pytest
-from hamcrest import assert_that, equal_to, none, is_not, is_, raises
+from hamcrest import assert_that, none, is_not, is_, raises
import ydb.tests.library.common.yatest_common as yatest_common
-from sqs_test_base import KikimrSqsTestBase, get_sqs_client_path, get_test_with_sqs_installation_by_path, get_test_with_sqs_tenant_installation, to_bytes
+from sqs_test_base import KikimrSqsTestBase, get_sqs_client_path, get_test_with_sqs_installation_by_path, get_test_with_sqs_tenant_installation
class SqsACLTest(KikimrSqsTestBase):
@@ -57,26 +57,24 @@ class SqsACLTest(KikimrSqsTestBase):
return execute.std_out
raise RuntimeError("Failed to list permissions")
- def __generate_expected_simplified_permissions_response(self, sid=None, permission_names=[]):
- resp = 'AccountPermissions {'
- if not permission_names:
- resp += '}'
- else:
- resp += 'Permissions {{ Subject: \"{}\"'.format(sid)
- for permission_name in permission_names:
- resp += 'PermissionNames: \"{}\"'.format(permission_name)
- resp += '}} EffectivePermissions {{ Subject: \"{}\"'.format(sid)
- for permission_name in permission_names:
- resp += 'PermissionNames: \"{}\"'.format(permission_name)
- resp += '}}'
+ def _extract_permissions_for(self, sid, message):
+ permissions = set()
+ for part in message.decode('utf-8').split('Subject'):
+ if sid not in part:
+ continue
- return resp
+ for probably_name in part.split('PermissionNames:'):
+ if 'EffectivePermissions' in probably_name:
+ continue
- def __clean_and_compare(self, s1, s2):
- s1 = to_bytes(s1)[to_bytes(s1).find(to_bytes('AccountPermissions')):].replace(to_bytes('\n'), to_bytes('')).replace(to_bytes(' '), to_bytes(''))
- s2 = to_bytes(s2)[to_bytes(s2).find(to_bytes('AccountPermissions')):].replace(to_bytes('\n'), to_bytes('')).replace(to_bytes(' '), to_bytes(''))
+ if sid in probably_name:
+ continue
- assert_that(s1, equal_to(s2))
+ probably_name = "".join(list(filter(lambda x: x.isalnum(), probably_name)))
+ if probably_name:
+ permissions.add(probably_name)
+
+ return list(permissions)
def test_modify_permissions(self):
queue_url = self._create_queue_and_assert(self.queue_name, False, True)
@@ -89,9 +87,8 @@ class SqsACLTest(KikimrSqsTestBase):
send_message_permission = 'SendMessage'
# no permissions expected
- description = self._list_permissions(self._username)
- expected_description = self.__generate_expected_simplified_permissions_response()
- self.__clean_and_compare(description, expected_description)
+ description = self._extract_permissions_for(alkonavt_sid, self._list_permissions(self._username))
+ assert description == []
# two permissions expected
self._modify_permissions(
@@ -100,29 +97,26 @@ class SqsACLTest(KikimrSqsTestBase):
alkonavt_sid + ':' + ','.join([create_queue_permission, send_message_permission])
)
description = self._list_permissions(self._username)
- expected_description = self.__generate_expected_simplified_permissions_response(alkonavt_sid, [create_queue_permission, send_message_permission])
- self.__clean_and_compare(description, expected_description)
+ assert self._extract_permissions_for(alkonavt_sid, description) == [create_queue_permission, send_message_permission]
# single permission expected
self._modify_permissions(self._username, 'revoke', alkonavt_sid + ':' + create_queue_permission)
description = self._list_permissions(self._username)
- expected_description = self.__generate_expected_simplified_permissions_response(alkonavt_sid, [send_message_permission])
- self.__clean_and_compare(description, expected_description)
+ assert self._extract_permissions_for(alkonavt_sid, description) == [send_message_permission]
receive_message_permission = 'ReceiveMessage'
# other single permission expected
self._modify_permissions(self._username, 'set', alkonavt_sid + ':' + receive_message_permission)
description = self._list_permissions(self._username)
- expected_description = self.__generate_expected_simplified_permissions_response(alkonavt_sid, [receive_message_permission])
- self.__clean_and_compare(description, expected_description)
+ assert self._extract_permissions_for(alkonavt_sid, description) == [receive_message_permission]
# clear all permissions
self._modify_permissions(self._username, 'set', berkanavt_sid + ':' + receive_message_permission)
self._modify_permissions(self._username, 'revoke', alkonavt_sid + ':' + create_queue_permission, True)
description = self._list_permissions(self._username)
- expected_description = self.__generate_expected_simplified_permissions_response()
- self.__clean_and_compare(description, expected_description)
+ assert self._extract_permissions_for(alkonavt_sid, description) == []
+ assert self._extract_permissions_for(berkanavt_sid, description) == []
def test_apply_permissions(self):
queue_url = self._create_queue_and_assert(self.queue_name, False, True)