Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/mypy-generated.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[mypy]
python_version = 3.6
python_version = 3.8
namespace_packages = False
ignore_missing_imports = False
follow_imports = normal
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ jobs:
- name: Lint generated python code
run: |
flake8 --config=.github/workflows/flake8-generated.cfg --statistics dialects/

- name: Type check generated python code
if: matrix.python-version != '3.7'
run: |
# Latest version of mypy that supports Python 3.7 is too old
mypy --config-file ./.github/workflows/mypy-generated.ini dialects/v10/*.py dialects/v20/*.py

- name: Generate messages
Expand Down
52 changes: 20 additions & 32 deletions generator/mavgen_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def generate_preamble(outf, msgs, basename, args, xml):
import sys
import time
from builtins import object, range
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Type, Union, cast
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Type, Union

WIRE_PROTOCOL_VERSION = "${wire_protocol_version}"
DIALECT = "${DIALECT}"
Expand Down Expand Up @@ -200,7 +200,7 @@ def __init__(self, msgId: int, name: str) -> None:

def format_attr(self, field: str) -> Union[str, float, int]:
"""override field getter"""
raw_attr = cast(Union[bytes, float, int], getattr(self, field))
raw_attr: Union[bytes, float, int] = getattr(self, field)
if isinstance(raw_attr, bytes):
return raw_attr.decode(errors="backslashreplace").rstrip("\\x00")
return raw_attr
Expand Down Expand Up @@ -754,6 +754,9 @@ def __init__(self) -> None:
self.reject_count = 0


MAVLinkV1Header = Tuple[bytes, int, int, int, int, int]
MAVLinkV2Header = Tuple[bytes, int, int, int, int, int, int, int, int]

class MAVLink(object):
"""MAVLink protocol handling class"""

Expand Down Expand Up @@ -872,10 +875,8 @@ def __parse_char_legacy(self) -> Optional[MAVLink_message]:
self.have_prefix_error = False
if self.buf_len() >= 3:
sbuf = self.buf[self.buf_index : 3 + self.buf_index]
(magic, self.expected_length, incompat_flags) = cast(
Tuple[int, int, int],
self.mav20_h3_unpacker.unpack(sbuf),
)
unpacked_h3: Tuple[int, int, int] = self.mav20_h3_unpacker.unpack(sbuf)
magic, self.expected_length, incompat_flags = unpacked_h3
if magic == PROTOCOL_MARKER_V2 and (incompat_flags & MAVLINK_IFLAG_SIGNED):
self.expected_length += MAVLINK_SIGNATURE_BLOCK_LEN
self.expected_length += header_len + 2
Expand Down Expand Up @@ -916,10 +917,8 @@ def check_signature(self, msgbuf: bytearray, srcSystem: int, srcComponent: int)

timestamp_buf = msgbuf[-12:-6]
link_id = msgbuf[-13]
(tlow, thigh) = cast(
Tuple[int, int],
self.mav_sign_unpacker.unpack(timestamp_buf),
)
tbytes: Tuple[int, int] = self.mav_sign_unpacker.unpack(timestamp_buf)
tlow, thigh = tbytes
timestamp = tlow + (thigh << 32)

# see if the timestamp is acceptable
Expand Down Expand Up @@ -960,26 +959,21 @@ def decode(self, msgbuf: bytearray) -> MAVLink_message:
if msgbuf[0] != PROTOCOL_MARKER_V1:
headerlen = 10
try:
magic, mlen, incompat_flags, compat_flags, seq, srcSystem, srcComponent, msgIdlow, msgIdhigh = cast(
Tuple[bytes, int, int, int, int, int, int, int, int],
self.mav20_unpacker.unpack(msgbuf[:headerlen]),
)
header_v2: MAVLinkV2Header = self.mav20_unpacker.unpack(msgbuf[:headerlen])
except struct.error as emsg:
raise MAVError("Unable to unpack MAVLink header: %s" % emsg)
magic, mlen, incompat_flags, compat_flags, seq, srcSystem, srcComponent, msgIdlow, msgIdhigh = header_v2
msgId = msgIdlow | (msgIdhigh << 16)
mapkey = msgId
else:
headerlen = 6
try:
magic, mlen, seq, srcSystem, srcComponent, msgId = cast(
Tuple[bytes, int, int, int, int, int],
self.mav10_unpacker.unpack(msgbuf[:headerlen]),
)
incompat_flags = 0
compat_flags = 0
header_v1: MAVLinkV1Header = self.mav10_unpacker.unpack(msgbuf[:headerlen])
except struct.error as emsg:
raise MAVError("Unable to unpack MAVLink header: %s" % emsg)
mapkey = msgId
magic, mlen, seq, srcSystem, srcComponent, msgId = header_v1
incompat_flags = 0
compat_flags = 0
mapkey = msgId
if (incompat_flags & MAVLINK_IFLAG_SIGNED) != 0:
signature_len = MAVLINK_SIGNATURE_BLOCK_LEN
else:
Expand All @@ -1001,10 +995,7 @@ def decode(self, msgbuf: bytearray) -> MAVLink_message:

# decode the checksum
try:
(crc,) = cast(
Tuple[int],
self.mav_csum_unpacker.unpack(msgbuf[-(2 + signature_len) :][:2]),
)
crc: int = self.mav_csum_unpacker.unpack(msgbuf[-(2 + signature_len) :][:2])[0]
except struct.error as emsg:
raise MAVError("Unable to unpack MAVLink CRC: %s" % emsg)
crcbuf = msgbuf[1 : -(2 + signature_len)]
Expand Down Expand Up @@ -1051,14 +1042,11 @@ def decode(self, msgbuf: bytearray) -> MAVLink_message:
raise MAVError("Bad message of type %s length %u needs %s" % (msgtype, len(mbuf), csize))
mbuf = mbuf[:csize]
try:
t = cast(
Tuple[Union[bytes, int, float], ...],
msgtype.unpacker.unpack(mbuf),
)
t: Tuple[Union[bytes, int, float], ...] = msgtype.unpacker.unpack(mbuf)
except struct.error as emsg:
raise MAVError("Unable to unpack MAVLink payload type=%s payloadLength=%u: %s" % (msgtype, len(mbuf), emsg))

tlist: List[Union[bytes, float, int, Sequence[float], Sequence[int]]] = list(t)
tlist: List[Union[bytes, float, int, Sequence[Union[bytes, float, int]]]] = list(t)
# handle sorted fields
if ${sort_fields}:
if sum(len_map) == len(len_map):
Expand All @@ -1076,7 +1064,7 @@ def decode(self, msgbuf: bytearray) -> MAVLink_message:
if L == 1 or isinstance(field, bytes):
tlist.append(field)
else:
tlist.append(cast(Union[Sequence[int], Sequence[float]], list(t[tip : (tip + L)])))
tlist.append(list(t[tip : (tip + L)]))

# terminate any strings
for i, elem in enumerate(tlist):
Expand Down