Source code for udsoncan.connections

import socket
import queue
import threading
import logging
import binascii
import sys
from abc import ABC, abstractmethod
import time
from typing import Union, Any, Dict
import ctypes
import selectors

try:
    import can  # type:ignore
    _import_can_err = None
except Exception as e:
    _import_can_err = e

try:
    import isotp    # type:ignore
    _import_isotp_err = None
except Exception as e:
    _import_isotp_err = e

try:
    from udsoncan.j2534 import J2534, TxStatusFlag, Protocol_ID, Error_ID, Ioctl_Flags, Ioctl_ID, SCONFIG_LIST
    _import_j2534_err = None
except Exception as e:
    _import_j2534_err = e

try:
    from aioisotp.sync import SyncISOTPNetwork, SyncConnection  # type:ignore
    _import_aioisotp_err = None
except Exception as e:
    _import_aioisotp_err = e

from udsoncan.Request import Request
from udsoncan.Response import Response
from udsoncan.exceptions import TimeoutException


from typing import Optional, Tuple, cast


class BaseConnection(ABC):

    name: str
    logger: logging.Logger

    def __init__(self, name: Optional[str] = None):
        if name is None:
            self.name = 'Connection'
        else:
            self.name = 'Connection[%s]' % (name)

        self.logger = logging.getLogger(self.name)

[docs] def send(self, data: Union[bytes, Request, Response], timeout: Optional[float] = None) -> None: """Sends data to the underlying transport protocol :param data: The data or object to send. If a Request or Response is given, the value returned by get_payload() will be sent. :type data: bytes, Request, Response :returns: None """ if not self.is_open(): raise RuntimeError("Connection is not opened") if isinstance(data, Request) or isinstance(data, Response): payload = data.get_payload() else: payload = data self.logger.debug('Sending %d bytes : [%s]' % (len(payload), binascii.hexlify(payload).decode('ascii'))) # backward compatibility if 'timeout' in self.specific_send.__code__.co_varnames: self.specific_send(payload, timeout=timeout) else: self.specific_send(payload)
[docs] def wait_frame(self, timeout: Optional[float] = None, exception: bool = False) -> Optional[bytes]: """Waits for the reception of a frame of data from the underlying transport protocol :param timeout: The maximum amount of time to wait before giving up in seconds :type timeout: int :param exception: Boolean value indicating if this function may return exceptions. When ``True``, all exceptions may be raised, including ``TimeoutException`` When ``False``, all exceptions will be logged as ``DEBUG`` and ``None`` will be returned. :type exception: bool :returns: Received data :rtype: bytes or None """ if not self.is_open(): raise RuntimeError("Connection is not opened") try: frame = self.specific_wait_frame(timeout=timeout) except Exception as e: self.logger.debug('No data received: [%s] - %s ' % (e.__class__.__name__, str(e))) if exception == True: raise else: frame = None if frame is not None: self.logger.debug('Received %d bytes : [%s]' % (len(frame), binascii.hexlify(frame).decode('ascii'))) return frame
def __enter__(self): return self
[docs] @abstractmethod def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None: """The implementation of the send method. :param payload: Data to send :type payload: bytes :returns: None """ pass
[docs] @abstractmethod def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: """The implementation of the ``wait_frame`` method. :param timeout: The maximum amount of time to wait before giving up :type timeout: int :returns: Received data :rtype: bytes or None """ pass
[docs] @abstractmethod def open(self) -> "BaseConnection": """ Set up the connection object. :returns: None """ pass
[docs] @abstractmethod def close(self) -> None: """ Close the connection object :returns: None """ pass
[docs] @abstractmethod def empty_rxqueue(self) -> None: """ Empty all unread data in the reception buffer. :returns: None """ pass
[docs] @abstractmethod def is_open(self) -> bool: """ Tells if the connection is open. :returns: bool """ pass
def __exit__(self, type, value, traceback): pass
[docs]class SocketConnection(BaseConnection): """ Sends and receives data through a socket. :param sock: The socket to use. This socket must be bound and ready to use. Only ``send()`` and ``recv()`` will be called by this Connection :type sock: socket.socket :param bufsize: Maximum buffer size of the socket, this value is passed to ``recv()`` :type bufsize: int :param name: This name is included in the logger name so that its output can be redirected. The logger name will be ``Connection[<name>]`` :type name: string """ rxqueue: "queue.Queue[bytes]" exit_requested: bool opened: bool rxthread: Optional[threading.Thread] sock: socket.socket bufsize: int def __init__(self, sock: socket.socket, bufsize: int = 4095, name: Optional[str] = None): BaseConnection.__init__(self, name) self.rxqueue = queue.Queue() self.exit_requested = False self.opened = False self.rxthread = None self.sock = sock self.bufsize = bufsize def open(self) -> "SocketConnection": self.exit_requested = False self.rxthread = threading.Thread(target=self.rxthread_task, daemon=True) self.rxthread.start() self.opened = True self.logger.info('Connection opened') return self def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def is_open(self) -> bool: return self.opened def rxthread_task(self) -> None: sel = selectors.DefaultSelector() sel.register(self.sock, selectors.EVENT_READ) while not self.exit_requested: try: events = sel.select(timeout=0.2) if events: data = self.sock.recv(self.bufsize) if data is not None: self.rxqueue.put(data) except Exception: self.exit_requested = True def close(self) -> None: self.exit_requested = True if self.rxthread is not None: self.rxthread.join() self.opened = False self.logger.info('Connection closed') def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None: # timeout not used for generic sockets self.sock.send(payload) def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: if not self.opened: raise RuntimeError("Connection is not open") timedout = False frame = None try: frame = self.rxqueue.get(block=True, timeout=timeout) except queue.Empty: timedout = True if timedout: raise TimeoutException("Did not received frame in time (timeout=%s sec)" % timeout) return frame def empty_rxqueue(self) -> None: while not self.rxqueue.empty(): self.rxqueue.get()
[docs]class IsoTPSocketConnection(BaseConnection): """ Sends and receives data through an ISO-TP socket. Makes cleaner code than SocketConnection but offers no additional functionality. The `can-isotp module <https://github.com/pylessard/python-can-isotp>`_ must be installed in order to use this connection :param interface: The can interface to use (example: ``can0``) :type interface: string :param address: The address used to bind the the socket. Before 1.21, txid/rxid were needed here, this has changed with v1.21 :type address: ``isotp.Address`` or ``isotp.AsymmetricAddress`` :param name: This name is included in the logger name so that its output can be redirected. The logger name will be ``Connection[<name>]`` :type name: string :param tpsock: An optional ISO-TP socket to use instead of creating one. :type tpsock: isotp.socket """ interface: str address: Union["isotp.Address", "isotp.AsymmetricAddress"] rxqueue: "queue.Queue[bytes]" exit_requested: bool opened: bool def __init__(self, interface: str, address: Union["isotp.Address", "isotp.AsymmetricAddress"], name: Optional[str] = None, tpsock: Optional["isotp.socket"] = None, **kwargs ): BaseConnection.__init__(self, name) self.interface = interface self.address = address self.rxqueue = queue.Queue() self.exit_requested = False self.opened = False # Lives with the past. if 'txid' in kwargs or 'rxid' in kwargs: raise RuntimeError( "Provide an isotp.Address to the %s. The interface has changed in a non-backward compatible way and this is now required." % self.__class__.__name__) if tpsock is None: if 'isotp' not in sys.modules: if _import_isotp_err is None: raise ImportError('isotp module is not loaded') else: raise _import_isotp_err self.tpsock = isotp.socket(timeout=0.1) else: self.tpsock = tpsock def open(self) -> "IsoTPSocketConnection": self.tpsock.bind(self.interface, address=self.address) self.exit_requested = False self.rxthread = threading.Thread(target=self.rxthread_task, daemon=True) self.rxthread.start() self.opened = True self.logger.info('Connection opened') return self def __enter__(self) -> "IsoTPSocketConnection": return self def __exit__(self, type, value, traceback) -> None: self.close() def is_open(self) -> bool: return self.tpsock.bound def rxthread_task(self) -> None: sel = selectors.DefaultSelector() sel.register(self.tpsock._socket, selectors.EVENT_READ) while not self.exit_requested: try: events = sel.select(timeout=0.2) if events: data = self.tpsock.recv() if data is not None: self.rxqueue.put(data) except Exception: self.exit_requested = True def close(self) -> None: self.exit_requested = True if self.rxthread is not None: self.rxthread.join() self.tpsock.close() self.opened = False self.logger.info('Connection closed') def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None: self.tpsock.send(payload) def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: if not self.opened: raise RuntimeError("Connection is not open") timedout = False frame = None try: frame = self.rxqueue.get(block=True, timeout=timeout) except queue.Empty: timedout = True if timedout: raise TimeoutException("Did not received ISOTP frame in time (timeout=%s sec)" % timeout) return frame def empty_rxqueue(self) -> None: while not self.rxqueue.empty(): self.rxqueue.get()
class IsoTPConnection(IsoTPSocketConnection): """ Same as :class:`IsoTPSocketConnection <udsoncan.connections.IsoTPSocketConnection.Session>`. Exists only for backward compatibility. """ pass
[docs]class QueueConnection(BaseConnection): """ Sends and receives data using 2 Python native queues. - ``MyConnection.fromuserqueue`` : Data read from this queue when ``wait_frame`` is called - ``MyConnection.touserqueue`` : Data written to this queue when ``send`` is called :param mtu: Optional maximum frame size. Messages will be truncated to this size :type mtu: int :param name: This name is included in the logger name so that its output can be redirected. The logger name will be ``Connection[<name>]`` :type name: string """ fromuserqueue: "queue.Queue[bytes]" touserqueue: "queue.Queue[bytes]" opened: bool mtu: int def __init__(self, name: Optional[str] = None, mtu: int = 4095): BaseConnection.__init__(self, name) self.fromuserqueue = queue.Queue() # Client reads from this queue. Other end is simulated self.touserqueue = queue.Queue() # Client writes to this queue. Other end is simulated self.opened = False self.mtu = mtu def open(self) -> "QueueConnection": self.opened = True self.logger.info('Connection opened') return self def __enter__(self) -> "QueueConnection": return self def __exit__(self, type, value, traceback) -> None: self.close() def is_open(self) -> bool: return self.opened def close(self) -> None: self.empty_rxqueue() self.empty_txqueue() self.opened = False self.logger.info('Connection closed') def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None: if self.mtu is not None: if len(payload) > self.mtu: self.logger.warning("Truncating payload to be set to a length of %d" % (self.mtu)) payload = payload[0:self.mtu] self.touserqueue.put(payload, block=True, timeout=timeout) def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: if not self.opened: raise RuntimeError("Connection is not open") timedout = False frame = None try: frame = self.fromuserqueue.get(block=True, timeout=timeout) except queue.Empty: timedout = True if timedout: raise TimeoutException("Did not receive frame from user queue in time (timeout=%s sec)" % timeout) if self.mtu is not None: if frame is not None and len(frame) > self.mtu: self.logger.warning("Truncating received payload to a length of %d" % (self.mtu)) frame = frame[0:self.mtu] return frame def empty_rxqueue(self) -> None: while not self.fromuserqueue.empty(): self.fromuserqueue.get() def empty_txqueue(self) -> None: while not self.touserqueue.empty(): self.touserqueue.get()
[docs]class PythonIsoTpConnection(BaseConnection): """ Sends and receives data using a `can-isotp <https://github.com/pylessard/python-can-isotp>`_ Python module which is a Python implementation of the IsoTp transport protocol which can be coupled with `python-can <https://python-can.readthedocs.io>`_ module to interract with CAN hardware `can-isotp <https://github.com/pylessard/python-can-isotp>`_ must be installed in order to use this connection. See an :ref:`example<example_using_python_can>` :param isotp_layer: The IsoTP Transport layer object coming from the ``isotp`` module. :type isotp_layer: :class:`isotp.TransportLayer<isotp.TransportLayer>` :param name: This name is included in the logger name so that its output can be redirected. The logger name will be ``Connection[<name>]`` :type name: string """ subconn: Union["PythonIsoTpV1Connection", "PythonIsoTpV2Connection"] def __init__(self, isotp_layer: Union["isotp.TransportLayerLogic", "isotp.TransportLayer"], name: Optional[str] = None ): BaseConnection.__init__(self, name) import isotp if hasattr(isotp, '_major_version_'): # isotp v2.x if isotp._major_version_ == 2: if isinstance(isotp_layer, isotp.TransportLayer): # This one has its own thread self.subconn = PythonIsoTpV2Connection(isotp_layer, name) elif isinstance(isotp_layer, isotp.TransportLayerLogic): # Need to create a thread for this one self.subconn = PythonIsoTpV1Connection(isotp_layer, name) else: raise ValueError("Invalid isotp layer object") else: raise NotImplementedError("Unsupported isotp version") else: # isotp v1.x self.subconn = PythonIsoTpV1Connection(isotp_layer, name) def open(self) -> "PythonIsoTpConnection": self.subconn.open() return self def __enter__(self) -> "PythonIsoTpConnection": self.subconn.__enter__() return self def __exit__(self, type, value, traceback) -> None: return self.subconn.__exit__(type, value, traceback) def is_open(self) -> bool: return self.subconn.is_open() def close(self) -> None: return self.subconn.close() def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None: self.subconn.specific_send(payload, timeout) def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: return self.subconn.specific_wait_frame(timeout) def empty_rxqueue(self) -> None: return self.subconn.empty_rxqueue() def empty_txqueue(self) -> None: return self.subconn.empty_txqueue()
class PythonIsoTpV2Connection(BaseConnection): isotp_layer: "isotp.TransportLayer" opened: bool def __init__(self, isotp_layer: "isotp.TransportLayer", name: Optional[str] = None): BaseConnection.__init__(self, name) self.opened = False self.isotp_layer = isotp_layer assert isinstance(self.isotp_layer, isotp.TransportLayer), 'isotp_layer must be a valid isotp.TransportLayer ' def open(self) -> "PythonIsoTpV2Connection": self.isotp_layer.start() self.opened = True self.logger.info('Connection opened') return self def __enter__(self) -> "PythonIsoTpV2Connection": return self def __exit__(self, type, value, traceback) -> None: self.close() def is_open(self) -> bool: return self.opened def close(self) -> None: self.isotp_layer.stop() self.empty_rxqueue() self.empty_txqueue() self.opened = False self.logger.info('Connection closed') def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None: self.isotp_layer.send(payload, send_timeout=timeout) def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: if not self.opened: raise RuntimeError("Connection is not opened") frame = self.isotp_layer.recv(block=True, timeout=timeout) if frame is None: raise TimeoutException("Did not receive IsoTP frame from the Transport layer in time (timeout=%s sec)" % timeout) return bytes(frame) def empty_rxqueue(self) -> None: self.isotp_layer.stop_receiving() self.isotp_layer.clear_rx_queue() def empty_txqueue(self) -> None: self.isotp_layer.stop_sending() self.isotp_layer.clear_tx_queue() class PythonIsoTpV1Connection(BaseConnection): toIsoTPQueue: "queue.Queue[bytes]" fromIsoTPQueue: "queue.Queue[bytes]" rxthread: Optional[threading.Thread] exit_requested: bool opened: bool isotp_layer: "isotp.TransportLayerLogic" def __init__(self, isotp_layer: "isotp.TransportLayerLogic", name: Optional[str] = None): BaseConnection.__init__(self, name) self.toIsoTPQueue = queue.Queue() self.fromIsoTPQueue = queue.Queue() self.rxthread = None self.exit_requested = False self.opened = False self.isotp_layer = isotp_layer # isotp v1 TransportLayer == isotpv2.TransportLayerLogic if hasattr(isotp, 'TransportLayerLogic'): assert isinstance(self.isotp_layer, isotp.TransportLayerLogic), 'isotp_layer must be a valid isotp.TransportLayerLogic' else: assert isinstance(self.isotp_layer, isotp.TransportLayer), 'isotp_layer must be a valid isotp.isotp.TransportLayer' def open(self) -> "PythonIsoTpV1Connection": self.exit_requested = False self.rxthread = threading.Thread(target=self.rxthread_task, daemon=True) self.rxthread.start() self.opened = True self.logger.info('Connection opened') return self def __enter__(self) -> "PythonIsoTpV1Connection": return self def __exit__(self, type, value, traceback) -> None: self.close() def is_open(self) -> bool: return self.opened def close(self) -> None: self.empty_rxqueue() self.empty_txqueue() self.exit_requested = True if self.rxthread is not None: self.rxthread.join() self.isotp_layer.reset() self.opened = False self.logger.info('Connection closed') def specific_send(self, payload: bytes, timeout: Optional[float] = None): self.toIsoTPQueue.put(bytearray(payload)) # isotp.protocol.TransportLayer uses byte array. udsoncan is strict on bytes format def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: if not self.opened: raise RuntimeError("Connection is not open") timedout = False frame = None try: frame = self.fromIsoTPQueue.get(block=True, timeout=timeout) except queue.Empty: timedout = True if timedout: raise TimeoutException("Did not receive IsoTP frame from the Transport layer in time (timeout=%s sec)" % timeout) if frame is None: return None # isotp.protocol.TransportLayer uses bytearray. udsoncan is strict on bytes format return bytes(frame) def empty_rxqueue(self) -> None: while not self.fromIsoTPQueue.empty(): self.fromIsoTPQueue.get() def empty_txqueue(self) -> None: while not self.toIsoTPQueue.empty(): self.toIsoTPQueue.get() def rxthread_task(self) -> None: while not self.exit_requested: try: while not self.toIsoTPQueue.empty(): self.isotp_layer.send(self.toIsoTPQueue.get()) self.isotp_layer.process() while self.isotp_layer.available(): self.fromIsoTPQueue.put(self.isotp_layer.recv()) time.sleep(self.isotp_layer.sleep_time()) except Exception as e: self.exit_requested = True self.logger.error(str(e))
[docs]class J2534Connection(BaseConnection): """ Sends and receives data through a J2534 Interface. A windows DLL and a J2534 interface must be installed in order to use this connection :param windll: The path to the windows DLL for the J2534 interface (example: 'C:/Program Files{x86}../../openport 2.0/op20pt32.dll') :type interface: string :param rxid: The reception CAN id :type rxid: int :param txid: The transmission CAN id :type txid: int :param name: This name is included in the logger name so that its output can be redirected. The logger name will be ``Connection[<name>]`` :type name: string :param debug: This will enable windows debugging mode in the dll (see tactrix doc for additional information) :type debug: boolean :param args: Optional parameters list (Unused right now). :type args: list :param kwargs: Optional parameters dictionary Unused right now). :type kwargs: dict """ interface: "J2534" protocol: "Protocol_ID" baudrate: int result: "Error_ID" firmwareVersion: "ctypes.Array[ctypes.c_char]" dllVersion: "ctypes.Array[ctypes.c_char]" apiVersion: "ctypes.Array[ctypes.c_char]" rxqueue: "queue.Queue[bytes]" exit_requested: bool opened: bool def __init__(self, windll: str, rxid: int, txid: int, name: Optional[str] = None, debug: bool = False, *args, **kwargs): BaseConnection.__init__(self, name) # Determine mode ID29 or ID11 txFlags = TxStatusFlag.ISO15765_CAN_ID_29.value if txid >> 11 else TxStatusFlag.ISO15765_CAN_ID_11.value # Set up a J2534 interface using the DLL provided self.interface = J2534(windll=windll, rxid=rxid, txid=txid, txFlags=txFlags) # Set the protocol to ISO15765, Baud rate to 500000 self.protocol = Protocol_ID.ISO15765 self.baudrate = 500000 self.debug = debug # Open the interface (connect to the DLL) result, self.devID = self.interface.PassThruOpen() if debug: self.result = self.interface.PassThruIoctl(0, Ioctl_Flags.TX_IOCTL_SET_DLL_DEBUG_FLAGS, SCONFIG_LIST([(0, Ioctl_Flags.TX_IOCTL_DLL_DEBUG_FLAG_J2534_CALLS.value)]) ) self.log_last_operation("PassThruIoctl SET_DLL_DEBUG") # Get the firmeware and DLL version etc, mainly for debugging output self.result, self.firmwareVersion, self.dllVersion, self.apiVersion = self.interface.PassThruReadVersion(self.devID) self.logger.info("J2534 FirmwareVersion: " + str(self.firmwareVersion.value) + ", dllVersoin: " + str(self.dllVersion.value) + ", apiVersion" + str(self.apiVersion.value)) # get the channel ID of the interface (used for subsequent communication) self.result, self.channelID = self.interface.PassThruConnect(self.devID, self.protocol.value, self.baudrate) self.log_last_operation("PassThruConnect") configs = SCONFIG_LIST([ (Ioctl_ID.DATA_RATE.value, 500000), (Ioctl_ID.LOOPBACK.value, 0), (Ioctl_ID.ISO15765_BS.value, 0x20), (Ioctl_ID.ISO15765_STMIN.value, 0), ]) self.result = self.interface.PassThruIoctl(self.channelID, Ioctl_ID.SET_CONFIG, configs) self.log_last_operation("PassThruIoctl SET_CONFIG") self.result = self.interface.PassThruIoctl(self.channelID, Ioctl_ID.CLEAR_MSG_FILTERS) self.log_last_operation("PassThruIoctl CLEAR_MSG_FILTERS") # Set the filters and clear the read buffer (filters will be set based on tx/rxids) self.result = self.interface.PassThruStartMsgFilter(self.channelID, self.protocol.value) self.log_last_operation("PassThruStartMsgFilter") self.result = self.interface.PassThruIoctl(self.channelID, Ioctl_ID.CLEAR_RX_BUFFER) self.log_last_operation("PassThruIoctl CLEAR_RX_BUFFER") self.result = self.interface.PassThruIoctl(self.channelID, Ioctl_ID.CLEAR_TX_BUFFER) self.log_last_operation("PassThruIoctl CLEAR_TX_BUFFER") self.rxqueue = queue.Queue() self.exit_requested = False self.opened = False def open(self) -> "J2534Connection": self.exit_requested = False self.rxthread = threading.Thread(target=self.rxthread_task, daemon=True) self.rxthread.start() self.opened = True self.logger.info('J2534 Connection opened') return self def __enter__(self) -> "J2534Connection": return self def __exit__(self, type, value, traceback) -> None: self.close() def is_open(self) -> bool: return self.opened def rxthread_task(self) -> None: while not self.exit_requested: try: result, data, numMessages = self.interface.PassThruReadMsgs(self.channelID, self.protocol.value, 1, 1) if data is not None: self.rxqueue.put(data) except Exception: self.logger.critical("Exiting J2534 rx thread") self.exit_requested = True def log_last_operation(self, exec_method: str) -> None: res, pErrDescr = self.interface.PassThruGetLastError() if self.result != Error_ID.ERR_SUCCESS: self.logger.error("J2534 %s: %s %s" % (exec_method, self.result, pErrDescr)) elif self.debug: self.logger.debug("J2534 %s: OK" % (exec_method)) def close(self) -> None: self.exit_requested = True self.rxthread.join() self.result = self.interface.PassThruDisconnect(self.channelID) self.opened = False self.log_last_operation("Connection closed") def specific_send(self, payload: bytes, timeout: Optional[float] = None): if timeout is None: timeout = 0 result = self.interface.PassThruWriteMsgs(self.channelID, payload, self.protocol.value, Timeout=int(timeout * 1000)) def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: if not self.opened: raise RuntimeError("J2534 Connection is not open") timedout = False frame = None try: frame = self.rxqueue.get(block=True, timeout=timeout) except queue.Empty: timedout = True if timedout: raise TimeoutException("Did not received response from J2534 RxQueue (timeout=%s sec)" % timeout) return frame def empty_rxqueue(self) -> None: while not self.rxqueue.empty(): self.rxqueue.get()
class FakeConnection(BaseConnection): """ Sends and receives static data defined in a local dict. Used so that an application can be tested without a live can network """ rxqueue: "queue.Queue[bytes]" exit_requested: bool opened: bool ResponseData: Dict[bytes, bytes] def __init__(self, name=None, debug=False, *args, **kwargs): BaseConnection.__init__(self, name) self.rxqueue = queue.Queue() self.exit_requested = False self.opened = False self.ResponseData = {b'\x10\x03': b'\x50\x03\x12\x23\x34\x45', b'\x22\xf1\x90\xf1\x89\xf1\x91\xf8\x06\xf1\xa3': b'\x22\xf1\x90\xf1\x89\xf1\x91\xf8\x06\xf1\xa3'} def open(self) -> "FakeConnection": self.opened = True self.logger.info('Fake Connection opened') return self def __enter__(self) -> "FakeConnection": return self def __exit__(self, type, value, traceback) -> None: self.close() def is_open(self) -> bool: return self.opened def close(self) -> None: self.exit_requested = True self.opened = False self.logger.info('Fake Connection closed') def specific_send(self, payload: bytes, timeout: Optional[float] = None): self.rxqueue.put(self.ResponseData[payload]) def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: if not self.opened: raise RuntimeError("Fake Connection is not open") timedout = False frame = None try: frame = self.rxqueue.get(block=True, timeout=timeout) except queue.Empty: timedout = True if timedout: raise TimeoutException("Did not received response from J2534 RxQueue (timeout=%s sec)" % timeout) return frame def empty_rxqueue(self) -> None: while not self.rxqueue.empty(): self.rxqueue.get()
[docs]class SyncAioIsotpConnection(BaseConnection): """ A wrapper for aioisotp sync variant `aioisotp <https://github.com/christiansandberg/aioisotp>`_ must be installed in order to use this connection. See an :ref:`example<example_using_aioisotp>` :param rxid: The reception CAN id :type rxid: int :param txid: The transmission CAN id :type txid: int :param name: This name is included in the logger name so that its output can be redirected. The logger name will be ``Connection[<name>]`` :type name: string :param args: Optional parameters list passed to aioisotp binding method. :type args: list :param kwargs: Optional parameters dictionary passed to aioisotp binding method. :type kwargs: dict """ network: "SyncISOTPNetwork" opened: bool rx_id: int tx_id: int conn: Optional["SyncConnection"] def __init__(self, rx_id: int, tx_id: int, name: Optional[str] = None, *args, **kwargs): BaseConnection.__init__(self, name) self.network = SyncISOTPNetwork(*args, **kwargs) self.opened = False self.rx_id = rx_id self.tx_id = tx_id self.conn = None self.opened = False def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None: if self.conn is None or not self.opened: raise RuntimeError("Connection is not opened") self.conn.send(payload) def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]: if not self.opened or self.conn is None: raise RuntimeError("Connection is not open") frame = cast(Optional[bytes], self.conn.recv(timeout)) if frame is None and timeout: raise TimeoutException("Did not received frame in time (timeout=%s sec)" % timeout) return frame def open(self) -> "SyncAioIsotpConnection": self.network.open() self.conn = self.network.create_sync_connection(self.rx_id, self.tx_id) self.opened = True self.logger.info("Connection opened") return self def close(self) -> None: self.network.close() self.opened = False self.logger.info("Connection closed") def empty_rxqueue(self) -> None: if self.conn is not None: self.conn.empty() def is_open(self) -> bool: return self.opened def __enter__(self) -> "SyncAioIsotpConnection": return self def __exit__(self, type, value, traceback) -> None: self.close()