diff --git a/Lib/http/server.py b/Lib/http/server.py index 160d3eefc7cbdf..bf6d271668b17d 100644 --- a/Lib/http/server.py +++ b/Lib/http/server.py @@ -112,6 +112,11 @@ DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8" +def _validate_header_string(value): + """Validate header values preventing CRLF injection.""" + if isinstance(value, str) and ('\r' in value or '\n' in value): + raise ValueError('Invalid header name/value: contains CR or LF') + class HTTPServer(socketserver.TCPServer): allow_reuse_address = True # Seems to make sense in testing environment @@ -547,12 +552,15 @@ def send_response_only(self, code, message=None): message = '' if not hasattr(self, '_headers_buffer'): self._headers_buffer = [] + _validate_header_string(message) self._headers_buffer.append(("%s %d %s\r\n" % (self.protocol_version, code, message)).encode( 'latin-1', 'strict')) def send_header(self, keyword, value): """Send a MIME header to the headers buffer.""" + _validate_header_string(keyword) + _validate_header_string(value) if self.request_version != 'HTTP/0.9': if not hasattr(self, '_headers_buffer'): self._headers_buffer = [] diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py index 7da5e3a1957588..924ebbcb88cbb9 100644 --- a/Lib/test/test_httpservers.py +++ b/Lib/test/test_httpservers.py @@ -1052,6 +1052,19 @@ def test_header_buffering_of_send_response_only(self): handler.end_headers() self.assertEqual(output.numWrites, 1) + def test_send_response_only_rejects_crlf_message(self): + input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') + output = AuditableBytesIO() + handler = SocketlessRequestHandler() + handler.rfile = input + handler.wfile = output + handler.request_version = 'HTTP/1.1' + + with self.assertRaises(ValueError) as ctx: + handler.send_response_only(418, 'value\r\nSet-Cookie: custom=true') + self.assertIn('Invalid header name/value: contains CR or LF', + str(ctx.exception)) + def test_header_buffering_of_send_header(self): input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') @@ -1068,6 +1081,32 @@ def test_header_buffering_of_send_header(self): self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n') self.assertEqual(output.numWrites, 1) + def test_crlf_injection_in_header_value(self): + input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') + output = AuditableBytesIO() + handler = SocketlessRequestHandler() + handler.rfile = input + handler.wfile = output + handler.request_version = 'HTTP/1.1' + + with self.assertRaises(ValueError) as ctx: + handler.send_header('X-Custom', 'value\r\nSet-Cookie: custom=true') + self.assertIn('Invalid header name/value: contains CR or LF', + str(ctx.exception)) + + def test_crlf_injection_in_header_name(self): + input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') + output = AuditableBytesIO() + handler = SocketlessRequestHandler() + handler.rfile = input + handler.wfile = output + handler.request_version = 'HTTP/1.1' + + with self.assertRaises(ValueError) as ctx: + handler.send_header('X-Inj\r\nSet-Cookie', 'value') + self.assertIn('Invalid header name/value: contains CR or LF', + str(ctx.exception)) + def test_header_unbuffered_when_continue(self): def _readAndReseek(f): diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py index e04a4d2c2218a3..5c88dc989aa919 100644 --- a/Lib/test/test_wsgiref.py +++ b/Lib/test/test_wsgiref.py @@ -503,6 +503,39 @@ def testExtras(self): '\r\n' ) + def test_crlf_rejection_in_setitem(self): + h = Headers() + for crlf in ('\r\n', '\r', '\n'): + with self.subTest(crlf_repr=repr(crlf)): + with self.assertRaises(ValueError) as ctx: + h['X-Custom'] = f'value{crlf}Set-Cookie: test' + self.assertIn('CR or LF', str(ctx.exception)) + + def test_crlf_rejection_in_setdefault(self): + for crlf in ('\r\n', '\r', '\n'): + with self.subTest(crlf_repr=repr(crlf)): + h = Headers() + with self.assertRaises(ValueError) as ctx: + h.setdefault('X-Custom', f'value{crlf}Set-Cookie: test') + self.assertIn('CR or LF', str(ctx.exception)) + + def test_crlf_rejection_in_add_header(self): + for crlf in ('\r\n', '\r', '\n'): + with self.subTest(location='value', crlf_repr=repr(crlf)): + h = Headers() + with self.assertRaises(ValueError) as ctx: + h.add_header('X-Custom', f'value{crlf}Set-Cookie: test') + self.assertIn('CR or LF', str(ctx.exception)) + + with self.subTest(location='param', crlf_repr=repr(crlf)): + h = Headers() + with self.assertRaises(ValueError) as ctx: + h.add_header('Content-Disposition', + 'attachment', + filename=f'test{crlf}.txt') + self.assertIn('CR or LF', str(ctx.exception)) + + class ErrorHandler(BaseCGIHandler): """Simple handler subclass for testing BaseHandler""" diff --git a/Lib/wsgiref/headers.py b/Lib/wsgiref/headers.py index c78879f80c7df2..87c218dd572c4b 100644 --- a/Lib/wsgiref/headers.py +++ b/Lib/wsgiref/headers.py @@ -35,12 +35,14 @@ def __init__(self, headers=None): self._headers = headers if __debug__: for k, v in headers: - self._convert_string_type(k) - self._convert_string_type(v) + self._validate_header_string(k) + self._validate_header_string(v) - def _convert_string_type(self, value): - """Convert/check value type.""" + def _validate_header_string(self, value): + """Validate header type and value.""" if type(value) is str: + if '\r' in value or '\n' in value: + raise ValueError('Invalid header name/value: contains CR or LF') return value raise AssertionError("Header names/values must be" " of type str (got {0})".format(repr(value))) @@ -53,14 +55,15 @@ def __setitem__(self, name, val): """Set the value of a header.""" del self[name] self._headers.append( - (self._convert_string_type(name), self._convert_string_type(val))) + (self._validate_header_string(name), + self._validate_header_string(val))) def __delitem__(self,name): """Delete all occurrences of a header, if present. Does *not* raise an exception if the header is missing. """ - name = self._convert_string_type(name.lower()) + name = self._validate_header_string(name.lower()) self._headers[:] = [kv for kv in self._headers if kv[0].lower() != name] def __getitem__(self,name): @@ -87,13 +90,13 @@ def get_all(self, name): fields deleted and re-inserted are always appended to the header list. If no fields exist with the given name, returns an empty list. """ - name = self._convert_string_type(name.lower()) + name = self._validate_header_string(name.lower()) return [kv[1] for kv in self._headers if kv[0].lower()==name] def get(self,name,default=None): """Get the first header value for 'name', or return 'default'""" - name = self._convert_string_type(name.lower()) + name = self._validate_header_string(name.lower()) for k,v in self._headers: if k.lower()==name: return v @@ -148,8 +151,8 @@ def setdefault(self,name,value): and value 'value'.""" result = self.get(name) if result is None: - self._headers.append((self._convert_string_type(name), - self._convert_string_type(value))) + self._headers.append((self._validate_header_string(name), + self._validate_header_string(value))) return value else: return result @@ -172,13 +175,13 @@ def add_header(self, _name, _value, **_params): """ parts = [] if _value is not None: - _value = self._convert_string_type(_value) + _value = self._validate_header_string(_value) parts.append(_value) for k, v in _params.items(): - k = self._convert_string_type(k) + k = self._validate_header_string(k) if v is None: parts.append(k.replace('_', '-')) else: - v = self._convert_string_type(v) + v = self._validate_header_string(v) parts.append(_formatparam(k.replace('_', '-'), v)) - self._headers.append((self._convert_string_type(_name), "; ".join(parts))) + self._headers.append((self._validate_header_string(_name), "; ".join(parts))) diff --git a/Misc/NEWS.d/next/Library/2025-12-11-22-28-21.gh-issue-142533.SN0f9_.rst b/Misc/NEWS.d/next/Library/2025-12-11-22-28-21.gh-issue-142533.SN0f9_.rst new file mode 100644 index 00000000000000..2b8f567d2d497b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-12-11-22-28-21.gh-issue-142533.SN0f9_.rst @@ -0,0 +1,2 @@ +Reject CR/LF in HTTP headers in ``http.server`` and ``wsgiref.headers`` to +prevent CRLF injection attacks.