Source code for isotp.address

__all__ = ['AddressingMode', 'TargetAddressType', 'Address']

from enum import Enum
from isotp import CanMessage
import abc

from typing import Optional, Any, List, Callable, Dict, Tuple, Union


[docs]class AddressingMode(Enum): Normal_11bits = 0 Normal_29bits = 1 NormalFixed_29bits = 2 Extended_11bits = 3 Extended_29bits = 4 Mixed_11bits = 5 Mixed_29bits = 6 @classmethod def get_name(cls, num: Union[int, "AddressingMode"]) -> str: return cls(num).name
[docs]class TargetAddressType(Enum): Physical = 0 # 1 to 1 communication Functional = 1 # 1 to n communication
class AbstractAddress(abc.ABC): @abc.abstractmethod def get_tx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: raise NotImplementedError("Abstract method") @abc.abstractmethod def get_rx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: raise NotImplementedError("Abstract method") @abc.abstractmethod def requires_tx_extension_byte(self) -> bool: raise NotImplementedError("Abstract method") @abc.abstractmethod def requires_rx_extension_byte(self) -> bool: raise NotImplementedError("Abstract method") @abc.abstractmethod def get_tx_extension_byte(self) -> Optional[int]: raise NotImplementedError("Abstract method") @abc.abstractmethod def get_rx_extension_byte(self) -> Optional[int]: raise NotImplementedError("Abstract method") @abc.abstractmethod def is_tx_29bits(self) -> bool: raise NotImplementedError("Abstract method") @abc.abstractmethod def is_rx_29bits(self) -> bool: raise NotImplementedError("Abstract method") @abc.abstractmethod def is_for_me(self, msg: CanMessage) -> bool: raise NotImplementedError("Abstract method") @abc.abstractmethod def get_rx_prefix_size(self) -> int: raise NotImplementedError("Abstract method") @abc.abstractmethod def get_tx_payload_prefix(self) -> bytes: raise NotImplementedError("Abstract method") @abc.abstractmethod def is_partial_address(self) -> bool: raise NotImplementedError("Abstract method") @abc.abstractmethod def get_content_str(self) -> str: raise NotImplementedError("Abstract method")
[docs]class Address(AbstractAddress): """ Represents the addressing information (N_AI) of the IsoTP layer. Will define what messages will be received and how to craft transmitted message to reach a specific party. Parameters must be given according to the addressing mode. When not needed, a parameter may be left unset or set to ``None``. Both the :class:`TransportLayer<isotp.TransportLayer>` and the :class:`isotp.socket<isotp.socket>` expects this address object :param addressing_mode: The addressing mode. Valid values are defined by the :class:`AddressingMode<isotp.AddressingMode>` class :type addressing_mode: int :param txid: The CAN ID for transmission. Used for these addressing mode: ``Normal_11bits``, ``Normal_29bits``, ``Extended_11bits``, ``Extended_29bits``, ``Mixed_11bits`` :type txid: int | None :param rxid: The CAN ID for reception. Used for these addressing mode: ``Normal_11bits``, ``Normal_29bits``, ``Extended_11bits``, ``Extended_29bits``, ``Mixed_11bits`` :type rxid: int | None :param target_address: Target address (N_TA) used in ``NormalFixed_29bits`` and ``Mixed_29bits`` addressing mode. :type target_address: int | None :param source_address: Source address (N_SA) used in ``NormalFixed_29bits`` and ``Mixed_29bits`` addressing mode. :type source_address: int | None :param physical_id: The CAN ID for physical (unicast) messages. Only bits 28-16 are used. Used for these addressing modes: ``NormalFixed_29bits``, ``Mixed_29bits``. Set to standard mandated value if None. :type physical_id: int | None :param functional_id: The CAN ID for functional (multicast) messages. Only bits 28-16 are used. Used for these addressing modes: ``NormalFixed_29bits``, ``Mixed_29bits``. Set to standard mandated value if None. :type functional_id: int | None :param address_extension: Address extension (N_AE) used in ``Mixed_11bits``, ``Mixed_29bits`` addressing mode :type address_extension: int | None :param rx_only: When using :class:`AsymmetricAddress<isotp.address.AsymmetricAddress>`, indicates that this address is the RX part, disabling validation of TX part :type rx_only: bool :param tx_only: When using :class:`AsymmetricAddress<isotp.address.AsymmetricAddress>`, indicates that this address is the TX part, disabling validation of RX part :type tx_only: bool """ _addressing_mode: AddressingMode _target_address: Optional[int] _source_address: Optional[int] _address_extension: Optional[int] _txid: Optional[int] _rxid: Optional[int] _is_29bits: bool _tx_arbitration_id_physical: int _tx_arbitration_id_functional: int _rx_arbitration_id_physical: int _rx_arbitration_id_functional: int _tx_payload_prefix: bytes _rx_prefix_size: int _rx_only: bool _tx_only: bool def __init__(self, addressing_mode: AddressingMode = AddressingMode.Normal_11bits, txid: Optional[int] = None, rxid: Optional[int] = None, target_address: Optional[int] = None, source_address: Optional[int] = None, physical_id: Optional[int] = None, functional_id: Optional[int] = None, address_extension: Optional[int] = None, rx_only: bool = False, tx_only: bool = False ): self._rx_only = rx_only self._tx_only = tx_only self._addressing_mode = addressing_mode self._target_address = target_address self._source_address = source_address self._address_extension = address_extension self._txid = txid self._rxid = rxid self._is_29bits = True if self._addressing_mode in [ AddressingMode.Normal_29bits, AddressingMode.NormalFixed_29bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_29bits] else False if self._addressing_mode == AddressingMode.NormalFixed_29bits: self.physical_id = 0x18DA0000 if physical_id is None else physical_id & 0x1FFF0000 self.functional_id = 0x18DB0000 if functional_id is None else functional_id & 0x1FFF0000 if self._addressing_mode == AddressingMode.Mixed_29bits: self.physical_id = 0x18CE0000 if physical_id is None else physical_id & 0x1FFF0000 self.functional_id = 0x18CD0000 if functional_id is None else functional_id & 0x1FFF0000 self.validate() # From here, input is good. Do some precomputing for speed optimization without bothering about types or values self._tx_payload_prefix = bytes() self._rx_prefix_size = 0 if not self._tx_only: # Rx supported self._rx_arbitration_id_physical = self._get_rx_arbitration_id(TargetAddressType.Physical) self._rx_arbitration_id_functional = self._get_rx_arbitration_id(TargetAddressType.Functional) if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: self._rx_prefix_size = 1 if not self._rx_only: # Tx supported self._tx_arbitration_id_physical = self._get_tx_arbitration_id(TargetAddressType.Physical) self._tx_arbitration_id_functional = self._get_tx_arbitration_id(TargetAddressType.Functional) if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: assert self._target_address is not None self._tx_payload_prefix = bytes([self._target_address]) elif self._addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: assert self._address_extension is not None self._tx_payload_prefix = bytes([self._address_extension]) if not self._tx_only: if self._addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]: setattr(self, 'is_for_me', self._is_for_me_normal) elif self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: setattr(self, 'is_for_me', self._is_for_me_extended) elif self._addressing_mode == AddressingMode.NormalFixed_29bits: setattr(self, 'is_for_me', self._is_for_me_normal_fixed) elif self._addressing_mode == AddressingMode.Mixed_11bits: setattr(self, 'is_for_me', self._is_for_me_mixed_11bits) elif self._addressing_mode == AddressingMode.Mixed_29bits: setattr(self, 'is_for_me', self._is_for_me_mixed_29bits) else: raise RuntimeError('This exception should never be raised.') def not_implemented_func_with_partial(*args: Any, **kwargs: Any) -> None: raise NotImplementedError("Not possible with partial address") # Remove unavailable functions to be strict if self._tx_only: setattr(self, 'get_rx_arbitration_id', not_implemented_func_with_partial) setattr(self, 'requires_rx_extension_byte', not_implemented_func_with_partial) setattr(self, 'get_rx_extension_byte', not_implemented_func_with_partial) setattr(self, 'is_rx_29bits', not_implemented_func_with_partial) setattr(self, 'is_for_me', not_implemented_func_with_partial) setattr(self, 'get_rx_prefix_size', not_implemented_func_with_partial) if self._rx_only: setattr(self, 'get_tx_arbitration_id', not_implemented_func_with_partial) setattr(self, 'requires_tx_extension_byte', not_implemented_func_with_partial) setattr(self, 'get_tx_extension_byte', not_implemented_func_with_partial) setattr(self, 'is_tx_29bits', not_implemented_func_with_partial) setattr(self, 'get_tx_payload_prefix', not_implemented_func_with_partial) def validate(self) -> None: if self._rx_only and self._tx_only: raise ValueError("Address cannot be tx only and rx only") if self._addressing_mode not in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits, AddressingMode.NormalFixed_29bits, AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: raise ValueError('Addressing mode is not valid') if self._addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]: if self._rxid is None and not self._tx_only: raise ValueError('rxid must be specified for Normal addressing mode (11 or 29 bits ID)') if self._txid is None and not self._rx_only: raise ValueError('txid must be specified for Normal addressing mode (11 or 29 bits ID)') if self._rxid == self._txid: raise ValueError('txid and rxid must be different for Normal addressing mode') elif self._addressing_mode == AddressingMode.NormalFixed_29bits: if self._target_address is None or self._source_address is None: raise ValueError('target_address and source_address must be specified for Normal Fixed addressing (29 bits ID)') elif self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: if not self._rx_only: if self._target_address is None or self._txid is None: raise ValueError('target_address and txid must be specified for Extended addressing mode (11 or 29 bits ID)') if not self._tx_only: if self._source_address is None or self._rxid is None: raise ValueError('source_address and rxid must be specified for Extended addressing mode (11 or 29 bits ID)') if self._rxid == self._txid: raise ValueError('txid and rxid must be different') elif self._addressing_mode == AddressingMode.Mixed_11bits: if self._address_extension is None: raise ValueError('address_extension must be specified for Mixed addressing mode (11 bits ID)') if self._rxid is None and not self._tx_only: raise ValueError('rxid must be specified for Mixed addressing mode (11 bits ID)') if self._txid is None and not self._rx_only: raise ValueError('txid must be specified for Mixed addressing mode (11 bits ID)') if self._rxid == self._txid: raise ValueError('txid and rxid must be different for Mixed addressing mode (11 bits ID)') elif self._addressing_mode == AddressingMode.Mixed_29bits: # partial or full address requires all 3 params. if self._target_address is None or self._source_address is None or self._address_extension is None: raise ValueError('target_address, source_address and address_extension must be specified for Mixed addressing mode (29 bits ID)') if self._target_address is not None: if not isinstance(self._target_address, int): raise ValueError('target_address must be an integer') if self._target_address < 0 or self._target_address > 0xFF: raise ValueError('target_address must be an integer between 0x00 and 0xFF') if self._source_address is not None: if not isinstance(self._source_address, int): raise ValueError('source_address must be an integer') if self._source_address < 0 or self._source_address > 0xFF: raise ValueError('source_address must be an integer between 0x00 and 0xFF') if self._address_extension is not None: if not isinstance(self._address_extension, int): raise ValueError('source_address must be an integer') if self._address_extension < 0 or self._address_extension > 0xFF: raise ValueError('address_extension must be an integer between 0x00 and 0xFF') if self._txid is not None: if not isinstance(self._txid, int): raise ValueError('txid must be an integer') if self._txid < 0: raise ValueError('txid must be greater than 0') if not self._is_29bits: if self._txid > 0x7FF: raise ValueError('txid must be smaller than 0x7FF for 11 bits identifier') if self._rxid is not None: if not isinstance(self._rxid, int): raise ValueError('rxid must be an integer') if self._rxid < 0: raise ValueError('rxid must be greater than 0') if not self._is_29bits: if self._rxid > 0x7FF: raise ValueError('rxid must be smaller than 0x7FF for 11 bits identifier') def is_partial_address(self) -> bool: return self._tx_only or self._rx_only def is_tx_only(self) -> bool: return self._tx_only def is_rx_only(self) -> bool: return self._rx_only def get_rx_prefix_size(self) -> int: return self._rx_prefix_size def get_tx_payload_prefix(self) -> bytes: return self._tx_payload_prefix def is_for_me(self, msg: CanMessage) -> bool: raise NotImplementedError("is_for_me should be overriden in constructor") def get_tx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: if address_type == TargetAddressType.Physical: return self._tx_arbitration_id_physical else: return self._tx_arbitration_id_functional def get_rx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: if address_type == TargetAddressType.Physical: return self._rx_arbitration_id_physical else: return self._rx_arbitration_id_functional def _get_tx_arbitration_id(self, address_type: TargetAddressType) -> int: if self._addressing_mode in (AddressingMode.Normal_11bits, AddressingMode.Normal_29bits, AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits): assert self._txid is not None return self._txid elif self._addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: assert self._target_address is not None assert self._source_address is not None bits28_16 = self.physical_id if address_type == TargetAddressType.Physical else self.functional_id return bits28_16 | (self._target_address << 8) | self._source_address raise ValueError("Unsupported addressing mode") def _get_rx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: if self._addressing_mode in (AddressingMode.Normal_11bits, AddressingMode.Normal_29bits, AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits): assert self._rxid is not None return self._rxid elif self._addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: assert self._target_address is not None assert self._source_address is not None bits28_16 = self.physical_id if address_type == TargetAddressType.Physical else self.functional_id return bits28_16 | (self._source_address << 8) | self._target_address raise ValueError("Unsupported addressing mode") def _is_for_me_normal(self, msg: CanMessage) -> bool: if self._is_29bits == msg.is_extended_id: return msg.arbitration_id == self._rxid return False def _is_for_me_extended(self, msg: CanMessage) -> bool: if self._is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: return msg.arbitration_id == self._rxid and int(msg.data[0]) == self._source_address return False def _is_for_me_normal_fixed(self, msg: CanMessage) -> bool: if self._is_29bits == msg.is_extended_id: return (msg.arbitration_id & 0x1FFF0000 in [self.physical_id, self.functional_id]) and (msg.arbitration_id & 0xFF00) >> 8 == self._source_address and msg.arbitration_id & 0xFF == self._target_address return False def _is_for_me_mixed_11bits(self, msg: CanMessage) -> bool: if self._is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: return msg.arbitration_id == self._rxid and int(msg.data[0]) == self._address_extension return False def _is_for_me_mixed_29bits(self, msg: CanMessage) -> bool: if self._is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: return (msg.arbitration_id & 0x1FFF0000) in [self.physical_id, self.functional_id] and (msg.arbitration_id & 0xFF00) >> 8 == self._source_address and msg.arbitration_id & 0xFF == self._target_address and int(msg.data[0]) == self._address_extension return False def _requires_extension_byte(self) -> bool: return True if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits] else False def requires_rx_extension_byte(self) -> bool: return self._requires_extension_byte() def requires_tx_extension_byte(self) -> bool: return self._requires_extension_byte() def get_tx_extension_byte(self) -> Optional[int]: if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: return self._target_address if self._addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: return self._address_extension return None def get_rx_extension_byte(self) -> Optional[int]: if self._addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: return self._source_address if self._addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: return self._address_extension return None def is_tx_29bits(self) -> bool: return self._is_29bits def is_rx_29bits(self) -> bool: return self._is_29bits def get_content_str(self) -> str: val_dict = {} keys = ['_target_address', '_source_address', '_address_extension', '_txid', '_rxid'] for key in keys: val = getattr(self, key) if key.startswith('_'): key = key[1:] if val is not None: val_dict[key] = val vals_str = ', '.join(['%s:0x%02x' % (k, val_dict[k]) for k in val_dict]) return '[%s - %s]' % (AddressingMode.get_name(self._addressing_mode), vals_str) def __repr__(self) -> str: return '<IsoTP Address %s at 0x%08x>' % (self.get_content_str(), id(self))
[docs]class AsymmetricAddress(AbstractAddress): """ Address that uses independent addressing modes for transmission and reception. :param tx_addr: The Address object used for transmission :type tx_addr: :class:`Address<isotp.Address>` with ``tx_only=True`` :param rx_addr: The Address object used for reception :type rx_addr: :class:`Address<isotp.Address>` with ``rx_only=True`` """ tx_addr: Address rx_addr: Address def __init__(self, tx_addr: Address, rx_addr: Address): if not isinstance(rx_addr, Address): raise ValueError("rx_addr must be an isotp.Address instance") if not isinstance(tx_addr, Address): raise ValueError("tx_addr must be an isotp.Address instance") if not tx_addr.is_tx_only(): raise ValueError("tx_addr must be configured with tx_only=True") if not rx_addr.is_rx_only(): raise ValueError("rx_addr must be configured with rx_only=True") self.tx_addr = tx_addr self.rx_addr = rx_addr def get_tx_extension_byte(self) -> Optional[int]: return self.tx_addr.get_tx_extension_byte() def get_rx_extension_byte(self) -> Optional[int]: return self.rx_addr.get_rx_extension_byte() def is_for_me(self, msg: CanMessage) -> bool: return self.rx_addr.is_for_me(msg) def get_tx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: return self.tx_addr.get_tx_arbitration_id(address_type) def get_rx_arbitration_id(self, address_type: TargetAddressType = TargetAddressType.Physical) -> int: return self.rx_addr.get_rx_arbitration_id(address_type) def is_tx_29bits(self) -> bool: return self.tx_addr.is_tx_29bits() def is_rx_29bits(self) -> bool: return self.rx_addr.is_rx_29bits() def requires_tx_extension_byte(self) -> bool: return self.tx_addr.requires_tx_extension_byte() def requires_rx_extension_byte(self) -> bool: return self.rx_addr.requires_rx_extension_byte() def get_rx_prefix_size(self) -> int: return self.rx_addr.get_rx_prefix_size() def get_tx_payload_prefix(self) -> bytes: return self.tx_addr.get_tx_payload_prefix() def is_partial_address(self) -> bool: return False def get_content_str(self) -> str: return f"RxAddr: {self.rx_addr.__repr__()} - TxAddr: {self.tx_addr.__repr__()}" def __repr__(self) -> str: return f'<{self.__class__.__name__} - {self.get_content_str()}>'