|
18 | 18 | import hashlib |
19 | 19 | import logging |
20 | 20 |
|
| 21 | +import urllib3.response |
| 22 | + |
21 | 23 | from google.resumable_media import _download |
22 | 24 | from google.resumable_media import common |
23 | 25 | from google.resumable_media.requests import _helpers |
24 | 26 |
|
25 | 27 |
|
26 | 28 | _LOGGER = logging.getLogger(__name__) |
27 | | -_HASH_HEADER = u"x-goog-hash" |
| 29 | +_SINGLE_GET_CHUNK_SIZE = 8192 |
| 30 | +_HASH_HEADER = u'x-goog-hash' |
28 | 31 | _MISSING_MD5 = u"""\ |
29 | 32 | No MD5 checksum was returned from the service while downloading {} |
30 | 33 | (which happens for composite objects), so client-side content integrity |
@@ -113,13 +116,12 @@ def _write_to_stream(self, response): |
113 | 116 | with response: |
114 | 117 | # NOTE: This might "donate" ``md5_hash`` to the decoder and replace |
115 | 118 | # it with a ``_DoNothingHash``. |
116 | | - body_iter = response.raw.stream( |
117 | | - _helpers._SINGLE_GET_CHUNK_SIZE, decode_content=False |
118 | | - ) |
| 119 | + local_hash = _add_decoder(response.raw, md5_hash) |
| 120 | + body_iter = response.iter_content( |
| 121 | + chunk_size=_SINGLE_GET_CHUNK_SIZE, decode_unicode=False) |
119 | 122 | for chunk in body_iter: |
120 | 123 | self._stream.write(chunk) |
121 | | - md5_hash.update(chunk) |
122 | | - response._content_consumed = True |
| 124 | + local_hash.update(chunk) |
123 | 125 |
|
124 | 126 | if expected_md5_hash is None: |
125 | 127 | return |
@@ -155,15 +157,16 @@ def consume(self, transport): |
155 | 157 | """ |
156 | 158 | method, url, payload, headers = self._prepare_request() |
157 | 159 | # NOTE: We assume "payload is None" but pass it along anyway. |
158 | | - response = _helpers.http_request( |
159 | | - transport, |
160 | | - method, |
161 | | - url, |
162 | | - data=payload, |
163 | | - headers=headers, |
164 | | - retry_strategy=self._retry_strategy, |
165 | | - stream=True, |
166 | | - ) |
| 160 | + request_kwargs = { |
| 161 | + u'data': payload, |
| 162 | + u'headers': headers, |
| 163 | + u'retry_strategy': self._retry_strategy, |
| 164 | + } |
| 165 | + if self._stream is not None: |
| 166 | + request_kwargs[u'stream'] = True |
| 167 | + |
| 168 | +        response = _helpers.http_request( |
| 169 | + transport, method, url, **request_kwargs) |
167 | 170 |
|
168 | 171 | self._process_response(response) |
169 | 172 |
|
@@ -216,17 +219,11 @@ def consume_next_chunk(self, transport): |
216 | 219 | """ |
217 | 220 | method, url, payload, headers = self._prepare_request() |
218 | 221 | # NOTE: We assume "payload is None" but pass it along anyway. |
219 | | - response = _helpers.http_request( |
220 | | - transport, |
221 | | - method, |
222 | | - url, |
223 | | - data=payload, |
224 | | - headers=headers, |
225 | | - retry_strategy=self._retry_strategy, |
226 | | - stream=True, |
227 | | - ) |
228 | | - self._process_response(response) |
229 | | - return response |
| 222 | + result = _helpers.http_request( |
| 223 | + transport, method, url, data=payload, headers=headers, |
| 224 | + retry_strategy=self._retry_strategy) |
| 225 | + self._process_response(result) |
| 226 | + return result |
230 | 227 |
|
231 | 228 |
|
232 | 229 | def _parse_md5_header(header_value, response): |
@@ -294,3 +291,58 @@ def update(self, unused_chunk): |
294 | 291 | Args: |
295 | 292 | unused_chunk (bytes): A chunk of data. |
296 | 293 | """ |
| 294 | + |
| 295 | + |
| 296 | +def _add_decoder(response_raw, md5_hash): |
| 297 | + """Patch the ``_decoder`` on a ``urllib3`` response. |
| 298 | +
|
| 299 | + This is so that we can intercept the compressed bytes before they are |
| 300 | + decoded. |
| 301 | +
|
| 302 | + Only patches if the content encoding is ``gzip``. |
| 303 | +
|
| 304 | + Args: |
| 305 | + response_raw (urllib3.response.HTTPResponse): The raw response for |
| 306 | + an HTTP request. |
| 307 | + md5_hash (Union[_DoNothingHash, hashlib.md5]): A hash function which |
| 308 | + will get updated when it encounters compressed bytes. |
| 309 | +
|
| 310 | + Returns: |
| 311 | + Union[_DoNothingHash, hashlib.md5]: Either the original ``md5_hash`` |
| 312 | + if ``_decoder`` is not patched. Otherwise, returns a ``_DoNothingHash`` |
| 313 | + since the caller will no longer need to hash to decoded bytes. |
| 314 | + """ |
| 315 | + encoding = response_raw.headers.get(u'content-encoding', u'').lower() |
| 316 | + if encoding != u'gzip': |
| 317 | + return md5_hash |
| 318 | + |
| 319 | + response_raw._decoder = _GzipDecoder(md5_hash) |
| 320 | + return _DoNothingHash() |
| 321 | + |
| 322 | + |
class _GzipDecoder(urllib3.response.GzipDecoder):
    """Custom subclass of ``urllib3`` decoder for ``gzip``-ed bytes.

    Allows an MD5 hash function to see the compressed bytes before they are
    decoded. This way the hash of the compressed value can be computed.

    Args:
        md5_hash (Union[_DoNothingHash, hashlib.md5]): A hash function which
            will get updated when it encounters compressed bytes.
    """

    def __init__(self, md5_hash):
        # Explicit base-class call (equivalent to ``super()``) so the
        # underlying zlib decompress object is set up as usual.
        urllib3.response.GzipDecoder.__init__(self)
        self._md5_hash = md5_hash

    def decompress(self, data):
        """Decompress the bytes.

        Args:
            data (bytes): The compressed bytes to be decompressed.

        Returns:
            bytes: The decompressed bytes from ``data``.
        """
        # Feed the hash *before* decoding so it reflects the wire bytes.
        self._md5_hash.update(data)
        return urllib3.response.GzipDecoder.decompress(self, data)
0 commit comments