From 2db1c58512760fcb5a850df852e44833c97ed856 Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Fri, 13 May 2022 21:48:28 +0200 Subject: [PATCH] Drop support of *-sk keys in cc_ssh (#1451) Reference: https://github.com/canonical/cloud-init/commit/2db1c58512760fcb5a850df852e44833c97ed856 Conflict: (1) only change tests/unittests/config/test_cc_ssh.py (2) don't add test_handle_invalid_ssh_keys_are_skipped (3) don't change TestSshSchema (4) format diffs. - Delete *-sk keys from cloud-init-schema.json under cc_ssh.{ssh_keys,ssh_genkeytypes} - Log a warning if some given key is unsupported or unknown. - Port tests to pytest, add some types and increase unittest coverage. --- cloudinit/config/tests/test_ssh.py | 457 ++++++++++++----------------- 1 file changed, 192 insertions(+), 265 deletions(-) diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py index 87ccdb6..714949c 100644 --- a/cloudinit/config/tests/test_ssh.py +++ b/cloudinit/config/tests/test_ssh.py @@ -1,11 +1,14 @@ # This file is part of cloud-init. See LICENSE file for license information. 
import os.path +from typing import Optional +from unittest import mock from cloudinit.config import cc_ssh from cloudinit import ssh_util -from cloudinit.tests.helpers import CiTestCase, mock +from tests.unittests.util import get_cloud import logging +import pytest LOG = logging.getLogger(__name__) @@ -14,68 +17,79 @@ KEY_NAMES_NO_DSA = [name for name in cc_ssh.GENERATE_KEY_NAMES if name not in 'dsa'] +@pytest.fixture(scope="function") +def publish_hostkey_test_setup(tmpdir): + test_hostkeys = { + "dsa": ("ssh-dss", "AAAAB3NzaC1kc3MAAACB"), + "ecdsa": ("ecdsa-sha2-nistp256", "AAAAE2VjZ"), + "ed25519": ("ssh-ed25519", "AAAAC3NzaC1lZDI"), + "rsa": ("ssh-rsa", "AAAAB3NzaC1yc2EAAA"), + } + test_hostkey_files = [] + hostkey_tmpdir = tmpdir + for key_type in cc_ssh.GENERATE_KEY_NAMES: + filename = "ssh_host_%s_key.pub" % key_type + filepath = os.path.join(hostkey_tmpdir, filename) + test_hostkey_files.append(filepath) + with open(filepath, "w") as f: + f.write(" ".join(test_hostkeys[key_type])) + + cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, "ssh_host_%s_key") + yield test_hostkeys, test_hostkey_files + + +def _replace_options(user: Optional[str] = None) -> str: + options = ssh_util.DISABLE_USER_OPTS + if user: + new_user = user + else: + new_user = "NONE" + options = options.replace("$USER", new_user) + options = options.replace("$DISABLE_USER", "root") + return options + @mock.patch(MODPATH + "ssh_util.setup_user_keys") -class TestHandleSsh(CiTestCase): +class TestHandleSsh: """Test cc_ssh handling of ssh config.""" - def _publish_hostkey_test_setup(self): - self.test_hostkeys = { - 'dsa': ('ssh-dss', 'AAAAB3NzaC1kc3MAAACB'), - 'ecdsa': ('ecdsa-sha2-nistp256', 'AAAAE2VjZ'), - 'ed25519': ('ssh-ed25519', 'AAAAC3NzaC1lZDI'), - 'rsa': ('ssh-rsa', 'AAAAB3NzaC1yc2EAAA'), - } - self.test_hostkey_files = [] - hostkey_tmpdir = self.tmp_dir() - for key_type in cc_ssh.GENERATE_KEY_NAMES: - key_data = self.test_hostkeys[key_type] - filename = 'ssh_host_%s_key.pub' % 
key_type - filepath = os.path.join(hostkey_tmpdir, filename) - self.test_hostkey_files.append(filepath) - with open(filepath, 'w') as f: - f.write(' '.join(key_data)) - - cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, 'ssh_host_%s_key') - - def test_apply_credentials_with_user(self, m_setup_keys): - """Apply keys for the given user and root.""" - keys = ["key1"] - user = "clouduser" - cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS) - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options="")], - m_setup_keys.call_args_list) - - def test_apply_credentials_with_no_user(self, m_setup_keys): - """Apply keys for root only.""" - keys = ["key1"] - user = None - cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS) - self.assertEqual([mock.call(set(keys), "root", options="")], - m_setup_keys.call_args_list) - - def test_apply_credentials_with_user_disable_root(self, m_setup_keys): - """Apply keys for the given user and disable root ssh.""" - keys = ["key1"] - user = "clouduser" - options = ssh_util.DISABLE_USER_OPTS - cc_ssh.apply_credentials(keys, user, True, options) - options = options.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) - - def test_apply_credentials_with_no_user_disable_root(self, m_setup_keys): - """Apply keys no user and disable root ssh.""" - keys = ["key1"] - user = None + @pytest.mark.parametrize( + "keys,user,disable_root_opts", + [ + # For the given user and root. + pytest.param(["key1"], "clouduser", False, id="with_user"), + # For root only. + pytest.param(["key1"], None, False, id="with_no_user"), + # For the given user and disable root ssh. + pytest.param( + ["key1"], + "clouduser", + True, + id="with_user_disable_root", + ), + # No user and disable root ssh. 
+ pytest.param( + ["key1"], + None, + True, + id="with_no_user_disable_root", + ), + ], + ) + def test_apply_credentials( + self, m_setup_keys, keys, user, disable_root_opts + ): options = ssh_util.DISABLE_USER_OPTS - cc_ssh.apply_credentials(keys, user, True, options) - options = options.replace("$USER", "NONE") - options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) + cc_ssh.apply_credentials(keys, user, disable_root_opts, options) + if not disable_root_opts: + expected_options = "" + else: + expected_options = _replace_options(user) + expected_calls = [ + mock.call(set(keys), "root", options=expected_options) + ] + if user: + expected_calls = [mock.call(set(keys), user)] + expected_calls + assert expected_calls == m_setup_keys.call_args_list @mock.patch(MODPATH + "glob.glob") @mock.patch(MODPATH + "ug_util.normalize_users_groups") @@ -90,20 +104,20 @@ class TestHandleSsh(CiTestCase): m_path_exists.return_value = True m_nug.return_value = ([], {}) cc_ssh.PUBLISH_HOST_KEYS = False - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) + cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys}) cc_ssh.handle("name", cfg, cloud, LOG, None) options = ssh_util.DISABLE_USER_OPTS.replace("$USER", "NONE") options = options.replace("$DISABLE_USER", "root") m_glob.assert_called_once_with('/etc/ssh/ssh_host_*key*') - self.assertIn( - [mock.call('/etc/ssh/ssh_host_rsa_key'), - mock.call('/etc/ssh/ssh_host_dsa_key'), - mock.call('/etc/ssh/ssh_host_ecdsa_key'), - mock.call('/etc/ssh/ssh_host_ed25519_key')], - m_path_exists.call_args_list) - self.assertEqual([mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) + assert [ + mock.call("/etc/ssh/ssh_host_rsa_key"), + mock.call("/etc/ssh/ssh_host_dsa_key"), + mock.call("/etc/ssh/ssh_host_ecdsa_key"), + mock.call("/etc/ssh/ssh_host_ed25519_key"), + ] in m_path_exists.call_args_list 
+ assert [ + mock.call(set(keys), "root", options=options) + ] == m_setup_keys.call_args_list @mock.patch(MODPATH + "glob.glob") @mock.patch(MODPATH + "ug_util.normalize_users_groups") @@ -120,231 +134,144 @@ class TestHandleSsh(CiTestCase): # Mock os.path.exits to True to short-circuit the key writing logic m_path_exists.return_value = True m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cc_ssh.handle("name", cfg, cloud, LOG, None) - - options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(), user), - mock.call(set(), "root", options=options)], - m_setup_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_no_cfg_and_default_root(self, m_path_exists, m_nug, - m_glob, m_setup_keys): - """Test handle with no config and a default distro user.""" - cfg = {} - keys = ["key1"] - user = "clouduser" - m_glob.return_value = [] # Return no matching keys to prevent removal - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) + cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys}) cc_ssh.handle("name", cfg, cloud, LOG, None) options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_cfg_with_explicit_disable_root(self, m_path_exists, m_nug, - m_glob, 
m_setup_keys): - """Test handle with explicit disable_root and a default distro user.""" - # This test is identical to test_handle_no_cfg_and_default_root, - # except this uses an explicit cfg value - cfg = {"disable_root": True} - keys = ["key1"] - user = "clouduser" - m_glob.return_value = [] # Return no matching keys to prevent removal - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cc_ssh.handle("name", cfg, cloud, LOG, None) - - options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list) - + assert [ + mock.call(set(), user), + mock.call(set(), "root", options=options), + ] == m_setup_keys.call_args_list + + @pytest.mark.parametrize( + "cfg,mock_get_public_ssh_keys,empty_opts", + [ + pytest.param({}, False, False, id="no_cfg"), + pytest.param( + {"disable_root": True}, + False, + False, + id="explicit_disable_root", + ), + # When disable_root == False, the ssh redirect for root is skipped + pytest.param( + {"disable_root": False}, + True, + True, + id="cfg_without_disable_root", + ), + ], + ) @mock.patch(MODPATH + "glob.glob") @mock.patch(MODPATH + "ug_util.normalize_users_groups") @mock.patch(MODPATH + "os.path.exists") - def test_handle_cfg_without_disable_root(self, m_path_exists, m_nug, - m_glob, m_setup_keys): - """Test handle with disable_root == False.""" - # When disable_root == False, the ssh redirect for root is skipped - cfg = {"disable_root": False} + def test_handle_default_root( + self, + m_path_exists, + m_nug, + m_glob, + m_setup_keys, + cfg, + mock_get_public_ssh_keys, + empty_opts, + ): + """Test handle with a default distro user.""" keys = ["key1"] user = "clouduser" 
m_glob.return_value = [] # Return no matching keys to prevent removal # Mock os.path.exits to True to short-circuit the key writing logic m_path_exists.return_value = True m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) + cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys}) + if mock_get_public_ssh_keys: + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) cloud.get_public_ssh_keys = mock.Mock(return_value=keys) cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual([mock.call(set(keys), user), - mock.call(set(keys), "root", options="")], - m_setup_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_default( - self, m_path_exists, m_nug, m_glob, m_setup_keys): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. 
- m_glob.side_effect = iter([ - [], - self.test_hostkey_files, - ]) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {} - expected_call = [self.test_hostkeys[key_type] for key_type - in KEY_NAMES_NO_DSA] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual([mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_config_enable( - self, m_path_exists, m_nug, m_glob, m_setup_keys): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = False - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. 
- m_glob.side_effect = iter([ - [], - self.test_hostkey_files, - ]) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {'ssh_publish_hostkeys': {'enabled': True}} - expected_call = [self.test_hostkeys[key_type] for key_type - in KEY_NAMES_NO_DSA] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual([mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_config_disable( - self, m_path_exists, m_nug, m_glob, m_setup_keys): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. 
- m_glob.side_effect = iter([ - [], - self.test_hostkey_files, - ]) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {'ssh_publish_hostkeys': {'enabled': False}} - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertFalse(cloud.datasource.publish_host_keys.call_args_list) - cloud.datasource.publish_host_keys.assert_not_called() - + if empty_opts: + options = "" + else: + options = _replace_options(user) + assert [ + mock.call(set(keys), user), + mock.call(set(keys), "root", options=options), + ] == m_setup_keys.call_args_list + + @pytest.mark.parametrize( + "cfg, expected_key_types", + [ + pytest.param({}, KEY_NAMES_NO_DSA, id="default"), + pytest.param( + {"ssh_publish_hostkeys": {"enabled": True}}, + KEY_NAMES_NO_DSA, + id="config_enable", + ), + pytest.param( + {"ssh_publish_hostkeys": {"enabled": False}}, + None, + id="config_disable", + ), + pytest.param( + { + "ssh_publish_hostkeys": { + "enabled": True, + "blacklist": ["dsa", "rsa"], + } + }, + ["ecdsa", "ed25519"], + id="config_blacklist", + ), + pytest.param( + {"ssh_publish_hostkeys": {"enabled": True, "blacklist": []}}, + cc_ssh.GENERATE_KEY_NAMES, + id="empty_blacklist", + ), + ], + ) @mock.patch(MODPATH + "glob.glob") @mock.patch(MODPATH + "ug_util.normalize_users_groups") @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_config_blacklist( - self, m_path_exists, m_nug, m_glob, m_setup_keys): + def test_handle_publish_hostkeys( + self, + m_path_exists, + m_nug, + m_glob, + m_setup_keys, + publish_hostkey_test_setup, + cfg, + expected_key_types, + ): """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() + test_hostkeys, test_hostkey_files = publish_hostkey_test_setup 
cc_ssh.PUBLISH_HOST_KEYS = True keys = ["key1"] user = "clouduser" # Return no matching keys for first glob, test keys for second. m_glob.side_effect = iter([ [], - self.test_hostkey_files, + test_hostkey_files, ]) # Mock os.path.exits to True to short-circuit the key writing logic m_path_exists.return_value = True m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) + cloud = get_cloud(distro='ubuntu', metadata={'public-keys': keys}) cloud.datasource.publish_host_keys = mock.Mock() - cfg = {'ssh_publish_hostkeys': {'enabled': True, - 'blacklist': ['dsa', 'rsa']}} - expected_call = [self.test_hostkeys[key_type] for key_type - in ['ecdsa', 'ed25519']] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual([mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_empty_blacklist( - self, m_path_exists, m_nug, m_glob, m_setup_keys): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. 
- m_glob.side_effect = iter([ - [], - self.test_hostkey_files, - ]) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud( - distro='ubuntu', metadata={'public-keys': keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {'ssh_publish_hostkeys': {'enabled': True, - 'blacklist': []}} - expected_call = [self.test_hostkeys[key_type] for key_type - in cc_ssh.GENERATE_KEY_NAMES] + expected_calls = [] + if expected_key_types is not None: + expected_calls = [ + mock.call( + [ + test_hostkeys[key_type] + for key_type in expected_key_types + ] + ) + ] cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual([mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list) + assert ( + expected_calls == cloud.datasource.publish_host_keys.call_args_list + ) @mock.patch(MODPATH + "ug_util.normalize_users_groups") @mock.patch(MODPATH + "util.write_file") @@ -360,7 +287,7 @@ class TestHandleSsh(CiTestCase): public_name = "{}_public".format(key_type) cert_name = "{}_certificate".format(key_type) - # Actual key contents don"t have to be realistic + # Actual key contents don't have to be realistic private_value = "{}_PRIVATE_KEY".format(key_type) public_value = "{}_PUBLIC_KEY".format(key_type) cert_value = "{}_CERT_KEY".format(key_type) @@ -397,9 +324,9 @@ class TestHandleSsh(CiTestCase): m_nug.return_value = ([], {}) with mock.patch(MODPATH + 'ssh_util.parse_ssh_config', return_value=[]): - cc_ssh.handle("name", cfg, self.tmp_cloud(distro='ubuntu'), - LOG, None) + cc_ssh.handle("name", cfg, get_cloud(distro="ubuntu"), LOG, None) # Check that all expected output has been done. for call_ in expected_calls: - self.assertIn(call_, m_write_file.call_args_list) + assert call_ in m_write_file.call_args_list + -- 2.33.0