Source code for isotp.address

[docs]class AddressingMode: Normal_11bits = 0 Normal_29bits = 1 NormalFixed_29bits = 2 Extended_11bits = 3 Extended_29bits = 4 Mixed_11bits = 5 Mixed_29bits = 6 @classmethod def get_name(cls, num): if num == cls.Normal_11bits: return 'Normal_11bits' if num == cls.Normal_29bits: return 'Normal_29bits' if num == cls.NormalFixed_29bits: return 'NormalFixed_29bits' if num == cls.Extended_11bits: return 'Extended_11bits' if num == cls.Extended_29bits: return 'Extended_29bits' if num == cls.Mixed_11bits: return 'Mixed_11bits' if num == cls.Mixed_29bits: return 'Mixed_29bits' return 'Unknown'
[docs]class TargetAddressType: Physical = 0 # 1 to 1 communication Functional = 1 # 1 to n communication
[docs]class Address: """ Represents the addressing information (N_AI) of the IsoTP layer. Will define what messages will be received and how to craft transmitted message to reach a specific party. Parameters must be given according to the addressing mode. When not needed, a parameter may be left unset or set to ``None``. Both the :class:`TransportLayer<isotp.TransportLayer>` and the :class:`isotp.socket<isotp.socket>` expects this address object :param addressing_mode: The addressing mode. Valid values are defined by the :class:`AddressingMode<isotp.AddressingMode>` class :type addressing_mode: int :param txid: The CAN ID for transmission. Used for these addressing mode: ``Normal_11bits``, ``Normal_29bits``, ``Extended_11bits``, ``Extended_29bits``, ``Mixed_11bits`` :type txid: int or None :param rxid: The CAN ID for reception. Used for these addressing mode: ``Normal_11bits``, ``Normal_29bits``, ``Extended_11bits``, ``Extended_29bits``, ``Mixed_11bits`` :type rxid: int or None :param target_address: Target address (N_TA) used in ``NormalFixed_29bits`` and ``Mixed_29bits`` addressing mode. :type target_address: int or None :param source_address: Source address (N_SA) used in ``NormalFixed_29bits`` and ``Mixed_29bits`` addressing mode. :type source_address: int or None :param physical_id: The CAN ID for physical (unicast) messages. Only bits 28-16 are used. Used for these addressing modes: ``NormalFixed_29bits``, ``Mixed_29bits``. Set to standard mandated value if None. :type: int or None :param functional_id: The CAN ID for functional (multicast) messages. Only bits 28-16 are used. Used for these addressing modes: ``NormalFixed_29bits``, ``Mixed_29bits``. Set to standard mandated value if None. :type: int or None :param address_extension: Address extension (N_AE) used in ``Mixed_11bits``, ``Mixed_29bits`` addressing mode :type address_extension: int or None """ def __init__(self, addressing_mode=AddressingMode.Normal_11bits, txid=None, rxid=None, target_address=None, source_address=None, physical_id=None, functional_id=None, address_extension=None, **kwargs): self.addressing_mode = addressing_mode self.target_address = target_address self.source_address = source_address self.address_extension = address_extension self.txid = txid self.rxid = rxid self.is_29bits = True if self.addressing_mode in [ AddressingMode.Normal_29bits, AddressingMode.NormalFixed_29bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_29bits] else False if self.addressing_mode == AddressingMode.NormalFixed_29bits: self.physical_id = 0x18DA0000 if physical_id is None else physical_id & 0x1FFF0000 self.functional_id = 0x18DB0000 if functional_id is None else functional_id & 0x1FFF0000 if self.addressing_mode == AddressingMode.Mixed_29bits: self.physical_id = 0x18CE0000 if physical_id is None else physical_id & 0x1FFF0000 self.functional_id = 0x18CD0000 if functional_id is None else functional_id & 0x1FFF0000 self.validate() # From here, input is good. Do some precomputing for speed optimization without bothering about types or values self.tx_arbitration_id_physical = self._get_tx_arbitraton_id(TargetAddressType.Physical) self.tx_arbitration_id_functional = self._get_tx_arbitraton_id(TargetAddressType.Functional) self.rx_arbitration_id_physical = self._get_rx_arbitration_id(TargetAddressType.Physical) self.rx_arbitration_id_functional = self._get_rx_arbitration_id(TargetAddressType.Functional) self.tx_payload_prefix = bytearray() self.rx_prefix_size = 0 if self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: self.tx_payload_prefix.extend(bytearray([self.target_address])) self.rx_prefix_size = 1 elif self.addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: self.tx_payload_prefix.extend(bytearray([self.address_extension])) self.rx_prefix_size = 1 if self.addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]: self.is_for_me = self._is_for_me_normal elif self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: self.is_for_me = self._is_for_me_extended elif self.addressing_mode == AddressingMode.NormalFixed_29bits: self.is_for_me = self._is_for_me_normalfixed elif self.addressing_mode == AddressingMode.Mixed_11bits: self.is_for_me = self._is_for_me_mixed_11bits elif self.addressing_mode == AddressingMode.Mixed_29bits: self.is_for_me = self._is_for_me_mixed_29bits else: raise RuntimeError('This exception should never be raised.') def validate(self): if self.addressing_mode not in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits, AddressingMode.NormalFixed_29bits, AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: raise ValueError('Addressing mode is not valid') if self.addressing_mode in [AddressingMode.Normal_11bits, AddressingMode.Normal_29bits]: if self.rxid is None or self.txid is None: raise ValueError('txid and rxid must be specified for Normal addressing mode (11 or 29 bits ID)') if self.rxid == self.txid: raise ValueError('txid and rxid must be different for Normal addressing mode') elif self.addressing_mode == AddressingMode.NormalFixed_29bits: if self.target_address is None or self.source_address is None: raise ValueError('target_address and source_address must be specified for Normal Fixed addressing (29 bits ID)') elif self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: if self.target_address is None or self.rxid is None or self.txid is None: raise ValueError('target_address, rxid and txid must be specified for Extended addressing mode (11 or 29 bits ID)') if self.rxid == self.txid: raise ValueError('txid and rxid must be different') elif self.addressing_mode == AddressingMode.Mixed_11bits: if self.rxid is None or self.txid is None or self.address_extension is None: raise ValueError('rxid, txid and address_extension must be specified for Mixed addressing mode (11 bits ID)') elif self.addressing_mode == AddressingMode.Mixed_29bits: if self.target_address is None or self.source_address is None or self.address_extension is None: raise ValueError('target_address, source_address and address_extension must be specified for Mixed addressing mode (29 bits ID)') if self.target_address is not None: if not isinstance(self.target_address, int): raise ValueError('target_address must be an integer') if self.target_address < 0 or self.target_address > 0xFF: raise ValueError('target_address must be an integer between 0x00 and 0xFF') if self.source_address is not None: if not isinstance(self.source_address, int): raise ValueError('source_address must be an integer') if self.source_address < 0 or self.source_address > 0xFF: raise ValueError('source_address must be an integer between 0x00 and 0xFF') if self.address_extension is not None: if not isinstance(self.address_extension, int): raise ValueError('source_address must be an integer') if self.address_extension < 0 or self.address_extension > 0xFF: raise ValueError('address_extension must be an integer between 0x00 and 0xFF') if self.txid is not None: if not isinstance(self.txid, int): raise ValueError('txid must be an integer') if self.txid < 0: raise ValueError('txid must be greater than 0') if not self.is_29bits: if self.txid > 0x7FF: raise ValueError('txid must be smaller than 0x7FF for 11 bits identifier') if self.rxid is not None: if not isinstance(self.rxid, int): raise ValueError('rxid must be an integer') if self.rxid < 0: raise ValueError('rxid must be greater than 0') if not self.is_29bits: if self.rxid > 0x7FF: raise ValueError('rxid must be smaller than 0x7FF for 11 bits identifier') def get_tx_arbitraton_id(self, address_type=TargetAddressType.Physical): if address_type == TargetAddressType.Physical: return self.tx_arbitration_id_physical else: return self.tx_arbitration_id_functional def get_rx_arbitraton_id(self, address_type=TargetAddressType.Physical): if address_type == TargetAddressType.Physical: return self.rx_arbitration_id_physical else: return self.rx_arbitration_id_functional def _get_tx_arbitraton_id(self, address_type): if self.addressing_mode == AddressingMode.Normal_11bits: return self.txid elif self.addressing_mode == AddressingMode.Normal_29bits: return self.txid elif self.addressing_mode == AddressingMode.Extended_11bits: return self.txid elif self.addressing_mode == AddressingMode.Extended_29bits: return self.txid elif self.addressing_mode == AddressingMode.Mixed_11bits: return self.txid elif self.addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: bits28_16 = self.physical_id if address_type == TargetAddressType.Physical else self.functional_id return bits28_16 | (self.target_address << 8) | self.source_address def _get_rx_arbitration_id(self, address_type=TargetAddressType.Physical): if self.addressing_mode == AddressingMode.Normal_11bits: return self.rxid elif self.addressing_mode == AddressingMode.Normal_29bits: return self.rxid elif self.addressing_mode == AddressingMode.Extended_11bits: return self.rxid elif self.addressing_mode == AddressingMode.Extended_29bits: return self.rxid elif self.addressing_mode == AddressingMode.Mixed_11bits: return self.rxid elif self.addressing_mode in [AddressingMode.Mixed_29bits, AddressingMode.NormalFixed_29bits]: bits28_16 = self.physical_id if address_type == TargetAddressType.Physical else self.functional_id return bits28_16 | (self.source_address << 8) | self.target_address def _is_for_me_normal(self, msg): if self.is_29bits == msg.is_extended_id: return msg.arbitration_id == self.rxid return False def _is_for_me_extended(self, msg): if self.is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: return msg.arbitration_id == self.rxid and int(msg.data[0]) == self.source_address return False def _is_for_me_normalfixed(self, msg): if self.is_29bits == msg.is_extended_id: return (msg.arbitration_id & 0x1FFF0000 in [self.physical_id, self.functional_id]) and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address return False def _is_for_me_mixed_11bits(self, msg): if self.is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: return msg.arbitration_id == self.rxid and int(msg.data[0]) == self.address_extension return False def _is_for_me_mixed_29bits(self, msg): if self.is_29bits == msg.is_extended_id: if msg.data is not None and len(msg.data) > 0: return (msg.arbitration_id & 0x1FFF0000) in [self.physical_id, self.functional_id] and (msg.arbitration_id & 0xFF00) >> 8 == self.source_address and msg.arbitration_id & 0xFF == self.target_address and int(msg.data[0]) == self.address_extension return False def requires_extension_byte(self): return True if self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits, AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits] else False def get_tx_extension_byte(self): if self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: return self.target_address if self.addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: return self.address_extension def get_rx_extension_byte(self): if self.addressing_mode in [AddressingMode.Extended_11bits, AddressingMode.Extended_29bits]: return self.source_address if self.addressing_mode in [AddressingMode.Mixed_11bits, AddressingMode.Mixed_29bits]: return self.address_extension def get_content_str(self): val_dict = {} keys = ['target_address', 'source_address', 'address_extension', 'txid', 'rxid'] for key in keys: val = getattr(self, key) if val is not None: val_dict[key] = val vals_str = ', '.join(['%s:0x%02x' % (k, val_dict[k]) for k in val_dict]) return '[%s - %s]' % (AddressingMode.get_name(self.addressing_mode), vals_str) def __repr__(self): return '<IsoTP Address %s at 0x%08x>' % (self.get_content_str(), id(self))