Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions miio/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,32 @@ def _decode(self, obj, context, path):
_LOGGER.debug("Unable to decrypt, returning raw bytes: %s", obj)
return obj

decoded = decrypted.decode('utf-8')
try:
return json.loads(decoded)
except:
# list of adaption functions for malformed json payload (quirks)
decrypted_quirks = [
# try without modifications first
lambda decrypted_bytes: decrypted_bytes,
# powerstrip returns malformed JSON if the device is not
# connected to the cloud, so we try to fix it here carefully.
lambda decrypted_bytes: decrypted_bytes.replace(b',,"otu_stat"', b',"otu_stat"'),
# xiaomi cloud returns malformed json when answering _sync.batch_gen_room_up_url
# command so try to sanitize it
lambda decrypted_bytes:
decrypted_bytes[:decrypted_bytes.rfind(b'\x00')]
if b'\x00' in decrypted_bytes
else decrypted_bytes
]

for i, quirk in enumerate(decrypted_quirks):
decoded = quirk(decrypted).decode('utf-8')
try:
# powerstrip returns invalid JSON if the device is not
# connected to the cloud, so we try to fix it here carefully.
decoded = decoded.replace(',,"otu_stat"', ',"otu_stat"')
return json.loads(decoded)
except Exception as ex:
_LOGGER.error("unable to parse json '%s': %s", decoded, ex)
# log the error when decrypted bytes couldn't be loaded
# after trying all quirk adaptions
if i == len(decrypted_quirks) - 1:
_LOGGER.error("unable to parse json '%s': %s", decoded, ex)

return None


Message = Struct(
Expand Down
46 changes: 45 additions & 1 deletion miio/tests/test_protocol.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from unittest import TestCase
from .. import Utils
from ..protocol import Message
import binascii


class TestProtocol(TestCase):
Expand All @@ -17,7 +19,7 @@ def test_encrypt(self):

encrypted = Utils.encrypt(payload, token)
decrypted = Utils.decrypt(encrypted, token)
self.assertEquals(payload, decrypted)
assert payload == decrypted

def test_invalid_token(self):
payload = b"hello world"
Expand All @@ -32,3 +34,45 @@ def test_invalid_token(self):
Utils.encrypt(payload, wrong_length)
with self.assertRaises(ValueError):
Utils.decrypt(payload, wrong_length)

def test_decode_json_payload(self):
token = bytes.fromhex(32 * '0')
ctx = {'token': token}

def build_msg(data):
encrypted_data = Utils.encrypt(data, token)

# header
magic = binascii.unhexlify(b'2131')
length = (32 + len(encrypted_data)).to_bytes(2, byteorder='big')
unknown = binascii.unhexlify(b'00000000')
did = binascii.unhexlify(b'01234567')
epoch = binascii.unhexlify(b'00000000')

checksum = Utils.md5(magic+length+unknown+did+epoch+token+encrypted_data)

return magic+length+unknown+did+epoch+checksum+encrypted_data
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to write my own message building function because Message.build() will always produce valid json but we want to explicitly test if Message.parse() can handle invalid json input.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's great to have tests for such behaviours, so thanks for this!


# can parse message with valid json
serialized_msg = build_msg(b'{"id": 123456}')
parsed_msg = Message.parse(serialized_msg, **ctx)
assert parsed_msg.data.value
assert isinstance(parsed_msg.data.value, dict)
assert parsed_msg.data.value['id'] == 123456

# can parse message with invalid json for edge case powerstrip
# when not connected to cloud
serialized_msg = build_msg(b'{"id": 123456,,"otu_stat":0}')
parsed_msg = Message.parse(serialized_msg, **ctx)
assert parsed_msg.data.value
assert isinstance(parsed_msg.data.value, dict)
assert parsed_msg.data.value['id'] == 123456
assert parsed_msg.data.value['otu_stat'] == 0

# can parse message with invalid json for edge case xiaomi cloud
# reply to _sync.batch_gen_room_up_url
serialized_msg = build_msg(b'{"id": 123456}\x00k')
parsed_msg = Message.parse(serialized_msg, **ctx)
assert parsed_msg.data.value
assert isinstance(parsed_msg.data.value, dict)
assert parsed_msg.data.value['id'] == 123456