Merge "Use proper queries to update user properties"
This commit is contained in:
commit
5510241c31
@ -0,0 +1,4 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- Use SET PASSWORD and RENAME USER queries
|
||||||
|
to update user properties.
|
@ -346,68 +346,43 @@ class CreateUser(object):
|
|||||||
return " ".join(query) + ";"
|
return " ".join(query) + ";"
|
||||||
|
|
||||||
|
|
||||||
class UpdateUser(object):
|
class RenameUser(object):
|
||||||
|
|
||||||
def __init__(self, user, host=None, clear=None, new_user=None,
|
def __init__(self, user, host=None, new_user=None,
|
||||||
new_host=None):
|
new_host=None):
|
||||||
self.user = user
|
self.user = user
|
||||||
self.host = host
|
self.host = host or '%'
|
||||||
self.clear = clear
|
|
||||||
self.new_user = new_user
|
self.new_user = new_user
|
||||||
self.new_host = new_host
|
self.new_host = new_host
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return str(self)
|
return str(self)
|
||||||
|
|
||||||
@property
|
def __str__(self):
|
||||||
def _set_password(self):
|
properties = {'old_name': self.user,
|
||||||
if self.clear:
|
'old_host': self.host,
|
||||||
return "Password=PASSWORD('%s')" % self.clear
|
'new_name': self.new_user or self.user,
|
||||||
|
'new_host': self.new_host or self.host}
|
||||||
|
return ("RENAME USER '%(old_name)s'@'%(old_host)s' TO "
|
||||||
|
"'%(new_name)s'@'%(new_host)s';" % properties)
|
||||||
|
|
||||||
@property
|
|
||||||
def _set_user(self):
|
|
||||||
if self.new_user:
|
|
||||||
return "User='%s'" % self.new_user
|
|
||||||
|
|
||||||
@property
|
class SetPassword(object):
|
||||||
def _set_host(self):
|
|
||||||
if self.new_host:
|
|
||||||
return "Host='%s'" % self.new_host
|
|
||||||
|
|
||||||
@property
|
def __init__(self, user, host=None, new_password=None):
|
||||||
def _host(self):
|
self.user = user
|
||||||
if not self.host:
|
self.host = host or '%'
|
||||||
return "%"
|
self.new_password = new_password or ''
|
||||||
return self.host
|
|
||||||
|
|
||||||
@property
|
def __repr__(self):
|
||||||
def _set_attrs(self):
|
return str(self)
|
||||||
sets = [self._set_user,
|
|
||||||
self._set_host,
|
|
||||||
self._set_password,
|
|
||||||
]
|
|
||||||
sets = [s for s in sets if s]
|
|
||||||
sets = ', '.join(sets)
|
|
||||||
return 'SET %s' % sets
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _where(self):
|
|
||||||
clauses = []
|
|
||||||
if self.user:
|
|
||||||
clauses.append("User = '%s'" % self.user)
|
|
||||||
if self.host:
|
|
||||||
clauses.append("Host = '%s'" % self._host)
|
|
||||||
if not clauses:
|
|
||||||
return ""
|
|
||||||
return "WHERE %s" % " AND ".join(clauses)
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
query = ["UPDATE mysql.user",
|
properties = {'user_name': self.user,
|
||||||
self._set_attrs,
|
'user_host': self.host,
|
||||||
self._where,
|
'new_password': self.new_password}
|
||||||
]
|
return ("SET PASSWORD FOR '%(user_name)s'@'%(user_host)s' = "
|
||||||
query = [q for q in query if q]
|
"PASSWORD('%(new_password)s');" % properties)
|
||||||
return " ".join(query) + ";"
|
|
||||||
|
|
||||||
|
|
||||||
class DropUser(object):
|
class DropUser(object):
|
||||||
|
@ -258,8 +258,8 @@ class BaseMySqlAdmin(object):
|
|||||||
user = models.MySQLUser()
|
user = models.MySQLUser()
|
||||||
user.deserialize(user_dict)
|
user.deserialize(user_dict)
|
||||||
LOG.debug("\tDeserialized: %s." % user.__dict__)
|
LOG.debug("\tDeserialized: %s." % user.__dict__)
|
||||||
uu = sql_query.UpdateUser(user.name, host=user.host,
|
uu = sql_query.SetPassword(user.name, host=user.host,
|
||||||
clear=user.password)
|
new_password=user.password)
|
||||||
t = text(str(uu))
|
t = text(str(uu))
|
||||||
client.execute(t)
|
client.execute(t)
|
||||||
|
|
||||||
@ -267,33 +267,28 @@ class BaseMySqlAdmin(object):
|
|||||||
"""Change the attributes of an existing user."""
|
"""Change the attributes of an existing user."""
|
||||||
LOG.debug("Changing user attributes for user %s." % username)
|
LOG.debug("Changing user attributes for user %s." % username)
|
||||||
user = self._get_user(username, hostname)
|
user = self._get_user(username, hostname)
|
||||||
db_access = set()
|
|
||||||
grantee = set()
|
new_name = user_attrs.get('name')
|
||||||
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
new_host = user_attrs.get('host')
|
||||||
q = sql_query.Query()
|
new_password = user_attrs.get('password')
|
||||||
q.columns = ["grantee", "table_schema"]
|
|
||||||
q.tables = ["information_schema.SCHEMA_PRIVILEGES"]
|
if new_name or new_host or new_password:
|
||||||
q.group = ["grantee", "table_schema"]
|
|
||||||
q.where = ["privilege_type != 'USAGE'"]
|
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
||||||
t = text(str(q))
|
|
||||||
db_result = client.execute(t)
|
if new_password is not None:
|
||||||
for db in db_result:
|
uu = sql_query.SetPassword(user.name, host=user.host,
|
||||||
grantee.add(db['grantee'])
|
new_password=new_password)
|
||||||
if db['grantee'] == "'%s'@'%s'" % (user.name, user.host):
|
|
||||||
db_name = db['table_schema']
|
t = text(str(uu))
|
||||||
db_access.add(db_name)
|
client.execute(t)
|
||||||
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
|
||||||
uu = sql_query.UpdateUser(user.name, host=user.host,
|
if new_name or new_host:
|
||||||
clear=user_attrs.get('password'),
|
uu = sql_query.RenameUser(user.name, host=user.host,
|
||||||
new_user=user_attrs.get('name'),
|
new_user=new_name,
|
||||||
new_host=user_attrs.get('host'))
|
new_host=new_host)
|
||||||
t = text(str(uu))
|
t = text(str(uu))
|
||||||
client.execute(t)
|
client.execute(t)
|
||||||
uname = user_attrs.get('name') or username
|
|
||||||
host = user_attrs.get('host') or hostname
|
|
||||||
find_user = "'%s'@'%s'" % (uname, host)
|
|
||||||
if find_user not in grantee:
|
|
||||||
self.grant_access(uname, host, db_access)
|
|
||||||
|
|
||||||
def create_database(self, databases):
|
def create_database(self, databases):
|
||||||
"""Create the list of specified databases."""
|
"""Create the list of specified databases."""
|
||||||
@ -664,8 +659,9 @@ class BaseMySqlApp(object):
|
|||||||
def _generate_root_password(client):
|
def _generate_root_password(client):
|
||||||
"""Generate and set a random root password and forget about it."""
|
"""Generate and set a random root password and forget about it."""
|
||||||
localhost = "localhost"
|
localhost = "localhost"
|
||||||
uu = sql_query.UpdateUser("root", host=localhost,
|
uu = sql_query.SetPassword(
|
||||||
clear=utils.generate_random_password())
|
"root", host=localhost,
|
||||||
|
new_password=utils.generate_random_password())
|
||||||
t = text(str(uu))
|
t = text(str(uu))
|
||||||
client.execute(t)
|
client.execute(t)
|
||||||
|
|
||||||
@ -1060,8 +1056,8 @@ class BaseMySqlRootAccess(object):
|
|||||||
LOG.debug(err)
|
LOG.debug(err)
|
||||||
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
||||||
print(client)
|
print(client)
|
||||||
uu = sql_query.UpdateUser(user.name, host=user.host,
|
uu = sql_query.SetPassword(user.name, host=user.host,
|
||||||
clear=user.password)
|
new_password=user.password)
|
||||||
t = text(str(uu))
|
t = text(str(uu))
|
||||||
client.execute(t)
|
client.execute(t)
|
||||||
|
|
||||||
|
@ -135,6 +135,11 @@ class UserActionsCreateGroup(TestGroup):
|
|||||||
"""Update an existing user."""
|
"""Update an existing user."""
|
||||||
self.test_runner.run_user_attribute_update()
|
self.test_runner.run_user_attribute_update()
|
||||||
|
|
||||||
|
@test(depends_on=[update_user_attributes])
|
||||||
|
def recreate_user_with_no_access(self):
|
||||||
|
"""Re-create a renamed user with no access rights."""
|
||||||
|
self.test_runner.run_user_recreate_with_no_access()
|
||||||
|
|
||||||
@test
|
@test
|
||||||
def show_nonexisting_user(self):
|
def show_nonexisting_user(self):
|
||||||
"""Ensure show on non-existing user fails."""
|
"""Ensure show on non-existing user fails."""
|
||||||
|
@ -34,10 +34,15 @@ class UserActionsRunner(TestRunner):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(UserActionsRunner, self).__init__()
|
super(UserActionsRunner, self).__init__()
|
||||||
self.user_defs = []
|
self.user_defs = []
|
||||||
|
self.renamed_user_orig_def = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def first_user_def(self):
|
def first_user_def(self):
|
||||||
if self.user_defs:
|
if self.user_defs:
|
||||||
|
# Try to use the first user with databases if any.
|
||||||
|
for user_def in self.user_defs:
|
||||||
|
if user_def['databases']:
|
||||||
|
return user_def
|
||||||
return self.user_defs[0]
|
return self.user_defs[0]
|
||||||
raise SkipTest("No valid user definitions provided.")
|
raise SkipTest("No valid user definitions provided.")
|
||||||
|
|
||||||
@ -359,6 +364,7 @@ class UserActionsRunner(TestRunner):
|
|||||||
expected_def = None
|
expected_def = None
|
||||||
for user_def in self.user_defs:
|
for user_def in self.user_defs:
|
||||||
if user_def['name'] == user_name:
|
if user_def['name'] == user_name:
|
||||||
|
self.renamed_user_orig_def = dict(user_def)
|
||||||
user_def.update(update_attribites)
|
user_def.update(update_attribites)
|
||||||
expected_def = user_def
|
expected_def = user_def
|
||||||
|
|
||||||
@ -368,6 +374,30 @@ class UserActionsRunner(TestRunner):
|
|||||||
self.assert_user_show(instance_id, expected_def, 200)
|
self.assert_user_show(instance_id, expected_def, 200)
|
||||||
self.assert_users_list(instance_id, self.user_defs, 200)
|
self.assert_users_list(instance_id, self.user_defs, 200)
|
||||||
|
|
||||||
|
def run_user_recreate_with_no_access(self, expected_http_code=202):
|
||||||
|
if (self.renamed_user_orig_def and
|
||||||
|
self.renamed_user_orig_def['databases']):
|
||||||
|
self.assert_user_recreate_with_no_access(
|
||||||
|
self.instance_info.id, self.renamed_user_orig_def,
|
||||||
|
expected_http_code)
|
||||||
|
else:
|
||||||
|
raise SkipTest("No renamed users with databases.")
|
||||||
|
|
||||||
|
def assert_user_recreate_with_no_access(self, instance_id, original_def,
|
||||||
|
expected_http_code=202):
|
||||||
|
# Recreate a previously renamed user without assigning any access
|
||||||
|
# rights to it.
|
||||||
|
recreated_user_def = dict(original_def)
|
||||||
|
recreated_user_def.update({'databases': []})
|
||||||
|
user_def = self.assert_users_create(
|
||||||
|
instance_id, [recreated_user_def], expected_http_code)
|
||||||
|
|
||||||
|
# Append the new user to defs for cleanup.
|
||||||
|
self.user_defs.extend(user_def)
|
||||||
|
|
||||||
|
# Assert empty user access.
|
||||||
|
self.assert_user_access_show(instance_id, recreated_user_def, 200)
|
||||||
|
|
||||||
def run_user_delete(self, expected_http_code=202):
|
def run_user_delete(self, expected_http_code=202):
|
||||||
for user_def in self.user_defs:
|
for user_def in self.user_defs:
|
||||||
self.assert_user_delete(
|
self.assert_user_delete(
|
||||||
|
@ -474,67 +474,43 @@ class MySqlAdminTest(trove_testtools.TestCase):
|
|||||||
|
|
||||||
def test_change_passwords(self):
|
def test_change_passwords(self):
|
||||||
user = [{"name": "test_user", "host": "%", "password": "password"}]
|
user = [{"name": "test_user", "host": "%", "password": "password"}]
|
||||||
expected = ("UPDATE mysql.user SET Password="
|
expected = ("SET PASSWORD FOR 'test_user'@'%' = PASSWORD('password');")
|
||||||
"PASSWORD('password') WHERE User = 'test_user' "
|
|
||||||
"AND Host = '%';")
|
|
||||||
with patch.object(self.mock_client, 'execute') as mock_execute:
|
with patch.object(self.mock_client, 'execute') as mock_execute:
|
||||||
self.mySqlAdmin.change_passwords(user)
|
self.mySqlAdmin.change_passwords(user)
|
||||||
self._assert_execute_call(expected, mock_execute)
|
self._assert_execute_call(expected, mock_execute)
|
||||||
|
|
||||||
def test_update_attributes_password(self):
|
def test_update_attributes_password(self):
|
||||||
db_result = [{"grantee": "'test_user'@'%'", "table_schema": "db1"},
|
expected = ("SET PASSWORD FOR 'test_user'@'%' = PASSWORD('password');")
|
||||||
{"grantee": "'test_user'@'%'", "table_schema": "db2"}]
|
|
||||||
expected = ("UPDATE mysql.user SET Password="
|
|
||||||
"PASSWORD('password') WHERE User = 'test_user' "
|
|
||||||
"AND Host = '%';")
|
|
||||||
user = MagicMock()
|
user = MagicMock()
|
||||||
user.name = "test_user"
|
user.name = "test_user"
|
||||||
user.host = "%"
|
user.host = "%"
|
||||||
user_attrs = {"password": "password"}
|
user_attrs = {"password": "password"}
|
||||||
with patch.object(self.mock_client, 'execute',
|
with patch.object(self.mock_client, 'execute') as mock_execute:
|
||||||
return_value=db_result) as mock_execute:
|
|
||||||
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
||||||
with patch.object(self.mySqlAdmin, 'grant_access'):
|
self.mySqlAdmin.update_attributes('test_user', '%', user_attrs)
|
||||||
self.mySqlAdmin.update_attributes('test_user', '%',
|
self._assert_execute_call(expected, mock_execute)
|
||||||
user_attrs)
|
|
||||||
self.assertEqual(0,
|
|
||||||
self.mySqlAdmin.grant_access.call_count)
|
|
||||||
self._assert_execute_call(expected, mock_execute,
|
|
||||||
call_idx=1)
|
|
||||||
|
|
||||||
def test_update_attributes_name(self):
|
def test_update_attributes_name(self):
|
||||||
user = MagicMock()
|
user = MagicMock()
|
||||||
user.name = "test_user"
|
user.name = "test_user"
|
||||||
user.host = "%"
|
user.host = "%"
|
||||||
user_attrs = {"name": "new_name"}
|
user_attrs = {"name": "new_name"}
|
||||||
expected = ("UPDATE mysql.user SET User='new_name' "
|
expected = ("RENAME USER 'test_user'@'%' TO 'new_name'@'%';")
|
||||||
"WHERE User = 'test_user' AND Host = '%';")
|
|
||||||
with patch.object(self.mock_client, 'execute') as mock_execute:
|
with patch.object(self.mock_client, 'execute') as mock_execute:
|
||||||
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
||||||
with patch.object(self.mySqlAdmin, 'grant_access'):
|
self.mySqlAdmin.update_attributes('test_user', '%', user_attrs)
|
||||||
self.mySqlAdmin.update_attributes('test_user', '%',
|
self._assert_execute_call(expected, mock_execute)
|
||||||
user_attrs)
|
|
||||||
self.mySqlAdmin.grant_access.assert_called_with(
|
|
||||||
'new_name', '%', set([]))
|
|
||||||
self._assert_execute_call(expected, mock_execute,
|
|
||||||
call_idx=1)
|
|
||||||
|
|
||||||
def test_update_attributes_host(self):
|
def test_update_attributes_host(self):
|
||||||
user = MagicMock()
|
user = MagicMock()
|
||||||
user.name = "test_user"
|
user.name = "test_user"
|
||||||
user.host = "%"
|
user.host = "%"
|
||||||
user_attrs = {"host": "new_host"}
|
user_attrs = {"host": "new_host"}
|
||||||
expected = ("UPDATE mysql.user SET Host='new_host' "
|
expected = ("RENAME USER 'test_user'@'%' TO 'test_user'@'new_host';")
|
||||||
"WHERE User = 'test_user' AND Host = '%';")
|
|
||||||
with patch.object(self.mock_client, 'execute') as mock_execute:
|
with patch.object(self.mock_client, 'execute') as mock_execute:
|
||||||
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
||||||
with patch.object(self.mySqlAdmin, 'grant_access'):
|
self.mySqlAdmin.update_attributes('test_user', '%', user_attrs)
|
||||||
self.mySqlAdmin.update_attributes('test_user', '%',
|
self._assert_execute_call(expected, mock_execute)
|
||||||
user_attrs)
|
|
||||||
self.mySqlAdmin.grant_access.assert_called_with(
|
|
||||||
'test_user', 'new_host', set([]))
|
|
||||||
self._assert_execute_call(expected, mock_execute,
|
|
||||||
call_idx=1)
|
|
||||||
|
|
||||||
def test_create_database(self):
|
def test_create_database(self):
|
||||||
databases = []
|
databases = []
|
||||||
@ -1439,9 +1415,8 @@ class MySqlAppTest(trove_testtools.TestCase):
|
|||||||
return_value=self.mock_client):
|
return_value=self.mock_client):
|
||||||
self.mySqlApp.secure_root()
|
self.mySqlApp.secure_root()
|
||||||
update_root_password, _ = self.mock_execute.call_args_list[0]
|
update_root_password, _ = self.mock_execute.call_args_list[0]
|
||||||
update_expected = ("UPDATE mysql.user SET Password="
|
update_expected = ("SET PASSWORD FOR 'root'@'localhost' = "
|
||||||
"PASSWORD('some_password') "
|
"PASSWORD('some_password');")
|
||||||
"WHERE User = 'root' AND Host = 'localhost';")
|
|
||||||
|
|
||||||
remove_root, _ = self.mock_execute.call_args_list[1]
|
remove_root, _ = self.mock_execute.call_args_list[1]
|
||||||
remove_expected = ("DELETE FROM mysql.user WHERE "
|
remove_expected = ("DELETE FROM mysql.user WHERE "
|
||||||
@ -1682,7 +1657,7 @@ class MySqlRootStatusTest(trove_testtools.TestCase):
|
|||||||
mock_execute.assert_any_call(TextClauseMatcher(
|
mock_execute.assert_any_call(TextClauseMatcher(
|
||||||
'GRANT ALL PRIVILEGES ON *.*'))
|
'GRANT ALL PRIVILEGES ON *.*'))
|
||||||
mock_execute.assert_any_call(TextClauseMatcher(
|
mock_execute.assert_any_call(TextClauseMatcher(
|
||||||
'UPDATE mysql.user'))
|
'SET PASSWORD'))
|
||||||
|
|
||||||
@patch.object(MySqlRootAccess, 'enable_root')
|
@patch.object(MySqlRootAccess, 'enable_root')
|
||||||
def test_root_disable(self, enable_root_mock):
|
def test_root_disable(self, enable_root_mock):
|
||||||
|
@ -351,82 +351,59 @@ class CreateUserTest(QueryTestBase):
|
|||||||
"IDENTIFIED BY 'password123';", str(cu))
|
"IDENTIFIED BY 'password123';", str(cu))
|
||||||
|
|
||||||
|
|
||||||
class UpdateUserTest(QueryTestBase):
|
class RenameUserTest(QueryTestBase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(UpdateUserTest, self).setUp()
|
super(RenameUserTest, self).setUp()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
super(UpdateUserTest, self).tearDown()
|
super(RenameUserTest, self).tearDown()
|
||||||
|
|
||||||
def test_rename_user(self):
|
def test_rename_user(self):
|
||||||
username = 'root'
|
username = 'root'
|
||||||
hostname = 'localhost'
|
hostname = 'localhost'
|
||||||
new_user = 'root123'
|
new_user = 'root123'
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
uu = sql_query.RenameUser(user=username, host=hostname,
|
||||||
new_user=new_user)
|
new_user=new_user)
|
||||||
self.assertEqual("UPDATE mysql.user SET User='root123' "
|
self.assertEqual("RENAME USER 'root'@'localhost' "
|
||||||
"WHERE User = 'root' "
|
"TO 'root123'@'localhost';", str(uu))
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_password(self):
|
|
||||||
username = 'root'
|
|
||||||
hostname = 'localhost'
|
|
||||||
new_password = 'password123'
|
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
|
||||||
clear=new_password)
|
|
||||||
self.assertEqual("UPDATE mysql.user SET "
|
|
||||||
"Password=PASSWORD('password123') "
|
|
||||||
"WHERE User = 'root' "
|
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_host(self):
|
def test_change_host(self):
|
||||||
username = 'root'
|
username = 'root'
|
||||||
hostname = 'localhost'
|
hostname = 'localhost'
|
||||||
new_host = '%'
|
new_host = '%'
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
uu = sql_query.RenameUser(user=username, host=hostname,
|
||||||
new_host=new_host)
|
new_host=new_host)
|
||||||
self.assertEqual("UPDATE mysql.user SET Host='%' "
|
self.assertEqual("RENAME USER 'root'@'localhost' "
|
||||||
"WHERE User = 'root' "
|
"TO 'root'@'%';", str(uu))
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_password_and_username(self):
|
|
||||||
username = 'root'
|
|
||||||
hostname = 'localhost'
|
|
||||||
new_user = 'root123'
|
|
||||||
new_password = 'password123'
|
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
|
||||||
clear=new_password, new_user=new_user)
|
|
||||||
self.assertEqual("UPDATE mysql.user SET User='root123', "
|
|
||||||
"Password=PASSWORD('password123') "
|
|
||||||
"WHERE User = 'root' "
|
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_username_password_hostname(self):
|
|
||||||
username = 'root'
|
|
||||||
hostname = 'localhost'
|
|
||||||
new_user = 'root123'
|
|
||||||
new_password = 'password123'
|
|
||||||
new_host = '%'
|
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
|
||||||
clear=new_password, new_user=new_user,
|
|
||||||
new_host=new_host)
|
|
||||||
self.assertEqual("UPDATE mysql.user SET User='root123', "
|
|
||||||
"Host='%', "
|
|
||||||
"Password=PASSWORD('password123') "
|
|
||||||
"WHERE User = 'root' "
|
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_username_and_hostname(self):
|
def test_change_username_and_hostname(self):
|
||||||
username = 'root'
|
username = 'root'
|
||||||
hostname = 'localhost'
|
hostname = 'localhost'
|
||||||
new_user = 'root123'
|
new_user = 'root123'
|
||||||
new_host = '%'
|
new_host = '%'
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
uu = sql_query.RenameUser(user=username, host=hostname,
|
||||||
new_host=new_host, new_user=new_user)
|
new_user=new_user, new_host=new_host)
|
||||||
self.assertEqual("UPDATE mysql.user SET User='root123', "
|
self.assertEqual("RENAME USER 'root'@'localhost' "
|
||||||
"Host='%' "
|
"TO 'root123'@'%';", str(uu))
|
||||||
"WHERE User = 'root' "
|
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
class SetPasswordTest(QueryTestBase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(SetPasswordTest, self).setUp()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(SetPasswordTest, self).tearDown()
|
||||||
|
|
||||||
|
def test_alter_user(self):
|
||||||
|
username = 'root'
|
||||||
|
hostname = 'localhost'
|
||||||
|
new_password = 'new_password'
|
||||||
|
uu = sql_query.SetPassword(user=username, host=hostname,
|
||||||
|
new_password=new_password)
|
||||||
|
self.assertEqual("SET PASSWORD FOR 'root'@'localhost' = "
|
||||||
|
"PASSWORD('new_password');", str(uu))
|
||||||
|
|
||||||
|
|
||||||
class DropUserTest(QueryTestBase):
|
class DropUserTest(QueryTestBase):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user