From 773a174fb1e40e1d18dbe2625e16337ea401119e Mon Sep 17 00:00:00 2001 From: Jeff Forcier Date: Fri, 15 Dec 2023 23:59:12 -0500 Subject: [PATCH] Basic strict-kex-mode agreement mechanics work Reference:https://github.com/paramiko/paramiko/commit/773a174fb1e40e1d18dbe2625e16337ea401119e Conflict:The comments are different. Therefore, the transport.py file is adapted Due to different versions, some test cases do not exist. Therefore, context adaptation is required when new test cases are added --- paramiko/transport.py | 38 +++++++++++++++++++++++++++++++++--- tests/test_transport.py | 43 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/paramiko/transport.py b/paramiko/transport.py index fd26371..2d6d581 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -329,6 +329,7 @@ class Transport(threading.Thread, ClosingContextManager): gss_deleg_creds=True, disabled_algorithms=None, server_sig_algs=True, + strict_kex=True, ): """ Create a new SSH session over an existing socket, or socket-like @@ -395,6 +396,10 @@ class Transport(threading.Thread, ClosingContextManager): Whether to send an extra message to compatible clients, in server mode, with a list of supported pubkey algorithms. Default: ``True``. + :param bool strict_kex: + Whether to advertise (and implement, if client also advertises + support for) a "strict kex" mode for safer handshaking. Default: + ``True``. .. versionchanged:: 1.15 Added the ``default_window_size`` and ``default_max_packet_size`` @@ -405,10 +410,14 @@ class Transport(threading.Thread, ClosingContextManager): Added the ``disabled_algorithms`` kwarg. .. versionchanged:: 2.9 Added the ``server_sig_algs`` kwarg. + .. versionchanged:: 3.4 + Added the ``strict_kex`` kwarg. """ self.active = False self.hostname = None self.server_extensions = {} + self.advertise_strict_kex = strict_kex + self.agreed_on_strict_kex = False if isinstance(sock, string_types): # convert "host:port" into (host, port) @@ -2342,12 +2351,18 @@ class Transport(threading.Thread, ClosingContextManager): ) else: available_server_keys = self.preferred_keys - # Signal support for MSG_EXT_INFO. + # Signal support for MSG_EXT_INFO so server will send it to us. # NOTE: doing this here handily means we don't even consider this # value when agreeing on real kex algo to use (which is a common # pitfall when adding this apparently). kex_algos.append("ext-info-c") + # Similar to ext-info, but used in both server modes, so done outside + # of above if/else. + if self.advertise_strict_kex: + which = "s" if self.server_mode else "c" + kex_algos.append(f"kex-strict-{which}-v00@openssh.com") + m = Message() m.add_byte(cMSG_KEXINIT) m.add_bytes(os.urandom(16)) @@ -2427,11 +2442,28 @@ class Transport(threading.Thread, ClosingContextManager): self._log(DEBUG, "kex follows: {}".format(kex_follows)) self._log(DEBUG, "=== Key exchange agreements ===") - # Strip out ext-info "kex algo" + # Record, and strip out, ext-info and/or strict-kex non-algorithms self._remote_ext_info = None + self._remote_strict_kex = None + to_pop = [] for i, algo in enumerate(kex_algo_list): if algo.startswith("ext-info-"): - self._remote_ext_info = kex_algo_list.pop(i) + self._remote_ext_info = algo + to_pop.insert(0, i) + elif algo.startswith("kex-strict-"): + # NOTE: this is what we are expecting from the /remote/ end. + which = "c" if self.server_mode else "s" + expected = f"kex-strict-{which}-v00@openssh.com" + # Set strict mode if agreed. + self.agreed_on_strict_kex = ( + algo == expected and self.advertise_strict_kex + ) + self._log( + DEBUG, f"Strict kex mode: {self.agreed_on_strict_kex}" + ) + to_pop.insert(0, i) + for i in to_pop: + kex_algo_list.pop(i) # as a server, we pick the first item in the client's list that we # support. diff --git a/tests/test_transport.py b/tests/test_transport.py index 6bc0be8..c8cd498 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -24,6 +24,7 @@ from __future__ import with_statement from binascii import hexlify from contextlib import contextmanager +import itertools import select import socket import time @@ -63,6 +64,7 @@ from paramiko.message import Message from .util import needs_builtin, _support, requires_sha1_signing, slow from .loop import LoopSocket +from pytest import skip, mark LONG_BANNER = """\ @@ -1463,3 +1465,44 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): ) as (tc, ts): assert tc.is_authenticated() assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" + + +class TestStrictKex: + def test_kex_algos_includes_kex_strict_c(self): + with server() as (tc, _): + kex = tc._get_latest_kex_init() + assert "kex-strict-c-v00@openssh.com" in kex["kex_algo_list"] + + @mark.parametrize( + "server_active,client_active", + itertools.product([True, False], repeat=2), + ) + def test_mode_agreement(self, server_active, client_active): + with server( + server_init=dict(strict_kex=server_active), + client_init=dict(strict_kex=client_active), + ) as (tc, ts): + if server_active and client_active: + assert tc.agreed_on_strict_kex is True + assert ts.agreed_on_strict_kex is True + else: + assert tc.agreed_on_strict_kex is False + assert ts.agreed_on_strict_kex is False + + def test_mode_advertised_by_default(self): + # NOTE: no explicit strict_kex overrides... + with server() as (tc, ts): + assert all( + ( + tc.advertise_strict_kex, + tc.agreed_on_strict_kex, + ts.advertise_strict_kex, + ts.agreed_on_strict_kex, + ) + ) + + def test_sequence_numbers_reset_on_newkeys(self): + skip() + + def test_error_raised_on_out_of_order_handshakes(self): + skip() -- 2.33.0