python-paramiko/backport-0002-CVE-2023-48795.patch
zhangpan 30d6fa75a8 fix CVE-2023-48795
(cherry picked from commit eee395ba519760fae8c5eb1a104027c07fd5a182)
2024-01-12 09:52:41 +08:00

172 lines
6.7 KiB
Diff

From 773a174fb1e40e1d18dbe2625e16337ea401119e Mon Sep 17 00:00:00 2001
From: Jeff Forcier <jeff@bitprophet.org>
Date: Fri, 15 Dec 2023 23:59:12 -0500
Subject: [PATCH] Basic strict-kex-mode agreement mechanics work
Reference: https://github.com/paramiko/paramiko/commit/773a174fb1e40e1d18dbe2625e16337ea401119e
Conflict: The comments in this version of paramiko/transport.py differ from upstream, so the transport.py hunks were adapted to match.
Some upstream test cases do not exist in this version, so the context of the hunk that adds the new test cases to tests/test_transport.py was adapted as well.
---
paramiko/transport.py | 38 +++++++++++++++++++++++++++++++++---
tests/test_transport.py | 43 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 78 insertions(+), 3 deletions(-)
diff --git a/paramiko/transport.py b/paramiko/transport.py
index fd26371..2d6d581 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -329,6 +329,7 @@ class Transport(threading.Thread, ClosingContextManager):
gss_deleg_creds=True,
disabled_algorithms=None,
server_sig_algs=True,
+ strict_kex=True,
):
"""
Create a new SSH session over an existing socket, or socket-like
@@ -395,6 +396,10 @@ class Transport(threading.Thread, ClosingContextManager):
Whether to send an extra message to compatible clients, in server
mode, with a list of supported pubkey algorithms. Default:
``True``.
+ :param bool strict_kex:
+ Whether to advertise (and implement, if client also advertises
+ support for) a "strict kex" mode for safer handshaking. Default:
+ ``True``.
.. versionchanged:: 1.15
Added the ``default_window_size`` and ``default_max_packet_size``
@@ -405,10 +410,14 @@ class Transport(threading.Thread, ClosingContextManager):
Added the ``disabled_algorithms`` kwarg.
.. versionchanged:: 2.9
Added the ``server_sig_algs`` kwarg.
+ .. versionchanged:: 3.4
+ Added the ``strict_kex`` kwarg.
"""
self.active = False
self.hostname = None
self.server_extensions = {}
+ self.advertise_strict_kex = strict_kex
+ self.agreed_on_strict_kex = False
if isinstance(sock, string_types):
# convert "host:port" into (host, port)
@@ -2342,12 +2351,18 @@ class Transport(threading.Thread, ClosingContextManager):
)
else:
available_server_keys = self.preferred_keys
- # Signal support for MSG_EXT_INFO.
+ # Signal support for MSG_EXT_INFO so server will send it to us.
# NOTE: doing this here handily means we don't even consider this
# value when agreeing on real kex algo to use (which is a common
# pitfall when adding this apparently).
kex_algos.append("ext-info-c")
+ # Similar to ext-info, but used in both client and server modes, so
+ # done outside of above if/else.
+ if self.advertise_strict_kex:
+ which = "s" if self.server_mode else "c"
+ kex_algos.append(f"kex-strict-{which}-v00@openssh.com")
+
m = Message()
m.add_byte(cMSG_KEXINIT)
m.add_bytes(os.urandom(16))
@@ -2427,11 +2442,28 @@ class Transport(threading.Thread, ClosingContextManager):
self._log(DEBUG, "kex follows: {}".format(kex_follows))
self._log(DEBUG, "=== Key exchange agreements ===")
- # Strip out ext-info "kex algo"
+ # Record, and strip out, ext-info and/or strict-kex non-algorithms
self._remote_ext_info = None
+ self._remote_strict_kex = None
+ to_pop = []
for i, algo in enumerate(kex_algo_list):
if algo.startswith("ext-info-"):
- self._remote_ext_info = kex_algo_list.pop(i)
+ self._remote_ext_info = algo
+ to_pop.insert(0, i)
+ elif algo.startswith("kex-strict-"):
+ # NOTE: this is what we are expecting from the /remote/ end.
+ which = "c" if self.server_mode else "s"
+ expected = f"kex-strict-{which}-v00@openssh.com"
+ # Set strict mode if agreed.
+ self.agreed_on_strict_kex = (
+ algo == expected and self.advertise_strict_kex
+ )
+ self._log(
+ DEBUG, f"Strict kex mode: {self.agreed_on_strict_kex}"
+ )
+ to_pop.insert(0, i)
+ for i in to_pop:
+ kex_algo_list.pop(i)
# as a server, we pick the first item in the client's list that we
# support.
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 6bc0be8..c8cd498 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -24,6 +24,7 @@ from __future__ import with_statement
from binascii import hexlify
from contextlib import contextmanager
+import itertools
import select
import socket
import time
@@ -63,6 +64,7 @@ from paramiko.message import Message
from .util import needs_builtin, _support, requires_sha1_signing, slow
from .loop import LoopSocket
+from pytest import skip, mark
LONG_BANNER = """\
@@ -1463,3 +1465,44 @@ class TestSHA2SignaturePubkeys(unittest.TestCase):
) as (tc, ts):
assert tc.is_authenticated()
assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
+
+
+class TestStrictKex:
+ def test_kex_algos_includes_kex_strict_c(self):
+ with server() as (tc, _):
+ kex = tc._get_latest_kex_init()
+ assert "kex-strict-c-v00@openssh.com" in kex["kex_algo_list"]
+
+ @mark.parametrize(
+ "server_active,client_active",
+ itertools.product([True, False], repeat=2),
+ )
+ def test_mode_agreement(self, server_active, client_active):
+ with server(
+ server_init=dict(strict_kex=server_active),
+ client_init=dict(strict_kex=client_active),
+ ) as (tc, ts):
+ if server_active and client_active:
+ assert tc.agreed_on_strict_kex is True
+ assert ts.agreed_on_strict_kex is True
+ else:
+ assert tc.agreed_on_strict_kex is False
+ assert ts.agreed_on_strict_kex is False
+
+ def test_mode_advertised_by_default(self):
+ # NOTE: no explicit strict_kex overrides...
+ with server() as (tc, ts):
+ assert all(
+ (
+ tc.advertise_strict_kex,
+ tc.agreed_on_strict_kex,
+ ts.advertise_strict_kex,
+ ts.agreed_on_strict_kex,
+ )
+ )
+
+ def test_sequence_numbers_reset_on_newkeys(self):
+ skip()
+
+ def test_error_raised_on_out_of_order_handshakes(self):
+ skip()
--
2.33.0