Use proper queries to update user properties
Bug 1380880 occurs when a user gets renamed and another one gets created wit its original name. It turns out that the new user wrongly inherits access rights of the original user. MySQL guest currently updates user properties by changing fields in the internal 'mysql' database. This patch set replaces the UPDATE with more generic SET PASSWORD and RENAME statements that should work on all MySQL versions. We no longer need to transfer privileges on a user name/host change since the RENAME statement does it for us under the covers. As an additional benefit this solution is also fully compatible with MySQL 5.7 which has changed the definition of the internal 'user' table and the current code no longer works. Closes-Bug: 1380880 Change-Id: I37fdec77184715ed889d8ea6d446282c98903258
This commit is contained in:
parent
75656bda73
commit
dc7ccce006
@ -0,0 +1,4 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- Use SET PASSWORD and RENAME USER queries
|
||||||
|
to update user properties.
|
@ -346,68 +346,43 @@ class CreateUser(object):
|
|||||||
return " ".join(query) + ";"
|
return " ".join(query) + ";"
|
||||||
|
|
||||||
|
|
||||||
class UpdateUser(object):
|
class RenameUser(object):
|
||||||
|
|
||||||
def __init__(self, user, host=None, clear=None, new_user=None,
|
def __init__(self, user, host=None, new_user=None,
|
||||||
new_host=None):
|
new_host=None):
|
||||||
self.user = user
|
self.user = user
|
||||||
self.host = host
|
self.host = host or '%'
|
||||||
self.clear = clear
|
|
||||||
self.new_user = new_user
|
self.new_user = new_user
|
||||||
self.new_host = new_host
|
self.new_host = new_host
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return str(self)
|
return str(self)
|
||||||
|
|
||||||
@property
|
def __str__(self):
|
||||||
def _set_password(self):
|
properties = {'old_name': self.user,
|
||||||
if self.clear:
|
'old_host': self.host,
|
||||||
return "Password=PASSWORD('%s')" % self.clear
|
'new_name': self.new_user or self.user,
|
||||||
|
'new_host': self.new_host or self.host}
|
||||||
|
return ("RENAME USER '%(old_name)s'@'%(old_host)s' TO "
|
||||||
|
"'%(new_name)s'@'%(new_host)s';" % properties)
|
||||||
|
|
||||||
@property
|
|
||||||
def _set_user(self):
|
|
||||||
if self.new_user:
|
|
||||||
return "User='%s'" % self.new_user
|
|
||||||
|
|
||||||
@property
|
class SetPassword(object):
|
||||||
def _set_host(self):
|
|
||||||
if self.new_host:
|
|
||||||
return "Host='%s'" % self.new_host
|
|
||||||
|
|
||||||
@property
|
def __init__(self, user, host=None, new_password=None):
|
||||||
def _host(self):
|
self.user = user
|
||||||
if not self.host:
|
self.host = host or '%'
|
||||||
return "%"
|
self.new_password = new_password or ''
|
||||||
return self.host
|
|
||||||
|
|
||||||
@property
|
def __repr__(self):
|
||||||
def _set_attrs(self):
|
return str(self)
|
||||||
sets = [self._set_user,
|
|
||||||
self._set_host,
|
|
||||||
self._set_password,
|
|
||||||
]
|
|
||||||
sets = [s for s in sets if s]
|
|
||||||
sets = ', '.join(sets)
|
|
||||||
return 'SET %s' % sets
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _where(self):
|
|
||||||
clauses = []
|
|
||||||
if self.user:
|
|
||||||
clauses.append("User = '%s'" % self.user)
|
|
||||||
if self.host:
|
|
||||||
clauses.append("Host = '%s'" % self._host)
|
|
||||||
if not clauses:
|
|
||||||
return ""
|
|
||||||
return "WHERE %s" % " AND ".join(clauses)
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
query = ["UPDATE mysql.user",
|
properties = {'user_name': self.user,
|
||||||
self._set_attrs,
|
'user_host': self.host,
|
||||||
self._where,
|
'new_password': self.new_password}
|
||||||
]
|
return ("SET PASSWORD FOR '%(user_name)s'@'%(user_host)s' = "
|
||||||
query = [q for q in query if q]
|
"PASSWORD('%(new_password)s');" % properties)
|
||||||
return " ".join(query) + ";"
|
|
||||||
|
|
||||||
|
|
||||||
class DropUser(object):
|
class DropUser(object):
|
||||||
|
@ -257,8 +257,8 @@ class BaseMySqlAdmin(object):
|
|||||||
user = models.MySQLUser()
|
user = models.MySQLUser()
|
||||||
user.deserialize(user_dict)
|
user.deserialize(user_dict)
|
||||||
LOG.debug("\tDeserialized: %s." % user.__dict__)
|
LOG.debug("\tDeserialized: %s." % user.__dict__)
|
||||||
uu = sql_query.UpdateUser(user.name, host=user.host,
|
uu = sql_query.SetPassword(user.name, host=user.host,
|
||||||
clear=user.password)
|
new_password=user.password)
|
||||||
t = text(str(uu))
|
t = text(str(uu))
|
||||||
client.execute(t)
|
client.execute(t)
|
||||||
|
|
||||||
@ -266,33 +266,28 @@ class BaseMySqlAdmin(object):
|
|||||||
"""Change the attributes of an existing user."""
|
"""Change the attributes of an existing user."""
|
||||||
LOG.debug("Changing user attributes for user %s." % username)
|
LOG.debug("Changing user attributes for user %s." % username)
|
||||||
user = self._get_user(username, hostname)
|
user = self._get_user(username, hostname)
|
||||||
db_access = set()
|
|
||||||
grantee = set()
|
new_name = user_attrs.get('name')
|
||||||
|
new_host = user_attrs.get('host')
|
||||||
|
new_password = user_attrs.get('password')
|
||||||
|
|
||||||
|
if new_name or new_host or new_password:
|
||||||
|
|
||||||
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
||||||
q = sql_query.Query()
|
|
||||||
q.columns = ["grantee", "table_schema"]
|
if new_password is not None:
|
||||||
q.tables = ["information_schema.SCHEMA_PRIVILEGES"]
|
uu = sql_query.SetPassword(user.name, host=user.host,
|
||||||
q.group = ["grantee", "table_schema"]
|
new_password=new_password)
|
||||||
q.where = ["privilege_type != 'USAGE'"]
|
|
||||||
t = text(str(q))
|
t = text(str(uu))
|
||||||
db_result = client.execute(t)
|
client.execute(t)
|
||||||
for db in db_result:
|
|
||||||
grantee.add(db['grantee'])
|
if new_name or new_host:
|
||||||
if db['grantee'] == "'%s'@'%s'" % (user.name, user.host):
|
uu = sql_query.RenameUser(user.name, host=user.host,
|
||||||
db_name = db['table_schema']
|
new_user=new_name,
|
||||||
db_access.add(db_name)
|
new_host=new_host)
|
||||||
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
|
||||||
uu = sql_query.UpdateUser(user.name, host=user.host,
|
|
||||||
clear=user_attrs.get('password'),
|
|
||||||
new_user=user_attrs.get('name'),
|
|
||||||
new_host=user_attrs.get('host'))
|
|
||||||
t = text(str(uu))
|
t = text(str(uu))
|
||||||
client.execute(t)
|
client.execute(t)
|
||||||
uname = user_attrs.get('name') or username
|
|
||||||
host = user_attrs.get('host') or hostname
|
|
||||||
find_user = "'%s'@'%s'" % (uname, host)
|
|
||||||
if find_user not in grantee:
|
|
||||||
self.grant_access(uname, host, db_access)
|
|
||||||
|
|
||||||
def create_database(self, databases):
|
def create_database(self, databases):
|
||||||
"""Create the list of specified databases."""
|
"""Create the list of specified databases."""
|
||||||
@ -659,8 +654,9 @@ class BaseMySqlApp(object):
|
|||||||
def _generate_root_password(client):
|
def _generate_root_password(client):
|
||||||
"""Generate and set a random root password and forget about it."""
|
"""Generate and set a random root password and forget about it."""
|
||||||
localhost = "localhost"
|
localhost = "localhost"
|
||||||
uu = sql_query.UpdateUser("root", host=localhost,
|
uu = sql_query.SetPassword(
|
||||||
clear=utils.generate_random_password())
|
"root", host=localhost,
|
||||||
|
new_password=utils.generate_random_password())
|
||||||
t = text(str(uu))
|
t = text(str(uu))
|
||||||
client.execute(t)
|
client.execute(t)
|
||||||
|
|
||||||
@ -1055,8 +1051,8 @@ class BaseMySqlRootAccess(object):
|
|||||||
LOG.debug(err)
|
LOG.debug(err)
|
||||||
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
with self.local_sql_client(self.mysql_app.get_engine()) as client:
|
||||||
print(client)
|
print(client)
|
||||||
uu = sql_query.UpdateUser(user.name, host=user.host,
|
uu = sql_query.SetPassword(user.name, host=user.host,
|
||||||
clear=user.password)
|
new_password=user.password)
|
||||||
t = text(str(uu))
|
t = text(str(uu))
|
||||||
client.execute(t)
|
client.execute(t)
|
||||||
|
|
||||||
|
@ -135,6 +135,11 @@ class UserActionsCreateGroup(TestGroup):
|
|||||||
"""Update an existing user."""
|
"""Update an existing user."""
|
||||||
self.test_runner.run_user_attribute_update()
|
self.test_runner.run_user_attribute_update()
|
||||||
|
|
||||||
|
@test(depends_on=[update_user_attributes])
|
||||||
|
def recreate_user_with_no_access(self):
|
||||||
|
"""Re-create a renamed user with no access rights."""
|
||||||
|
self.test_runner.run_user_recreate_with_no_access()
|
||||||
|
|
||||||
@test
|
@test
|
||||||
def show_nonexisting_user(self):
|
def show_nonexisting_user(self):
|
||||||
"""Ensure show on non-existing user fails."""
|
"""Ensure show on non-existing user fails."""
|
||||||
|
@ -34,10 +34,15 @@ class UserActionsRunner(TestRunner):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(UserActionsRunner, self).__init__()
|
super(UserActionsRunner, self).__init__()
|
||||||
self.user_defs = []
|
self.user_defs = []
|
||||||
|
self.renamed_user_orig_def = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def first_user_def(self):
|
def first_user_def(self):
|
||||||
if self.user_defs:
|
if self.user_defs:
|
||||||
|
# Try to use the first user with databases if any.
|
||||||
|
for user_def in self.user_defs:
|
||||||
|
if user_def['databases']:
|
||||||
|
return user_def
|
||||||
return self.user_defs[0]
|
return self.user_defs[0]
|
||||||
raise SkipTest("No valid user definitions provided.")
|
raise SkipTest("No valid user definitions provided.")
|
||||||
|
|
||||||
@ -351,6 +356,7 @@ class UserActionsRunner(TestRunner):
|
|||||||
expected_def = None
|
expected_def = None
|
||||||
for user_def in self.user_defs:
|
for user_def in self.user_defs:
|
||||||
if user_def['name'] == user_name:
|
if user_def['name'] == user_name:
|
||||||
|
self.renamed_user_orig_def = dict(user_def)
|
||||||
user_def.update(update_attribites)
|
user_def.update(update_attribites)
|
||||||
expected_def = user_def
|
expected_def = user_def
|
||||||
|
|
||||||
@ -360,6 +366,30 @@ class UserActionsRunner(TestRunner):
|
|||||||
self.assert_user_show(instance_id, expected_def, 200)
|
self.assert_user_show(instance_id, expected_def, 200)
|
||||||
self.assert_users_list(instance_id, self.user_defs, 200)
|
self.assert_users_list(instance_id, self.user_defs, 200)
|
||||||
|
|
||||||
|
def run_user_recreate_with_no_access(self, expected_http_code=202):
|
||||||
|
if (self.renamed_user_orig_def and
|
||||||
|
self.renamed_user_orig_def['databases']):
|
||||||
|
self.assert_user_recreate_with_no_access(
|
||||||
|
self.instance_info.id, self.renamed_user_orig_def,
|
||||||
|
expected_http_code)
|
||||||
|
else:
|
||||||
|
raise SkipTest("No renamed users with databases.")
|
||||||
|
|
||||||
|
def assert_user_recreate_with_no_access(self, instance_id, original_def,
|
||||||
|
expected_http_code=202):
|
||||||
|
# Recreate a previously renamed user without assigning any access
|
||||||
|
# rights to it.
|
||||||
|
recreated_user_def = dict(original_def)
|
||||||
|
recreated_user_def.update({'databases': []})
|
||||||
|
user_def = self.assert_users_create(
|
||||||
|
instance_id, [recreated_user_def], expected_http_code)
|
||||||
|
|
||||||
|
# Append the new user to defs for cleanup.
|
||||||
|
self.user_defs.extend(user_def)
|
||||||
|
|
||||||
|
# Assert empty user access.
|
||||||
|
self.assert_user_access_show(instance_id, recreated_user_def, 200)
|
||||||
|
|
||||||
def run_user_delete(self, expected_http_code=202):
|
def run_user_delete(self, expected_http_code=202):
|
||||||
for user_def in self.user_defs:
|
for user_def in self.user_defs:
|
||||||
self.assert_user_delete(
|
self.assert_user_delete(
|
||||||
|
@ -474,67 +474,43 @@ class MySqlAdminTest(trove_testtools.TestCase):
|
|||||||
|
|
||||||
def test_change_passwords(self):
|
def test_change_passwords(self):
|
||||||
user = [{"name": "test_user", "host": "%", "password": "password"}]
|
user = [{"name": "test_user", "host": "%", "password": "password"}]
|
||||||
expected = ("UPDATE mysql.user SET Password="
|
expected = ("SET PASSWORD FOR 'test_user'@'%' = PASSWORD('password');")
|
||||||
"PASSWORD('password') WHERE User = 'test_user' "
|
|
||||||
"AND Host = '%';")
|
|
||||||
with patch.object(self.mock_client, 'execute') as mock_execute:
|
with patch.object(self.mock_client, 'execute') as mock_execute:
|
||||||
self.mySqlAdmin.change_passwords(user)
|
self.mySqlAdmin.change_passwords(user)
|
||||||
self._assert_execute_call(expected, mock_execute)
|
self._assert_execute_call(expected, mock_execute)
|
||||||
|
|
||||||
def test_update_attributes_password(self):
|
def test_update_attributes_password(self):
|
||||||
db_result = [{"grantee": "'test_user'@'%'", "table_schema": "db1"},
|
expected = ("SET PASSWORD FOR 'test_user'@'%' = PASSWORD('password');")
|
||||||
{"grantee": "'test_user'@'%'", "table_schema": "db2"}]
|
|
||||||
expected = ("UPDATE mysql.user SET Password="
|
|
||||||
"PASSWORD('password') WHERE User = 'test_user' "
|
|
||||||
"AND Host = '%';")
|
|
||||||
user = MagicMock()
|
user = MagicMock()
|
||||||
user.name = "test_user"
|
user.name = "test_user"
|
||||||
user.host = "%"
|
user.host = "%"
|
||||||
user_attrs = {"password": "password"}
|
user_attrs = {"password": "password"}
|
||||||
with patch.object(self.mock_client, 'execute',
|
with patch.object(self.mock_client, 'execute') as mock_execute:
|
||||||
return_value=db_result) as mock_execute:
|
|
||||||
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
||||||
with patch.object(self.mySqlAdmin, 'grant_access'):
|
self.mySqlAdmin.update_attributes('test_user', '%', user_attrs)
|
||||||
self.mySqlAdmin.update_attributes('test_user', '%',
|
self._assert_execute_call(expected, mock_execute)
|
||||||
user_attrs)
|
|
||||||
self.assertEqual(0,
|
|
||||||
self.mySqlAdmin.grant_access.call_count)
|
|
||||||
self._assert_execute_call(expected, mock_execute,
|
|
||||||
call_idx=1)
|
|
||||||
|
|
||||||
def test_update_attributes_name(self):
|
def test_update_attributes_name(self):
|
||||||
user = MagicMock()
|
user = MagicMock()
|
||||||
user.name = "test_user"
|
user.name = "test_user"
|
||||||
user.host = "%"
|
user.host = "%"
|
||||||
user_attrs = {"name": "new_name"}
|
user_attrs = {"name": "new_name"}
|
||||||
expected = ("UPDATE mysql.user SET User='new_name' "
|
expected = ("RENAME USER 'test_user'@'%' TO 'new_name'@'%';")
|
||||||
"WHERE User = 'test_user' AND Host = '%';")
|
|
||||||
with patch.object(self.mock_client, 'execute') as mock_execute:
|
with patch.object(self.mock_client, 'execute') as mock_execute:
|
||||||
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
||||||
with patch.object(self.mySqlAdmin, 'grant_access'):
|
self.mySqlAdmin.update_attributes('test_user', '%', user_attrs)
|
||||||
self.mySqlAdmin.update_attributes('test_user', '%',
|
self._assert_execute_call(expected, mock_execute)
|
||||||
user_attrs)
|
|
||||||
self.mySqlAdmin.grant_access.assert_called_with(
|
|
||||||
'new_name', '%', set([]))
|
|
||||||
self._assert_execute_call(expected, mock_execute,
|
|
||||||
call_idx=1)
|
|
||||||
|
|
||||||
def test_update_attributes_host(self):
|
def test_update_attributes_host(self):
|
||||||
user = MagicMock()
|
user = MagicMock()
|
||||||
user.name = "test_user"
|
user.name = "test_user"
|
||||||
user.host = "%"
|
user.host = "%"
|
||||||
user_attrs = {"host": "new_host"}
|
user_attrs = {"host": "new_host"}
|
||||||
expected = ("UPDATE mysql.user SET Host='new_host' "
|
expected = ("RENAME USER 'test_user'@'%' TO 'test_user'@'new_host';")
|
||||||
"WHERE User = 'test_user' AND Host = '%';")
|
|
||||||
with patch.object(self.mock_client, 'execute') as mock_execute:
|
with patch.object(self.mock_client, 'execute') as mock_execute:
|
||||||
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
with patch.object(self.mySqlAdmin, '_get_user', return_value=user):
|
||||||
with patch.object(self.mySqlAdmin, 'grant_access'):
|
self.mySqlAdmin.update_attributes('test_user', '%', user_attrs)
|
||||||
self.mySqlAdmin.update_attributes('test_user', '%',
|
self._assert_execute_call(expected, mock_execute)
|
||||||
user_attrs)
|
|
||||||
self.mySqlAdmin.grant_access.assert_called_with(
|
|
||||||
'test_user', 'new_host', set([]))
|
|
||||||
self._assert_execute_call(expected, mock_execute,
|
|
||||||
call_idx=1)
|
|
||||||
|
|
||||||
def test_create_database(self):
|
def test_create_database(self):
|
||||||
databases = []
|
databases = []
|
||||||
@ -1437,9 +1413,8 @@ class MySqlAppTest(trove_testtools.TestCase):
|
|||||||
return_value=self.mock_client):
|
return_value=self.mock_client):
|
||||||
self.mySqlApp.secure_root()
|
self.mySqlApp.secure_root()
|
||||||
update_root_password, _ = self.mock_execute.call_args_list[0]
|
update_root_password, _ = self.mock_execute.call_args_list[0]
|
||||||
update_expected = ("UPDATE mysql.user SET Password="
|
update_expected = ("SET PASSWORD FOR 'root'@'localhost' = "
|
||||||
"PASSWORD('some_password') "
|
"PASSWORD('some_password');")
|
||||||
"WHERE User = 'root' AND Host = 'localhost';")
|
|
||||||
|
|
||||||
remove_root, _ = self.mock_execute.call_args_list[1]
|
remove_root, _ = self.mock_execute.call_args_list[1]
|
||||||
remove_expected = ("DELETE FROM mysql.user WHERE "
|
remove_expected = ("DELETE FROM mysql.user WHERE "
|
||||||
@ -1680,7 +1655,7 @@ class MySqlRootStatusTest(trove_testtools.TestCase):
|
|||||||
mock_execute.assert_any_call(TextClauseMatcher(
|
mock_execute.assert_any_call(TextClauseMatcher(
|
||||||
'GRANT ALL PRIVILEGES ON *.*'))
|
'GRANT ALL PRIVILEGES ON *.*'))
|
||||||
mock_execute.assert_any_call(TextClauseMatcher(
|
mock_execute.assert_any_call(TextClauseMatcher(
|
||||||
'UPDATE mysql.user'))
|
'SET PASSWORD'))
|
||||||
|
|
||||||
@patch.object(MySqlRootAccess, 'enable_root')
|
@patch.object(MySqlRootAccess, 'enable_root')
|
||||||
def test_root_disable(self, enable_root_mock):
|
def test_root_disable(self, enable_root_mock):
|
||||||
|
@ -351,82 +351,59 @@ class CreateUserTest(QueryTestBase):
|
|||||||
"IDENTIFIED BY 'password123';", str(cu))
|
"IDENTIFIED BY 'password123';", str(cu))
|
||||||
|
|
||||||
|
|
||||||
class UpdateUserTest(QueryTestBase):
|
class RenameUserTest(QueryTestBase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(UpdateUserTest, self).setUp()
|
super(RenameUserTest, self).setUp()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
super(UpdateUserTest, self).tearDown()
|
super(RenameUserTest, self).tearDown()
|
||||||
|
|
||||||
def test_rename_user(self):
|
def test_rename_user(self):
|
||||||
username = 'root'
|
username = 'root'
|
||||||
hostname = 'localhost'
|
hostname = 'localhost'
|
||||||
new_user = 'root123'
|
new_user = 'root123'
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
uu = sql_query.RenameUser(user=username, host=hostname,
|
||||||
new_user=new_user)
|
new_user=new_user)
|
||||||
self.assertEqual("UPDATE mysql.user SET User='root123' "
|
self.assertEqual("RENAME USER 'root'@'localhost' "
|
||||||
"WHERE User = 'root' "
|
"TO 'root123'@'localhost';", str(uu))
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_password(self):
|
|
||||||
username = 'root'
|
|
||||||
hostname = 'localhost'
|
|
||||||
new_password = 'password123'
|
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
|
||||||
clear=new_password)
|
|
||||||
self.assertEqual("UPDATE mysql.user SET "
|
|
||||||
"Password=PASSWORD('password123') "
|
|
||||||
"WHERE User = 'root' "
|
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_host(self):
|
def test_change_host(self):
|
||||||
username = 'root'
|
username = 'root'
|
||||||
hostname = 'localhost'
|
hostname = 'localhost'
|
||||||
new_host = '%'
|
new_host = '%'
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
uu = sql_query.RenameUser(user=username, host=hostname,
|
||||||
new_host=new_host)
|
new_host=new_host)
|
||||||
self.assertEqual("UPDATE mysql.user SET Host='%' "
|
self.assertEqual("RENAME USER 'root'@'localhost' "
|
||||||
"WHERE User = 'root' "
|
"TO 'root'@'%';", str(uu))
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_password_and_username(self):
|
|
||||||
username = 'root'
|
|
||||||
hostname = 'localhost'
|
|
||||||
new_user = 'root123'
|
|
||||||
new_password = 'password123'
|
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
|
||||||
clear=new_password, new_user=new_user)
|
|
||||||
self.assertEqual("UPDATE mysql.user SET User='root123', "
|
|
||||||
"Password=PASSWORD('password123') "
|
|
||||||
"WHERE User = 'root' "
|
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_username_password_hostname(self):
|
|
||||||
username = 'root'
|
|
||||||
hostname = 'localhost'
|
|
||||||
new_user = 'root123'
|
|
||||||
new_password = 'password123'
|
|
||||||
new_host = '%'
|
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
|
||||||
clear=new_password, new_user=new_user,
|
|
||||||
new_host=new_host)
|
|
||||||
self.assertEqual("UPDATE mysql.user SET User='root123', "
|
|
||||||
"Host='%', "
|
|
||||||
"Password=PASSWORD('password123') "
|
|
||||||
"WHERE User = 'root' "
|
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
|
||||||
def test_change_username_and_hostname(self):
|
def test_change_username_and_hostname(self):
|
||||||
username = 'root'
|
username = 'root'
|
||||||
hostname = 'localhost'
|
hostname = 'localhost'
|
||||||
new_user = 'root123'
|
new_user = 'root123'
|
||||||
new_host = '%'
|
new_host = '%'
|
||||||
uu = sql_query.UpdateUser(user=username, host=hostname,
|
uu = sql_query.RenameUser(user=username, host=hostname,
|
||||||
new_host=new_host, new_user=new_user)
|
new_user=new_user, new_host=new_host)
|
||||||
self.assertEqual("UPDATE mysql.user SET User='root123', "
|
self.assertEqual("RENAME USER 'root'@'localhost' "
|
||||||
"Host='%' "
|
"TO 'root123'@'%';", str(uu))
|
||||||
"WHERE User = 'root' "
|
|
||||||
"AND Host = 'localhost';", str(uu))
|
|
||||||
|
class SetPasswordTest(QueryTestBase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(SetPasswordTest, self).setUp()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(SetPasswordTest, self).tearDown()
|
||||||
|
|
||||||
|
def test_alter_user(self):
|
||||||
|
username = 'root'
|
||||||
|
hostname = 'localhost'
|
||||||
|
new_password = 'new_password'
|
||||||
|
uu = sql_query.SetPassword(user=username, host=hostname,
|
||||||
|
new_password=new_password)
|
||||||
|
self.assertEqual("SET PASSWORD FOR 'root'@'localhost' = "
|
||||||
|
"PASSWORD('new_password');", str(uu))
|
||||||
|
|
||||||
|
|
||||||
class DropUserTest(QueryTestBase):
|
class DropUserTest(QueryTestBase):
|
||||||
|
Loading…
Reference in New Issue
Block a user