Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Lib/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@

DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"

def _validate_header_string(value):
"""Validate header values preventing CRLF injection."""
if isinstance(value, str) and ('\r' in value or '\n' in value):
raise ValueError('Invalid header name/value: contains CR or LF')

class HTTPServer(socketserver.TCPServer):

allow_reuse_address = True # Seems to make sense in testing environment
Expand Down Expand Up @@ -547,12 +552,15 @@ def send_response_only(self, code, message=None):
message = ''
if not hasattr(self, '_headers_buffer'):
self._headers_buffer = []
_validate_header_string(message)
self._headers_buffer.append(("%s %d %s\r\n" %
(self.protocol_version, code, message)).encode(
'latin-1', 'strict'))

def send_header(self, keyword, value):
"""Send a MIME header to the headers buffer."""
_validate_header_string(keyword)
_validate_header_string(value)
if self.request_version != 'HTTP/0.9':
if not hasattr(self, '_headers_buffer'):
self._headers_buffer = []
Expand Down
39 changes: 39 additions & 0 deletions Lib/test/test_httpservers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,19 @@ def test_header_buffering_of_send_response_only(self):
handler.end_headers()
self.assertEqual(output.numWrites, 1)

def test_send_response_only_rejects_crlf_message(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
output = AuditableBytesIO()
handler = SocketlessRequestHandler()
handler.rfile = input
handler.wfile = output
handler.request_version = 'HTTP/1.1'

with self.assertRaises(ValueError) as ctx:
handler.send_response_only(418, 'value\r\nSet-Cookie: custom=true')
self.assertIn('Invalid header name/value: contains CR or LF',
str(ctx.exception))

def test_header_buffering_of_send_header(self):

input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
Expand All @@ -1068,6 +1081,32 @@ def test_header_buffering_of_send_header(self):
self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
self.assertEqual(output.numWrites, 1)

def test_crlf_injection_in_header_value(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
output = AuditableBytesIO()
handler = SocketlessRequestHandler()
handler.rfile = input
handler.wfile = output
handler.request_version = 'HTTP/1.1'

with self.assertRaises(ValueError) as ctx:
handler.send_header('X-Custom', 'value\r\nSet-Cookie: custom=true')
self.assertIn('Invalid header name/value: contains CR or LF',
str(ctx.exception))

def test_crlf_injection_in_header_name(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
output = AuditableBytesIO()
handler = SocketlessRequestHandler()
handler.rfile = input
handler.wfile = output
handler.request_version = 'HTTP/1.1'

with self.assertRaises(ValueError) as ctx:
handler.send_header('X-Inj\r\nSet-Cookie', 'value')
self.assertIn('Invalid header name/value: contains CR or LF',
str(ctx.exception))

def test_header_unbuffered_when_continue(self):

def _readAndReseek(f):
Expand Down
33 changes: 33 additions & 0 deletions Lib/test/test_wsgiref.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,39 @@ def testExtras(self):
'\r\n'
)

def test_crlf_rejection_in_setitem(self):
h = Headers()
for crlf in ('\r\n', '\r', '\n'):
with self.subTest(crlf_repr=repr(crlf)):
with self.assertRaises(ValueError) as ctx:
h['X-Custom'] = f'value{crlf}Set-Cookie: test'
self.assertIn('CR or LF', str(ctx.exception))

def test_crlf_rejection_in_setdefault(self):
for crlf in ('\r\n', '\r', '\n'):
with self.subTest(crlf_repr=repr(crlf)):
h = Headers()
with self.assertRaises(ValueError) as ctx:
h.setdefault('X-Custom', f'value{crlf}Set-Cookie: test')
self.assertIn('CR or LF', str(ctx.exception))

def test_crlf_rejection_in_add_header(self):
for crlf in ('\r\n', '\r', '\n'):
with self.subTest(location='value', crlf_repr=repr(crlf)):
h = Headers()
with self.assertRaises(ValueError) as ctx:
h.add_header('X-Custom', f'value{crlf}Set-Cookie: test')
self.assertIn('CR or LF', str(ctx.exception))

with self.subTest(location='param', crlf_repr=repr(crlf)):
h = Headers()
with self.assertRaises(ValueError) as ctx:
h.add_header('Content-Disposition',
'attachment',
filename=f'test{crlf}.txt')
self.assertIn('CR or LF', str(ctx.exception))


class ErrorHandler(BaseCGIHandler):
"""Simple handler subclass for testing BaseHandler"""

Expand Down
31 changes: 17 additions & 14 deletions Lib/wsgiref/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ def __init__(self, headers=None):
self._headers = headers
if __debug__:
for k, v in headers:
self._convert_string_type(k)
self._convert_string_type(v)
self._validate_header_string(k)
self._validate_header_string(v)

def _convert_string_type(self, value):
"""Convert/check value type."""
def _validate_header_string(self, value):
"""Validate header type and value."""
if type(value) is str:
if '\r' in value or '\n' in value:
raise ValueError('Invalid header name/value: contains CR or LF')
return value
raise AssertionError("Header names/values must be"
" of type str (got {0})".format(repr(value)))
Expand All @@ -53,14 +55,15 @@ def __setitem__(self, name, val):
"""Set the value of a header."""
del self[name]
self._headers.append(
(self._convert_string_type(name), self._convert_string_type(val)))
(self._validate_header_string(name),
self._validate_header_string(val)))

def __delitem__(self,name):
"""Delete all occurrences of a header, if present.

Does *not* raise an exception if the header is missing.
"""
name = self._convert_string_type(name.lower())
name = self._validate_header_string(name.lower())
self._headers[:] = [kv for kv in self._headers if kv[0].lower() != name]

def __getitem__(self,name):
Expand All @@ -87,13 +90,13 @@ def get_all(self, name):
fields deleted and re-inserted are always appended to the header list.
If no fields exist with the given name, returns an empty list.
"""
name = self._convert_string_type(name.lower())
name = self._validate_header_string(name.lower())
return [kv[1] for kv in self._headers if kv[0].lower()==name]


def get(self,name,default=None):
"""Get the first header value for 'name', or return 'default'"""
name = self._convert_string_type(name.lower())
name = self._validate_header_string(name.lower())
for k,v in self._headers:
if k.lower()==name:
return v
Expand Down Expand Up @@ -148,8 +151,8 @@ def setdefault(self,name,value):
and value 'value'."""
result = self.get(name)
if result is None:
self._headers.append((self._convert_string_type(name),
self._convert_string_type(value)))
self._headers.append((self._validate_header_string(name),
self._validate_header_string(value)))
return value
else:
return result
Expand All @@ -172,13 +175,13 @@ def add_header(self, _name, _value, **_params):
"""
parts = []
if _value is not None:
_value = self._convert_string_type(_value)
_value = self._validate_header_string(_value)
parts.append(_value)
for k, v in _params.items():
k = self._convert_string_type(k)
k = self._validate_header_string(k)
if v is None:
parts.append(k.replace('_', '-'))
else:
v = self._convert_string_type(v)
v = self._validate_header_string(v)
parts.append(_formatparam(k.replace('_', '-'), v))
self._headers.append((self._convert_string_type(_name), "; ".join(parts)))
self._headers.append((self._validate_header_string(_name), "; ".join(parts)))
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Reject CR/LF in HTTP headers in ``http.server`` and ``wsgiref.headers`` to
prevent CRLF injection attacks.
Loading