cloud-init/backport-Drop-support-of-sk-keys-in-cc_ssh-1451.patch
2023-08-26 14:46:22 +08:00

573 lines
24 KiB
Diff

From 2db1c58512760fcb5a850df852e44833c97ed856 Mon Sep 17 00:00:00 2001
From: Alberto Contreras <alberto.contreras@canonical.com>
Date: Fri, 13 May 2022 21:48:28 +0200
Subject: [PATCH] Drop support of *-sk keys in cc_ssh (#1451)
Reference:https://github.com/canonical/cloud-init/commit/2db1c58512760fcb5a850df852e44833c97ed856
Conflict:(1)only change tests/unittests/config/test_cc_ssh.py
(2)don't add test_handle_invalid_ssh_keys_are_skipped
(3)don't change TestSshSchema
(4)format diffs.
- Delete *-sk keys from cloud-init-schema.json under
cc_ssh.{ssh_keys,ssh_genkeytypes}
- Log a warning if some given key is unsupported or unknown.
- Port tests to Pytests, add some types and increase unittest
coverage.
---
cloudinit/config/tests/test_ssh.py | 457 ++++++++++++-----------------
1 file changed, 192 insertions(+), 265 deletions(-)
diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py
index 87ccdb6..714949c 100644
--- a/cloudinit/config/tests/test_ssh.py
+++ b/cloudinit/config/tests/test_ssh.py
@@ -1,11 +1,14 @@
# This file is part of cloud-init. See LICENSE file for license information.
import os.path
+from typing import Optional
+from unittest import mock
from cloudinit.config import cc_ssh
from cloudinit import ssh_util
-from cloudinit.tests.helpers import CiTestCase, mock
+from tests.unittests.util import get_cloud
import logging
+import pytest
LOG = logging.getLogger(__name__)
@@ -14,68 +17,79 @@ KEY_NAMES_NO_DSA = [name for name in cc_ssh.GENERATE_KEY_NAMES
if name not in 'dsa']
+@pytest.fixture(scope="function")
+def publish_hostkey_test_setup(tmpdir):
+ test_hostkeys = {
+ "dsa": ("ssh-dss", "AAAAB3NzaC1kc3MAAACB"),
+ "ecdsa": ("ecdsa-sha2-nistp256", "AAAAE2VjZ"),
+ "ed25519": ("ssh-ed25519", "AAAAC3NzaC1lZDI"),
+ "rsa": ("ssh-rsa", "AAAAB3NzaC1yc2EAAA"),
+ }
+ test_hostkey_files = []
+ hostkey_tmpdir = tmpdir
+ for key_type in cc_ssh.GENERATE_KEY_NAMES:
+ filename = "ssh_host_%s_key.pub" % key_type
+ filepath = os.path.join(hostkey_tmpdir, filename)
+ test_hostkey_files.append(filepath)
+ with open(filepath, "w") as f:
+ f.write(" ".join(test_hostkeys[key_type]))
+
+ cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, "ssh_host_%s_key")
+ yield test_hostkeys, test_hostkey_files
+
+
+def _replace_options(user: Optional[str] = None) -> str:
+ options = ssh_util.DISABLE_USER_OPTS
+ if user:
+ new_user = user
+ else:
+ new_user = "NONE"
+ options = options.replace("$USER", new_user)
+ options = options.replace("$DISABLE_USER", "root")
+ return options
+
@mock.patch(MODPATH + "ssh_util.setup_user_keys")
-class TestHandleSsh(CiTestCase):
+class TestHandleSsh:
"""Test cc_ssh handling of ssh config."""
- def _publish_hostkey_test_setup(self):
- self.test_hostkeys = {
- 'dsa': ('ssh-dss', 'AAAAB3NzaC1kc3MAAACB'),
- 'ecdsa': ('ecdsa-sha2-nistp256', 'AAAAE2VjZ'),
- 'ed25519': ('ssh-ed25519', 'AAAAC3NzaC1lZDI'),
- 'rsa': ('ssh-rsa', 'AAAAB3NzaC1yc2EAAA'),
- }
- self.test_hostkey_files = []
- hostkey_tmpdir = self.tmp_dir()
- for key_type in cc_ssh.GENERATE_KEY_NAMES:
- key_data = self.test_hostkeys[key_type]
- filename = 'ssh_host_%s_key.pub' % key_type
- filepath = os.path.join(hostkey_tmpdir, filename)
- self.test_hostkey_files.append(filepath)
- with open(filepath, 'w') as f:
- f.write(' '.join(key_data))
-
- cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, 'ssh_host_%s_key')
-
- def test_apply_credentials_with_user(self, m_setup_keys):
- """Apply keys for the given user and root."""
- keys = ["key1"]
- user = "clouduser"
- cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS)
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options="")],
- m_setup_keys.call_args_list)
-
- def test_apply_credentials_with_no_user(self, m_setup_keys):
- """Apply keys for root only."""
- keys = ["key1"]
- user = None
- cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS)
- self.assertEqual([mock.call(set(keys), "root", options="")],
- m_setup_keys.call_args_list)
-
- def test_apply_credentials_with_user_disable_root(self, m_setup_keys):
- """Apply keys for the given user and disable root ssh."""
- keys = ["key1"]
- user = "clouduser"
- options = ssh_util.DISABLE_USER_OPTS
- cc_ssh.apply_credentials(keys, user, True, options)
- options = options.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- def test_apply_credentials_with_no_user_disable_root(self, m_setup_keys):
- """Apply keys no user and disable root ssh."""
- keys = ["key1"]
- user = None
+ @pytest.mark.parametrize(
+ "keys,user,disable_root_opts",
+ [
+ # For the given user and root.
+ pytest.param(["key1"], "clouduser", False, id="with_user"),
+ # For root only.
+ pytest.param(["key1"], None, False, id="with_no_user"),
+ # For the given user and disable root ssh.
+ pytest.param(
+ ["key1"],
+ "clouduser",
+ True,
+ id="with_user_disable_root",
+ ),
+ # No user and disable root ssh.
+ pytest.param(
+ ["key1"],
+ None,
+ True,
+ id="with_no_user_disable_root",
+ ),
+ ],
+ )
+ def test_apply_credentials(
+ self, m_setup_keys, keys, user, disable_root_opts
+ ):
options = ssh_util.DISABLE_USER_OPTS
- cc_ssh.apply_credentials(keys, user, True, options)
- options = options.replace("$USER", "NONE")
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
+ cc_ssh.apply_credentials(keys, user, disable_root_opts, options)
+ if not disable_root_opts:
+ expected_options = ""
+ else:
+ expected_options = _replace_options(user)
+ expected_calls = [
+ mock.call(set(keys), "root", options=expected_options)
+ ]
+ if user:
+ expected_calls = [mock.call(set(keys), user)] + expected_calls
+ assert expected_calls == m_setup_keys.call_args_list
@mock.patch(MODPATH + "glob.glob")
@mock.patch(MODPATH + "ug_util.normalize_users_groups")
@@ -90,20 +104,20 @@ class TestHandleSsh(CiTestCase):
m_path_exists.return_value = True
m_nug.return_value = ([], {})
cc_ssh.PUBLISH_HOST_KEYS = False
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
+ cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys})
cc_ssh.handle("name", cfg, cloud, LOG, None)
options = ssh_util.DISABLE_USER_OPTS.replace("$USER", "NONE")
options = options.replace("$DISABLE_USER", "root")
m_glob.assert_called_once_with('/etc/ssh/ssh_host_*key*')
- self.assertIn(
- [mock.call('/etc/ssh/ssh_host_rsa_key'),
- mock.call('/etc/ssh/ssh_host_dsa_key'),
- mock.call('/etc/ssh/ssh_host_ecdsa_key'),
- mock.call('/etc/ssh/ssh_host_ed25519_key')],
- m_path_exists.call_args_list)
- self.assertEqual([mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
+ assert [
+ mock.call("/etc/ssh/ssh_host_rsa_key"),
+ mock.call("/etc/ssh/ssh_host_dsa_key"),
+ mock.call("/etc/ssh/ssh_host_ecdsa_key"),
+ mock.call("/etc/ssh/ssh_host_ed25519_key"),
+ ] in m_path_exists.call_args_list
+ assert [
+ mock.call(set(keys), "root", options=options)
+ ] == m_setup_keys.call_args_list
@mock.patch(MODPATH + "glob.glob")
@mock.patch(MODPATH + "ug_util.normalize_users_groups")
@@ -120,231 +134,144 @@ class TestHandleSsh(CiTestCase):
# Mock os.path.exits to True to short-circuit the key writing logic
m_path_exists.return_value = True
m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(), user),
- mock.call(set(), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_no_cfg_and_default_root(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with no config and a default distro user."""
- cfg = {}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
+ cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys})
cc_ssh.handle("name", cfg, cloud, LOG, None)
options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_cfg_with_explicit_disable_root(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with explicit disable_root and a default distro user."""
- # This test is identical to test_handle_no_cfg_and_default_root,
- # except this uses an explicit cfg value
- cfg = {"disable_root": True}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
+ assert [
+ mock.call(set(), user),
+ mock.call(set(), "root", options=options),
+ ] == m_setup_keys.call_args_list
+
+ @pytest.mark.parametrize(
+ "cfg,mock_get_public_ssh_keys,empty_opts",
+ [
+ pytest.param({}, False, False, id="no_cfg"),
+ pytest.param(
+ {"disable_root": True},
+ False,
+ False,
+ id="explicit_disable_root",
+ ),
+ # When disable_root == False, the ssh redirect for root is skipped
+ pytest.param(
+ {"disable_root": False},
+ True,
+ True,
+ id="cfg_without_disable_root",
+ ),
+ ],
+ )
@mock.patch(MODPATH + "glob.glob")
@mock.patch(MODPATH + "ug_util.normalize_users_groups")
@mock.patch(MODPATH + "os.path.exists")
- def test_handle_cfg_without_disable_root(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with disable_root == False."""
- # When disable_root == False, the ssh redirect for root is skipped
- cfg = {"disable_root": False}
+ def test_handle_default_root(
+ self,
+ m_path_exists,
+ m_nug,
+ m_glob,
+ m_setup_keys,
+ cfg,
+ mock_get_public_ssh_keys,
+ empty_opts,
+ ):
+ """Test handle with a default distro user."""
keys = ["key1"]
user = "clouduser"
m_glob.return_value = [] # Return no matching keys to prevent removal
# Mock os.path.exits to True to short-circuit the key writing logic
m_path_exists.return_value = True
m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
+ cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys})
+ if mock_get_public_ssh_keys:
+ cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options="")],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_default(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in KEY_NAMES_NO_DSA]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_config_enable(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = False
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': True}}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in KEY_NAMES_NO_DSA]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_config_disable(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': False}}
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertFalse(cloud.datasource.publish_host_keys.call_args_list)
- cloud.datasource.publish_host_keys.assert_not_called()
-
+ if empty_opts:
+ options = ""
+ else:
+ options = _replace_options(user)
+ assert [
+ mock.call(set(keys), user),
+ mock.call(set(keys), "root", options=options),
+ ] == m_setup_keys.call_args_list
+
+ @pytest.mark.parametrize(
+ "cfg, expected_key_types",
+ [
+ pytest.param({}, KEY_NAMES_NO_DSA, id="default"),
+ pytest.param(
+ {"ssh_publish_hostkeys": {"enabled": True}},
+ KEY_NAMES_NO_DSA,
+ id="config_enable",
+ ),
+ pytest.param(
+ {"ssh_publish_hostkeys": {"enabled": False}},
+ None,
+ id="config_disable",
+ ),
+ pytest.param(
+ {
+ "ssh_publish_hostkeys": {
+ "enabled": True,
+ "blacklist": ["dsa", "rsa"],
+ }
+ },
+ ["ecdsa", "ed25519"],
+ id="config_blacklist",
+ ),
+ pytest.param(
+ {"ssh_publish_hostkeys": {"enabled": True, "blacklist": []}},
+ cc_ssh.GENERATE_KEY_NAMES,
+ id="empty_blacklist",
+ ),
+ ],
+ )
@mock.patch(MODPATH + "glob.glob")
@mock.patch(MODPATH + "ug_util.normalize_users_groups")
@mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_config_blacklist(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ def test_handle_publish_hostkeys(
+ self,
+ m_path_exists,
+ m_nug,
+ m_glob,
+ m_setup_keys,
+ publish_hostkey_test_setup,
+ cfg,
+ expected_key_types,
+ ):
"""Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
+ test_hostkeys, test_hostkey_files = publish_hostkey_test_setup
cc_ssh.PUBLISH_HOST_KEYS = True
keys = ["key1"]
user = "clouduser"
# Return no matching keys for first glob, test keys for second.
m_glob.side_effect = iter([
[],
- self.test_hostkey_files,
+ test_hostkey_files,
])
# Mock os.path.exits to True to short-circuit the key writing logic
m_path_exists.return_value = True
m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
+ cloud = get_cloud(distro='ubuntu', metadata={'public-keys': keys})
cloud.datasource.publish_host_keys = mock.Mock()
- cfg = {'ssh_publish_hostkeys': {'enabled': True,
- 'blacklist': ['dsa', 'rsa']}}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in ['ecdsa', 'ed25519']]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_empty_blacklist(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': True,
- 'blacklist': []}}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in cc_ssh.GENERATE_KEY_NAMES]
+ expected_calls = []
+ if expected_key_types is not None:
+ expected_calls = [
+ mock.call(
+ [
+ test_hostkeys[key_type]
+ for key_type in expected_key_types
+ ]
+ )
+ ]
cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
+ assert (
+ expected_calls == cloud.datasource.publish_host_keys.call_args_list
+ )
@mock.patch(MODPATH + "ug_util.normalize_users_groups")
@mock.patch(MODPATH + "util.write_file")
@@ -360,7 +287,7 @@ class TestHandleSsh(CiTestCase):
public_name = "{}_public".format(key_type)
cert_name = "{}_certificate".format(key_type)
- # Actual key contents don"t have to be realistic
+ # Actual key contents don't have to be realistic
private_value = "{}_PRIVATE_KEY".format(key_type)
public_value = "{}_PUBLIC_KEY".format(key_type)
cert_value = "{}_CERT_KEY".format(key_type)
@@ -397,9 +324,9 @@ class TestHandleSsh(CiTestCase):
m_nug.return_value = ([], {})
with mock.patch(MODPATH + 'ssh_util.parse_ssh_config',
return_value=[]):
- cc_ssh.handle("name", cfg, self.tmp_cloud(distro='ubuntu'),
- LOG, None)
+ cc_ssh.handle("name", cfg, get_cloud(distro="ubuntu"), LOG, None)
# Check that all expected output has been done.
for call_ in expected_calls:
- self.assertIn(call_, m_write_file.call_args_list)
+ assert call_ in m_write_file.call_args_list
+
--
2.33.0