Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/pymc_core/hardware/gpio_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import sys
import threading
import time
from typing import Callable, Dict, Optional
from typing import Any, Callable, Dict, Optional

try:
from periphery import GPIO
from periphery import GPIO, EdgeEvent
except ImportError:
print("\nError: python-periphery library is required for GPIO management.")
print("━" * 60)
Expand Down Expand Up @@ -279,12 +279,13 @@ def _monitor_edge_events(self, pin_number: int, stop_event: threading.Event) ->

while not stop_event.is_set() and pin_number in self._pins:
try:
# Wait for edge event - hardware will wake us up
event = gpio.poll(30.0)

if event and not stop_event.is_set():
# Only call callback if pin is HIGH
if gpio.read():
# Wait for edge event (kernel blocks until interrupt)
if gpio.poll(30.0) and not stop_event.is_set():
# Consume event from kernel queue to prevent repeated triggers
event: EdgeEvent = gpio.read_event()

# Only process rising edges (kernel filters, but verify)
if event.edge == "rising":
callback = self._input_callbacks.get(pin_number)
if callback:
callback()
Expand Down
150 changes: 62 additions & 88 deletions src/pymc_core/hardware/lora/LoRaRF/SX126x.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,55 @@
import time

import spidev

from .base import BaseLoRa

spi = spidev.SpiDev()
_gpio_manager = None

from gpiozero import Device

# Force gpiozero to use LGPIOFactory - no RPi.GPIO fallback
from gpiozero.pins.lgpio import LGPIOFactory

Device.pin_factory = LGPIOFactory()

# GPIOZero helpers for pin management
from gpiozero import DigitalInputDevice, DigitalOutputDevice

_gpio_pins = {}
def set_gpio_manager(gpio_manager):
"""Set the GPIO manager instance to be used by this module"""
global _gpio_manager
_gpio_manager = gpio_manager


def _get_output(pin):
if pin not in _gpio_pins:
_gpio_pins[pin] = DigitalOutputDevice(pin)
return _gpio_pins[pin]
"""Get output pin via centralized GPIO manager (setup only if needed)"""
if _gpio_manager is None:
raise RuntimeError("GPIO manager not initialized. Call set_gpio_manager() first.")
# Only setup if pin doesn't exist yet
if pin not in _gpio_manager._pins:
_gpio_manager.setup_output_pin(pin, initial_value=True)
return _gpio_manager._pins[pin]


def _get_input(pin):
if pin not in _gpio_pins:
_gpio_pins[pin] = DigitalInputDevice(pin)
return _gpio_pins[pin]
"""Get input pin via centralized GPIO manager (setup only if needed)"""
if _gpio_manager is None:
raise RuntimeError("GPIO manager not initialized. Call set_gpio_manager() first.")
# Only setup if pin doesn't exist yet
if pin not in _gpio_manager._pins:
_gpio_manager.setup_input_pin(pin)
return _gpio_manager._pins[pin]


def _get_output_safe(pin):
"""Get output pin safely - return None if GPIO busy"""
try:
if pin not in _gpio_pins:
_gpio_pins[pin] = DigitalOutputDevice(pin)
return _gpio_pins[pin]
except Exception as e:
if "GPIO busy" in str(e):
if _gpio_manager is None:
return None
# Only setup if pin doesn't exist yet
if pin not in _gpio_manager._pins:
if not _gpio_manager.setup_output_pin(pin, initial_value=True):
return None
raise e
return _gpio_manager._pins.get(pin)


def _get_input_safe(pin):
"""Get input pin safely - return None if GPIO busy"""
try:
if pin not in _gpio_pins:
_gpio_pins[pin] = DigitalInputDevice(pin)
return _gpio_pins[pin]
except Exception as e:
if "GPIO busy" in str(e):
if _gpio_manager is None:
return None
# Only setup if pin doesn't exist yet
if pin not in _gpio_manager._pins:
if not _gpio_manager.setup_input_pin(pin):
return None
raise e
return _gpio_manager._pins.get(pin)


class SX126x(BaseLoRa):
Expand Down Expand Up @@ -386,24 +383,17 @@ def end(self):
except Exception:
pass

# Close all GPIO pins
global _gpio_pins
for pin_num, pin_obj in list(_gpio_pins.items()):
try:
pin_obj.close()
except Exception:
pass

_gpio_pins.clear()
# GPIO cleanup is handled by the centralized gpio_manager
# No need to close pins here as they're managed by GPIOPinManager

def reset(self) -> bool:
reset_pin = _get_output_safe(self._reset)
if reset_pin is None:
return True # Continue if reset pin unavailable

reset_pin.off()
reset_pin.write(False) # periphery: write(False) = LOW
time.sleep(0.001)
reset_pin.on()
reset_pin.write(True) # periphery: write(True) = HIGH
return not self.busyCheck()

def sleep(self, option=SLEEP_WARM_START):
Expand All @@ -417,7 +407,7 @@ def wake(self):
if self._wake != -1:
wake_pin = _get_output_safe(self._wake)
if wake_pin:
wake_pin.off()
wake_pin.write(False)
time.sleep(0.0005)
self.setStandby(self.STANDBY_RC)
self._fixResistanceAntenna()
Expand All @@ -432,7 +422,7 @@ def busyCheck(self, timeout: int = _busyTimeout):
return False # Assume not busy to continue

t = time.time()
while busy_pin.value:
while busy_pin.read(): # periphery: read() returns True for HIGH
if (time.time() - t) > (timeout / 1000):
return True
return False
Expand Down Expand Up @@ -496,16 +486,12 @@ def setPins(
self._txen = txen
self._rxen = rxen
self._wake = wake
# gpiozero pins are initialized on first use by _get_output/_get_input
# periphery pins are initialized on first use by _get_output/_get_input
_get_output(reset)
_get_input(busy)
_get_output(self._cs_define)
# Only create a DigitalInputDevice for IRQ if not already managed externally
# (e.g., by main driver with gpiozero.Button). This avoids double allocation errors.
# If you use a Button in the main driver, do NOT call _get_input here.
# Commented out to prevent double allocation:
# if irq != -1:
# _get_input(irq)
# IRQ pin managed externally by sx1262_wrapper.py via gpio_manager
# Do NOT initialize it here to avoid double allocation
if txen != -1:
_get_output(txen)
# if rxen != -1: _get_output(rxen)
Expand Down Expand Up @@ -880,8 +866,8 @@ def beginPacket(self):
self.setBufferBaseAddress(self._bufferIndex, (self._bufferIndex + 0xFF) % 0xFF)
# save current txen pin state and set txen pin to LOW
if self._txen != -1:
self._txState = _get_output(self._txen).value
_get_output(self._txen).off()
self._txState = _get_output(self._txen).read()
_get_output(self._txen).write(False)
self._fixLoRaBw500(self._bw)

def endPacket(self, timeout: int = TX_SINGLE) -> bool:
Expand Down Expand Up @@ -959,11 +945,11 @@ def request(self, timeout: int = RX_SINGLE) -> bool:
self._statusWait = self.STATUS_RX_CONTINUOUS
# save current txen pin state and set txen pin to high
if self._txen != -1:
self._txState = _get_output(self._txen).value
_get_output(self._txen).on()
self._txState = _get_output(self._txen).read()
_get_output(self._txen).write(True)
# set device to receive mode with configured timeout, single, or continuous operation
self.setRx(rxTimeout)
# IRQ event handling should be implemented in the higher-level driver using gpiozero Button
# IRQ event handling should be implemented in the higher-level driver using periphery GPIO
return True

def listen(self, rxPeriod: int, sleepPeriod: int) -> bool:
Expand All @@ -984,11 +970,11 @@ def listen(self, rxPeriod: int, sleepPeriod: int) -> bool:
sleepPeriod = 0x00FFFFFF
# save current txen pin state and set txen pin to high
if self._txen != -1:
self._txState = _get_output(self._txen).value
_get_output(self._txen).on()
self._txState = _get_output(self._txen).read()
_get_output(self._txen).write(True)
# set device to receive mode with configured receive and sleep period
self.setRxDutyCycle(rxPeriod, sleepPeriod)
# IRQ event handling should be implemented in the higher-level driver using gpiozero Button
# IRQ event handling should be implemented in the higher-level driver using periphery GPIO
return True

def available(self) -> int:
Expand Down Expand Up @@ -1056,18 +1042,12 @@ def wait(self, timeout: int = 0) -> bool:
# for transmit, calculate transmit time and set back txen pin to previous state
self._transmitTime = time.time() - self._transmitTime
if self._txen != -1:
if self._txState:
_get_output(self._txen).on()
else:
_get_output(self._txen).off()
_get_output(self._txen).write(self._txState)
elif self._statusWait == self.STATUS_RX_WAIT:
# for receive, get received payload length and buffer index and set back txen pin to previous state
(self._payloadTxRx, self._bufferIndex) = self.getRxBufferStatus()
if self._txen != -1:
if self._txState:
_get_output(self._txen).on()
else:
_get_output(self._txen).off()
_get_output(self._txen).write(self._txState)
self._fixRxTimeout()
elif self._statusWait == self.STATUS_RX_CONTINUOUS:
# for receive continuous, get received payload length and buffer index and clear IRQ status
Expand Down Expand Up @@ -1155,10 +1135,7 @@ def _interruptTx(self, channel=None):
self._transmitTime = time.time() - self._transmitTime
# set back txen pin to previous state
if self._txen != -1:
if self._txState:
_get_output(self._txen).on()
else:
_get_output(self._txen).off()
_get_output(self._txen).write(self._txState)
# store IRQ status
self._statusIrq = self.getIrqStatus()
# call onTransmit function
Expand All @@ -1168,10 +1145,7 @@ def _interruptTx(self, channel=None):
def _interruptRx(self, channel=None):
# set back txen pin to previous state
if self._txen != -1:
if self._txState:
_get_output(self._txen).on()
else:
_get_output(self._txen).off()
_get_output(self._txen).write(self._txState)
self._fixRxTimeout()
# store IRQ status
self._statusIrq = self.getIrqStatus()
Expand Down Expand Up @@ -1494,23 +1468,23 @@ def _writeBytes(self, opCode: int, data: tuple, nBytes: int):
# Adaptive CS control based on CS pin type
if self._cs_define != 8: # Manual CS pin (like Waveshare GPIO 21)
# Simple CS control for manual pins
_get_output(self._cs_define).off()
_get_output(self._cs_define).write(False)
buf = [opCode]
for i in range(nBytes):
buf.append(data[i])
spi.xfer2(buf)
_get_output(self._cs_define).on()
_get_output(self._cs_define).write(True)
else: # Kernel CS pin (like ClockworkPi GPIO 8)
# Timing-based CS control for kernel CS pins
_get_output(self._cs_define).on() # Initial high state
_get_output(self._cs_define).off()
_get_output(self._cs_define).write(True) # Initial high state
_get_output(self._cs_define).write(False)
time.sleep(0.000001) # 1µs setup time for CS
buf = [opCode]
for i in range(nBytes):
buf.append(data[i])
spi.xfer2(buf)
time.sleep(0.000001) # 1µs hold time before CS release
_get_output(self._cs_define).on()
_get_output(self._cs_define).write(True)

def _readBytes(self, opCode: int, nBytes: int, address: tuple = (), nAddress: int = 0) -> tuple:
if self.busyCheck():
Expand All @@ -1519,18 +1493,18 @@ def _readBytes(self, opCode: int, nBytes: int, address: tuple = (), nAddress: in
# Adaptive CS control based on CS pin type
if self._cs_define != 8: # Manual CS pin (like Waveshare GPIO 21)
# Simple CS control for manual pins
_get_output(self._cs_define).off()
_get_output(self._cs_define).write(False)
buf = [opCode]
for i in range(nAddress):
buf.append(address[i])
for i in range(nBytes):
buf.append(0x00)
feedback = spi.xfer2(buf)
_get_output(self._cs_define).on()
_get_output(self._cs_define).write(True)
else: # Kernel CS pin (like ClockworkPi GPIO 8)
# Timing-based CS control for kernel CS pins
_get_output(self._cs_define).on() # Initial high state
_get_output(self._cs_define).off()
_get_output(self._cs_define).write(True) # Initial high state
_get_output(self._cs_define).write(False)
time.sleep(0.000001) # 1µs setup time for CS
buf = [opCode]
for i in range(nAddress):
Expand All @@ -1539,7 +1513,7 @@ def _readBytes(self, opCode: int, nBytes: int, address: tuple = (), nAddress: in
buf.append(0x00)
feedback = spi.xfer2(buf)
time.sleep(0.000001) # 1µs hold time before CS release
_get_output(self._cs_define).on()
_get_output(self._cs_define).write(True)

return tuple(feedback[nAddress + 1 :])

Expand Down
Loading