Source code for isotp.protocol

import queue
import logging
from copy import copy
import binascii
import time
import isotp.address
import isotp.errors
import math
import enum

[docs]class CanMessage: """ Represent a CAN message (ISO-11898) :param arbitration_id: The CAN arbitration ID. Must be a 11 bits value or a 29 bits value if ``extended_id`` is True :type arbitration_id: int :param dlc: The Data Length Code representing the number of bytes in the data field :type dlc: int :param data: The 8 bytes payload of the message :type data: bytearray :param extended_id: When True, the arbitration ID stands on 29 bits. 11 bits when False :type extended_id: bool :param is_fd: When True, message has to be transmitted or has been received in a CAN FD frame. CAN frame when set to False :type extended_id: bool """ __slots__ = 'arbitration_id', 'dlc', 'data', 'is_extended_id', 'is_fd', 'bitrate_switch' def __init__(self, arbitration_id=None, dlc=None, data=None, extended_id=False, is_fd=False, bitrate_switch=False): self.arbitration_id = arbitration_id self.dlc = dlc self.data = data self.is_extended_id = extended_id self.is_fd = is_fd self.bitrate_switch = bitrate_switch
class PDU: """ Converts a CAN Message into a meaningful PDU such as SingleFrame, FirstFrame, ConsecutiveFrame, FlowControl :param msg: The CAN message :type msg: `isotp.protocol.CanMessage` """ __slots__ = 'type', 'length', 'data', 'blocksize', 'stmin', 'stmin_sec', 'seqnum', 'flow_status', 'rx_dl', 'escape_sequence', 'can_dl' class Type: SINGLE_FRAME = 0 FIRST_FRAME = 1 CONSECUTIVE_FRAME = 2 FLOW_CONTROL = 3 class FlowStatus: ContinueToSend = 0 Wait = 1 Overflow = 2 def __init__(self, msg = None, start_of_data=0): self.type = None self.length = None self.data = None self.blocksize = None self.stmin = None self.stmin_sec = None self.seqnum = None self.flow_status = None self.rx_dl = None self.escape_sequence = False self.can_dl = None if msg is None: return if len(msg.data) < start_of_data: raise ValueError("Received message is missing data according to prefix size") self.can_dl = len( msg.data) self.rx_dl = max(8, self.can_dl) msg_data = msg.data[start_of_data:] datalen = len(msg_data) # Guarantee at least presence of byte #1 if datalen > 0: hnb = (msg_data[0] >> 4) & 0xF if hnb > 3: raise ValueError('Received message with unknown frame type %d' % hnb) self.type = int(hnb) else: raise ValueError('Empty CAN frame') if self.type == self.Type.SINGLE_FRAME: length_placeholder = int(msg_data[0]) & 0xF if length_placeholder != 0: self.length = length_placeholder if self.length > datalen-1: raise ValueError("Received Single Frame with length of %d while there is room for %d bytes of data with this configuration" % (self.length, datalen-1)) self.data = msg_data[1:][:self.length] else: # Escape seuqence if datalen < 2: raise ValueError('Single frame with escape sequence must be at least %d bytes long with this configuration' % (2+start_of_data)) self.escape_sequence = True self.length = int(msg_data[1]) if self.length == 0: raise ValueError("Received Single Frame with length of 0 bytes") if self.length > datalen-2: raise ValueError("Received Single Frame with length of %d while there is room for %d bytes of data with this configuration" % (self.length, datalen-2)) self.data = msg_data[2:][:self.length] elif self.type == self.Type.FIRST_FRAME: if datalen < 2: raise ValueError('First frame without escape sequence must be at least %d bytes long with this configuration' % (2+start_of_data)) length_placeholder = ((int(msg_data[0]) & 0xF) << 8) | int(msg_data[1]) if length_placeholder != 0: # Frame is maximum 4095 bytes self.length = length_placeholder self.data = msg_data[2:][:min(self.length, datalen-2)] else: # Frame is larger than 4095 bytes if datalen < 6: raise ValueError('First frame with escape sequence must be at least %d bytes long with this configuration' % (6+start_of_data)) self.escape_sequence = True self.length = (msg_data[2] << 24) | (msg_data[3] << 16) | (msg_data[4] << 8) | (msg_data[5] << 0) self.data = msg_data[6:][:min(self.length, datalen-6)] elif self.type == self.Type.CONSECUTIVE_FRAME: self.seqnum = int(msg_data[0]) & 0xF self.data = msg_data[1:] # No need to check size as this will return empty data if overflow. elif self.type == self.Type.FLOW_CONTROL: if datalen < 3: raise ValueError('Flow Control frame must be at least %d bytes with the actual configuration' % (3+start_of_data)) self.flow_status = int(msg_data[0]) & 0xF if self.flow_status >= 3: raise ValueError('Unknown flow status') self.blocksize = int(msg_data[1]) stmin_temp = int(msg_data[2]) if stmin_temp >= 0 and stmin_temp <= 0x7F: self.stmin_sec = stmin_temp / 1000 elif stmin_temp >= 0xf1 and stmin_temp <= 0xF9: self.stmin_sec = (stmin_temp - 0xF0) / 10000 if self.stmin_sec is None: raise ValueError('Invalid StMin received in Flow Control') else: self.stmin = stmin_temp @classmethod def craft_flow_control_data(cls, flow_status, blocksize, stmin): return bytearray([ (0x30 | (flow_status) & 0xF), blocksize&0xFF, stmin & 0xFF]) def name(self): if self.type is None: return "[None]" if self.type == self.Type.SINGLE_FRAME: return "SINGLE_FRAME" elif self.type == self.Type.FIRST_FRAME: return "FIRST_FRAME" elif self.type == self.Type.CONSECUTIVE_FRAME: return "CONSECUTIVE_FRAME" elif self.type == self.Type.FLOW_CONTROL: return "FLOW_CONTROL" else: return "Reserved" class RateLimiter: TIME_SLOT_LENGTH = 0.005 def __init__(self, mean_bitrate = None, window_size_sec = 0.1): self.enabled = False self.mean_bitrate = mean_bitrate self.window_size_sec = window_size_sec self.error_reason = '' self.reset() if self.can_be_enabled(): self.enable() def can_be_enabled(self): try: float(self.mean_bitrate) except: self.error_reason = 'mean_bitrate is not numerical' return False if float(self.mean_bitrate) <= 0: self.error_reason = 'mean_bitrate must be greater than 0' return False try: float(self.window_size_sec) except: self.error_reason = 'window_size_sec is not numerical' return False if float(self.window_size_sec) <= 0: self.error_reason = 'window_size_sec must be greater than 0' return False return True def set_bitrate(self, mean_bitrate): self.mean_bitrate = mean_bitrate def enable(self): if self.can_be_enabled(): self.mean_bitrate = float(self.mean_bitrate) self.window_size_sec = float(self.window_size_sec) self.enabled = True self.reset() else: raise ValueError('Cannot enable Rate Limiter. \n %s' % self.error_reason ) def disable(self): self.enabled = False def reset(self): self.burst_bitcount = [] self.burst_time = [] self.bit_total = 0 self.window_bit_max = self.mean_bitrate*self.window_size_sec def update(self): if not self.enabled: self.reset() return t = time.time() while len(self.burst_time) > 0: t2 = self.burst_time[0] if t-t2 > self.window_size_sec: self.burst_time.pop(0) n_to_remove = self.burst_bitcount.pop(0) self.bit_total -= n_to_remove else: break def allowed_bytes(self): no_limit = 0xFFFFFFFF if not self.enabled: return no_limit allowed_bits = max(self.window_bit_max - self.bit_total, 0) return math.floor(allowed_bits/8) def inform_byte_sent(self, datalen): if self.enabled: bytelen = datalen*8; t = time.time() self.bit_total += bytelen if len(self.burst_time) == 0: self.burst_time.append(t) self.burst_bitcount.append(bytelen) else: last_time = self.burst_time[-1] if t - last_time > self.TIME_SLOT_LENGTH: self.burst_time.append(t) self.burst_bitcount.append(bytelen) else: self.burst_bitcount[-1] += bytelen
[docs]class TransportLayer: """ The IsoTP transport layer implementation :param rxfn: Function to be called by the transport layer to read the CAN layer. Must return a :class:`isotp.CanMessage<isotp.CanMessage>` or None if no message has been received. :type rxfn: Callable :param txfn: Function to be called by the transport layer to send a message on the CAN layer. This function should receive a :class:`isotp.CanMessage<isotp.CanMessage>` :type txfn: Callable :param address: The address information of CAN messages. Includes the addressing mode, txid/rxid, source/target address and address extension. See :class:`isotp.Address<isotp.Address>` for more details. :type address: isotp.Address :param error_handler: A function to be called when an error has been detected. An :class:`isotp.IsoTpError<isotp.IsoTpError>` (inheriting Exception class) will be given as sole parameter. See the :ref:`Error section<Errors>` :type error_handler: Callable :param params: List of parameters for the transport layer :type params: dict """ LOGGER_NAME = 'isotp' class Params: __slots__ = ( 'stmin', 'blocksize', 'squash_stmin_requirement', 'rx_flowcontrol_timeout', 'rx_consecutive_frame_timeout', 'tx_padding', 'wftmax', 'tx_data_length', 'tx_data_min_length', 'max_frame_size', 'can_fd', 'bitrate_switch', 'default_target_address_type', 'rate_limit_max_bitrate', 'rate_limit_window_size', 'rate_limit_enable', 'listen_mode' ) def __init__(self): self.stmin = 0 self.blocksize = 8 self.squash_stmin_requirement = False self.rx_flowcontrol_timeout = 1000 self.rx_consecutive_frame_timeout = 1000 self.tx_padding = None self.wftmax = 0 self.tx_data_length = 8 self.tx_data_min_length = None self.max_frame_size = 4095 self.can_fd = False self.bitrate_switch = False self.default_target_address_type = isotp.address.TargetAddressType.Physical self.rate_limit_max_bitrate = 100000000 self.rate_limit_window_size = 0.2 self.rate_limit_enable = False self.listen_mode = False def set(self, key, val, validate=True): param_alias = { 'll_data_length' : 'tx_data_length' # For backward compatibility } if key in param_alias: key = param_alias[key] setattr(self, key, val) if validate: self.validate() def validate(self): if not isinstance(self.rx_flowcontrol_timeout, int): raise ValueError('rx_flowcontrol_timeout must be an integer') if self.rx_flowcontrol_timeout < 0: raise ValueError('rx_flowcontrol_timeout must be positive integer') if not isinstance(self.rx_consecutive_frame_timeout, int): raise ValueError('rx_consecutive_frame_timeout must be an integer') if self.rx_consecutive_frame_timeout < 0: raise ValueError('rx_consecutive_frame_timeout must be positive integer') if self.tx_padding is not None: if not isinstance(self.tx_padding, int): raise ValueError('tx_padding must be an integer') if self.tx_padding < 0 or self.tx_padding > 0xFF: raise ValueError('tx_padding must be an integer between 0x00 and 0xFF') if not isinstance(self.stmin, int): raise ValueError('stmin must be an integer') if self.stmin < 0 or self.stmin > 0xFF: raise ValueError('stmin must be positive integer between 0x00 and 0xFF') if not isinstance(self.blocksize, int): raise ValueError('blocksize must be an integer') if self.blocksize < 0 or self.blocksize > 0xFF: raise ValueError('blocksize must be and integer between 0x00 and 0xFF') if not isinstance(self.squash_stmin_requirement, bool): raise ValueError('squash_stmin_requirement must be a boolean value') if not isinstance(self.wftmax, int): raise ValueError('wftmax must be an integer') if self.wftmax < 0 : raise ValueError('wftmax must be and integer equal or greater than 0') if not isinstance(self.tx_data_length, int) : raise ValueError('tx_data_length must be an integer') if self.tx_data_length not in [8,12,16,20,24,32,48,64]: raise ValueError('tx_data_length must be one of these value : 8, 12, 16, 20, 24, 32, 48, 64 ') if self.tx_data_min_length is not None: if not isinstance(self.tx_data_min_length, int) : raise ValueError('tx_data_min_length must be an integer') if self.tx_data_min_length not in [1,2,3,4,5,6,7,8,12,16,20,24,32,48,64]: raise ValueError('tx_data_min_length must be one of these value : 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64 ') if self.tx_data_min_length > self.tx_data_length: raise ValueError('tx_data_min_length cannot be greater than tx_data_length') if not isinstance(self.max_frame_size, int): raise ValueError('max_frame_size must be an integer') if self.max_frame_size < 0: raise ValueError('max_frame_size must be a positive integer') if not isinstance(self.can_fd, bool): raise ValueError('can_fd must be a boolean value') if not isinstance(self.bitrate_switch, bool): raise ValueError('bitrate_switch must be a boolean value') if not isinstance(self.default_target_address_type, int): raise ValueError('default_target_address_type must be an integer') if self.default_target_address_type not in [isotp.address.TargetAddressType.Physical, isotp.address.TargetAddressType.Functional]: raise ValueError('default_target_address_type must be either be Physical (%d) or Functional (%d)' % (isotp.address.TargetAddressType.Physical, isotp.address.TargetAddressType.Functional)) if not isinstance(self.rate_limit_max_bitrate, int): raise ValueError('rate_limit_max_bitrate must be an integer') if self.rate_limit_max_bitrate <= 0: raise ValueError('rate_limit_max_bitrate must be greater than 0') if not (isinstance(self.rate_limit_window_size, float) or isinstance(self.rate_limit_window_size, int)): raise ValueError('rate_limit_window_size must be a float ') if self.rate_limit_window_size <= 0: raise ValueError('rate_limit_window_size must be greater than 0') if not isinstance(self.rate_limit_enable, bool): raise ValueError('rate_limit_enable must be a boolean value') if self.rate_limit_max_bitrate * self.rate_limit_window_size < self.tx_data_length * 8: raise ValueError('Rate limiter is so restrictive that a SingleFrame cannot be sent. Please, allow a higher bitrate or increase the window size. (tx_data_length = %d)' % self.tx_data_length) if not isinstance(self.listen_mode, bool): raise ValueError('listen_mode must be a boolean value') class Timer: def __init__(self, timeout): self.set_timeout(timeout) self.start_time = None def set_timeout(self, timeout): self.timeout = timeout def start(self, timeout=None): if timeout is not None: self.set_timeout(timeout) self.start_time = time.time() def stop(self): self.start_time = None def elapsed(self): if self.start_time is not None: return time.time() - self.start_time else: return 0 def is_timed_out(self): if self.is_stopped(): return False else: return self.elapsed() > self.timeout or self.timeout == 0 def is_stopped(self): return self.start_time == None class RxState(enum.Enum): IDLE = 0 WAIT_CF = 1 class TxState(enum.Enum): IDLE = 0 WAIT_FC = 1 TRANSMIT_CF = 2 TRANSMIT_SF_STANDBY = 3 TRANSMIT_FF_STANDBY = 4 def __init__(self, rxfn, txfn, address=None, error_handler=None, params=None): self.params = self.Params() self.logger = logging.getLogger(self.LOGGER_NAME) if params is not None: for k in params: self.params.set(k, params[k], validate=False) self.params.validate() self.remote_blocksize = None # Block size received in Flow Control message self.rxfn = rxfn # Function to call to receive a CAN message self.txfn = txfn # Function to call to transmit a CAN message self.set_address(address) self.tx_queue = queue.Queue() # Layer Input queue for IsoTP frame self.rx_queue = queue.Queue() # Layer Output queue for IsoTP frame self.tx_standby_msg = None self.rx_state = self.RxState.IDLE # State of the reception FSM self.tx_state = self.TxState.IDLE # State of the transmission FSM self.rx_block_counter = 0 self.last_seqnum = None # Consecutive frame Sequence number of previous message self.rx_frame_length = 0 # Length of IsoTP frame being received at the moment self.tx_frame_length = 0 # Length of the data that we are sending self.last_flow_control_frame = None # When a FlowControl is received. Put here self.tx_block_counter = 0 # Keeps track of how many block we've sent self.tx_seqnum = 0 # Keeps track of the actual sequence number while sending self.wft_counter = 0 # Keeps track of how many wait frame we've received self.pending_flow_control_tx = False # Flag indicating that we need to transmit a flow control message. Set by Rx Process, Cleared by Tx Process self.empty_rx_buffer() self.empty_tx_buffer() self.timer_tx_stmin = self.Timer(timeout = 0) self.error_handler = error_handler self.actual_rxdl = None self.timings = { (self.RxState.IDLE, self.TxState.IDLE) : 0.05, (self.RxState.IDLE, self.TxState.WAIT_FC) : 0.01, } self.load_params() def load_params(self): self.params.validate() self.timer_rx_fc = self.Timer(timeout = float(self.params.rx_flowcontrol_timeout) / 1000) self.timer_rx_cf = self.Timer(timeout = float(self.params.rx_consecutive_frame_timeout) / 1000) self.rate_limiter = RateLimiter(mean_bitrate = self.params.rate_limit_max_bitrate, window_size_sec = self.params.rate_limit_window_size) if self.params.rate_limit_enable: self.rate_limiter.enable()
[docs] def send(self, data, target_address_type=None): """ Enqueue an IsoTP frame to be sent over CAN network :param data: The data to be sent :type data: bytearray :param target_address_type: Optional parameter that can be Physical (0) for 1-to-1 communication or Functional (1) for 1-to-n. See :class:`isotp.TargetAddressType<isotp.TargetAddressType>`. If not provided, parameter :ref:`default_target_address_type<param_default_target_address_type>` will be used (default to `Physical`) :type target_address_type: int :raises ValueError: Input parameter is not a bytearray or not convertible to bytearray :raises RuntimeError: Transmit queue is full """ if target_address_type is None: target_address_type = self.params.default_target_address_type if not isinstance(data, bytearray): try: data = bytearray(data) except: raise ValueError('data must be a bytearray') if self.tx_queue.full(): raise RuntimeError('Transmit queue is full') if target_address_type == isotp.address.TargetAddressType.Functional: length_bytes = 1 if self.params.tx_data_length == 8 else 2 maxlen = self.params.tx_data_length - length_bytes - len(self.address.tx_payload_prefix) if len(data) > maxlen: raise ValueError('Cannot send multipacket frame with Functional TargetAddressType') self.tx_queue.put( {'data':data, 'target_address_type':target_address_type}) # frame is always an IsoTPFrame here
# Receive an IsoTP frame. Output of the layer
[docs] def recv(self): """ Dequeue an IsoTP frame from the reception queue if available. :return: The next available IsoTP frame :rtype: bytearray or None """ if not self.rx_queue.empty(): return self.rx_queue.get()
[docs] def available(self): """ Returns ``True`` if an IsoTP frame is awaiting in the reception ``queue``. ``False`` otherwise """ return not self.rx_queue.empty()
[docs] def transmitting(self): """ Returns ``True`` if an IsoTP frame is being transmitted. ``False`` otherwise """ return not self.tx_queue.empty() or self.tx_state != self.TxState.IDLE
[docs] def process(self): """ Function to be called periodically, as fast as possible. This function is non-blocking. """ self.check_timeouts_rx() self.rate_limiter.update() msg = True while msg is not None: msg = self.rxfn() if msg is not None: self.logger.debug("Receiving : <%03X> (%d)\t %s" % (msg.arbitration_id, len(msg.data), binascii.hexlify(msg.data))) self.process_rx(msg) msg = True while msg is not None: msg = self.process_tx() if msg is not None: self.logger.debug("Sending : <%03X> (%d)\t %s" % (msg.arbitration_id, len(msg.data), binascii.hexlify(msg.data))) self.txfn(msg)
def check_timeouts_rx(self): # Check timeout first if self.timer_rx_cf.is_timed_out(): self.trigger_error(isotp.errors.ConsecutiveFrameTimeoutError("Reception of CONSECUTIVE_FRAME timed out.")) self.stop_receiving() def process_rx(self, msg): if not self.address.is_for_me(msg): return # Decoding of message into PDU try: pdu = PDU(msg, start_of_data=self.address.rx_prefix_size) except Exception as e: self.trigger_error(isotp.errors.InvalidCanDataError("Received invalid CAN frame. %s" % (str(e)))) self.stop_receiving() return # Process Flow Control message if pdu.type == PDU.Type.FLOW_CONTROL: self.last_flow_control_frame = pdu # Given to process_tx method. Queue of 1 message depth return # Nothing else to be done with FlowControl. Return and wait for next message if pdu.type == PDU.Type.SINGLE_FRAME: if pdu.can_dl > 8 and pdu.escape_sequence == False: self.trigger_error(isotp.errors.MissingEscapeSequenceError('For SingleFrames conveyed on a CAN message with data length (CAN_DL) > 8, length should be encoded on byte #1 and byte #0 should be 0x00')) return # Process the state machine if self.rx_state == self.RxState.IDLE: self.rx_frame_length = 0 self.timer_rx_cf.stop() if pdu.type == PDU.Type.SINGLE_FRAME: if pdu.data is not None: self.rx_queue.put(copy(pdu.data)) elif pdu.type == PDU.Type.FIRST_FRAME: self.start_reception_after_first_frame_if_valid(pdu) elif pdu.type == PDU.Type.CONSECUTIVE_FRAME: self.trigger_error(isotp.errors.UnexpectedConsecutiveFrameError('Received a ConsecutiveFrame while reception was idle. Ignoring')) elif self.rx_state == self.RxState.WAIT_CF: if pdu.type == PDU.Type.SINGLE_FRAME: if pdu.data is not None: self.rx_queue.put(copy(pdu.data)) self.rx_state = self.RxState.IDLE self.trigger_error(isotp.errors.ReceptionInterruptedWithSingleFrameError('Reception of IsoTP frame interrupted with a new SingleFrame')) elif pdu.type == PDU.Type.FIRST_FRAME: self.start_reception_after_first_frame_if_valid(pdu) self.trigger_error(isotp.errors.ReceptionInterruptedWithFirstFrameError('Reception of IsoTP frame interrupted with a new FirstFrame')) elif pdu.type == PDU.Type.CONSECUTIVE_FRAME: expected_seqnum = (self.last_seqnum +1) & 0xF if pdu.seqnum == expected_seqnum: bytes_to_receive = (self.rx_frame_length - len(self.rx_buffer) ) if pdu.rx_dl != self.actual_rxdl and pdu.rx_dl < bytes_to_receive: self.trigger_error(isotp.errors.ChangingInvalidRXDLError("Received a ConsecutiveFrame with RX_DL=%d while expected RX_DL=%d. Ignoring frame" % (pdu.rx_dl, self.actual_rxdl))) return self.start_rx_cf_timer() # Received a CF message. Restart counter. Timeout handled above. self.last_seqnum = pdu.seqnum self.append_rx_data(pdu.data[:bytes_to_receive]) # Python handle overflow if len(self.rx_buffer) >= self.rx_frame_length: self.rx_queue.put(copy(self.rx_buffer)) # Data complete self.stop_receiving() # Go back to IDLE. Reset all variables and timers. else: self.rx_block_counter += 1 if self.params.blocksize > 0 and (self.rx_block_counter % self.params.blocksize) == 0: self.request_tx_flowcontrol(PDU.FlowStatus.ContinueToSend) # Sets a flag to 1. process_tx will send it for use. self.timer_rx_cf.stop() # We stop the timer until the flow control message is gone. This timer is reactivated in the process_tx(). else: self.stop_receiving() self.trigger_error(isotp.errors.WrongSequenceNumberError('Received a ConsecutiveFrame with wrong SequenceNumber. Expecting 0x%X, Received 0x%X' % (expected_seqnum, pdu.seqnum))) def process_tx(self): output_msg = None # Value outputed. If None, no subsequent call to process_tx will be done. allowed_bytes = self.rate_limiter.allowed_bytes() # Sends flow control if process_rx requested it if self.pending_flow_control_tx: self.pending_flow_control_tx = False if self.pending_flowcontrol_status == PDU.FlowStatus.ContinueToSend: self.start_rx_cf_timer() # We tell the sending party that it can continue to send data, so we start checking the timeout again if not self.params.listen_mode: # Inhibit Flow Control in listen mode. return self.make_flow_control(flow_status=self.pending_flowcontrol_status); # No need to wait. # Handle flow control reception flow_control_frame = self.last_flow_control_frame # Reads the last message received and clears it. (Dequeue message) self.last_flow_control_frame = None if flow_control_frame is not None: if flow_control_frame.flow_status == PDU.FlowStatus.Overflow: # Needs to stop sending. self.stop_sending() self.trigger_error(isotp.errors.OverflowError('Received a FlowControl PDU indicating an Overflow. Stopping transmission.')) return if self.tx_state == self.TxState.IDLE: self.trigger_error(isotp.errors.UnexpectedFlowControlError('Received a FlowControl message while transmission was Idle. Ignoring')) else: if flow_control_frame.flow_status == PDU.FlowStatus.Wait: if self.params.wftmax == 0: self.trigger_error(isotp.errors.UnsuportedWaitFrameError('Received a FlowControl requesting to wait, but wftmax is set to 0')) elif self.wft_counter >= self.params.wftmax: self.trigger_error(isotp.errors.MaximumWaitFrameReachedError('Received %d wait frame which is the maximum set in params.wftmax' % (self.wft_counter))) self.stop_sending() else: self.wft_counter += 1 if self.tx_state in [self.TxState.WAIT_FC, self.TxState.TRANSMIT_CF]: self.tx_state = self.TxState.WAIT_FC self.start_rx_fc_timer() elif flow_control_frame.flow_status == PDU.FlowStatus.ContinueToSend and not self.timer_rx_fc.is_timed_out(): self.wft_counter = 0 self.timer_rx_fc.stop() self.timer_tx_stmin.set_timeout(flow_control_frame.stmin_sec) self.remote_blocksize = flow_control_frame.blocksize if self.tx_state == self.TxState.WAIT_FC: self.tx_block_counter = 0 self.timer_tx_stmin.start() elif self.tx_state == self.TxState.TRANSMIT_CF: pass self.tx_state = self.TxState.TRANSMIT_CF # ======= Timeouts ====== if self.timer_rx_fc.is_timed_out(): self.trigger_error(isotp.errors.FlowControlTimeoutError('Reception of FlowControl timed out. Stopping transmission')) self.stop_sending() # ======= FSM ====== # Check this first as we may have another isotp frame to send and we need to handle it right away without waiting for next "process()" call if self.tx_state != self.TxState.IDLE and len(self.tx_buffer) == 0: self.stop_sending() if self.tx_state == self.TxState.IDLE: read_tx_queue = True # Read until we get non-empty frame to send while read_tx_queue: read_tx_queue = False if not self.tx_queue.empty(): popped_object = self.tx_queue.get() if len(popped_object['data']) == 0: read_tx_queue = True # Read another frame from tx_queue else: self.tx_buffer = bytearray(popped_object['data']) size_on_first_byte = (len(self.tx_buffer) + len(self.address.tx_payload_prefix)) <= 7 size_offset = 1 if size_on_first_byte else 2 # Single frame if len(self.tx_buffer) <= self.params.tx_data_length-size_offset-len(self.address.tx_payload_prefix): if size_on_first_byte: msg_data = self.address.tx_payload_prefix + bytearray([0x0 | len(self.tx_buffer)]) + self.tx_buffer else: msg_data = self.address.tx_payload_prefix + bytearray([0x0, len(self.tx_buffer)]) + self.tx_buffer arbitration_id = self.address.get_tx_arbitraton_id(popped_object['target_address_type']) msg_temp = self.make_tx_msg(arbitration_id, msg_data) if len(msg_data) > allowed_bytes: self.tx_standby_msg = msg_temp self.tx_state = self.TxState.TRANSMIT_SF_STANDBY else: output_msg = msg_temp # Multi frame - First Frame else: self.tx_frame_length = len(self.tx_buffer) encode_length_on_2_first_bytes = True if self.tx_frame_length <= 0xFFF else False if encode_length_on_2_first_bytes: data_length = self.params.tx_data_length-2-len(self.address.tx_payload_prefix) msg_data = self.address.tx_payload_prefix + bytearray([0x10|((self.tx_frame_length >> 8) & 0xF), self.tx_frame_length&0xFF]) + self.tx_buffer[:data_length] else: data_length = self.params.tx_data_length-6-len(self.address.tx_payload_prefix) msg_data = self.address.tx_payload_prefix + bytearray([0x10, 0x00, (self.tx_frame_length>>24) & 0xFF, (self.tx_frame_length>>16) & 0xFF, (self.tx_frame_length>>8) & 0xFF, (self.tx_frame_length>>0) & 0xFF]) + self.tx_buffer[:data_length] arbitration_id = self.address.get_tx_arbitraton_id() self.tx_buffer = self.tx_buffer[data_length:] self.tx_seqnum = 1 msg_temp = self.make_tx_msg(arbitration_id, msg_data) if len(msg_data) <= allowed_bytes: output_msg = msg_temp self.tx_state = self.TxState.WAIT_FC self.start_rx_fc_timer() else: self.tx_standby_msg = msg_temp self.tx_state = self.TxState.TRANSMIT_FF_STANDBY elif self.tx_state in [self.TxState.TRANSMIT_SF_STANDBY, self.TxState.TRANSMIT_FF_STANDBY]: # This states serves if the rate limiter prevent from starting a new transmission. # We need to pop the isotp frame to know if the rate limiter must kick, but isnce the data is already popped, # we can't stay in IDLE state. So we come here until the rate limiter gives us permission to proceed. if self.tx_standby_msg is not None: if len(self.tx_standby_msg.data) <= allowed_bytes: output_msg = self.tx_standby_msg self.tx_standby_msg = None if self.tx_state == self.TxState.TRANSMIT_FF_STANDBY: self.start_rx_fc_timer() self.tx_state = self.TxState.WAIT_FC # After a first frame, we wait for flow control else: self.tx_state = self.TxState.IDLE # After a single frame, there's nothing to do elif self.tx_state == self.TxState.WAIT_FC: pass # Nothing to do. Flow control will make the FSM switch state by calling init_tx_consecutive_frame elif self.tx_state == self.TxState.TRANSMIT_CF: if self.timer_tx_stmin.is_timed_out() or self.params.squash_stmin_requirement: data_length = self.params.tx_data_length-1-len(self.address.tx_payload_prefix) msg_data = self.address.tx_payload_prefix + bytearray([0x20 | self.tx_seqnum]) + self.tx_buffer[:data_length] arbitration_id = self.address.get_tx_arbitraton_id() msg_temp = self.make_tx_msg(arbitration_id, msg_data) if len(msg_temp.data) <= allowed_bytes: output_msg = msg_temp self.tx_buffer = self.tx_buffer[data_length:] self.tx_seqnum = (self.tx_seqnum + 1 ) & 0xF self.timer_tx_stmin.start() self.tx_block_counter+=1 if (len(self.tx_buffer) == 0): self.stop_sending() elif self.remote_blocksize != 0 and self.tx_block_counter >= self.remote_blocksize: self.tx_state = self.TxState.WAIT_FC self.start_rx_fc_timer() if output_msg is not None: self.rate_limiter.inform_byte_sent(len(output_msg.data)) return output_msg def set_sleep_timing(self, idle, wait_fc): """ Sets values in seconds that can be passed to ``time.sleep()`` when the stack is processed in a different thread. """ self.timings = { (self.RxState.IDLE, self.TxState.IDLE) : idle, (self.RxState.IDLE, self.TxState.WAIT_FC) : wait_fc, }
[docs] def set_address(self, address): """ Sets the layer :class:`Address<isotp.Address>`. Can be set after initialization if needed. """ if not isinstance(address, isotp.address.Address): raise ValueError('address must be a valid Address instance') self.address = address if self.address.txid is not None and (self.address.txid > 0x7F4 and self.address.txid < 0x7F6 or self.address.txid > 0x7FA and self.address.txid < 0x7FB) : self.logger.warning('Used txid overlaps the range of ID reserved by ISO-15765 (0x7F4-0x7F6 and 0x7FA-0x7FB)') if self.address.rxid is not None and (self.address.rxid > 0x7F4 and self.address.rxid < 0x7F6 or self.address.rxid > 0x7FA and self.address.rxid < 0x7FB) : self.logger.warning('Used rxid overlaps the range of ID reserved by ISO-15765 (0x7F4-0x7F6 and 0x7FA-0x7FB)')
def pad_message_data(self, msg_data): must_pad = False padding_byte = 0xCC if self.params.tx_padding is None else self.params.tx_padding if self.params.tx_data_length == 8: if self.params.tx_data_min_length is None: if self.params.tx_padding is not None: # ISO-15765:2016 - 10.4.2.1 must_pad = True target_length = 8 else: # ISO-15765:2016 - 10.4.2.2 pass else: # issue #27 must_pad = True target_length = self.params.tx_data_min_length elif self.params.tx_data_length > 8: if self.params.tx_data_min_length is None: # ISO-15765:2016 - 10.4.2.3 target_length = self.get_nearest_can_fd_size(len(msg_data)) must_pad = True else: # Issue #27 must_pad = True target_length = max(self.params.tx_data_min_length, self.get_nearest_can_fd_size(len(msg_data))) if must_pad and len(msg_data) < target_length: msg_data.extend(bytearray([padding_byte & 0xFF] * (target_length-len(msg_data)))) def empty_rx_buffer(self): self.rx_buffer = bytearray() def empty_tx_buffer(self): self.tx_buffer = bytearray() def start_rx_fc_timer(self): self.timer_rx_fc = self.Timer(timeout = float(self.params.rx_flowcontrol_timeout) / 1000) self.timer_rx_fc.start() def start_rx_cf_timer(self): self.timer_rx_cf = self.Timer(timeout = float(self.params.rx_consecutive_frame_timeout) / 1000) self.timer_rx_cf.start() def append_rx_data(self, data): self.rx_buffer.extend(data) def request_tx_flowcontrol(self, status=PDU.FlowStatus.ContinueToSend): self.pending_flow_control_tx = True self.pending_flowcontrol_status = status def stop_sending_flow_control(self): self.pending_flow_control_tx = False self.last_flow_control_frame = None def make_tx_msg(self, arbitration_id, data): self.pad_message_data(data) return CanMessage(arbitration_id = arbitration_id, dlc=self.get_dlc(data, validate_tx=True), data=data, extended_id=self.address.is_29bits, is_fd=self.params.can_fd, bitrate_switch=self.params.bitrate_switch) def get_dlc(self, data, validate_tx=False): fdlen = self.get_nearest_can_fd_size(len(data)) if validate_tx: if self.params.tx_data_length == 8: if fdlen < 2 or fdlen > 8: raise ValueError("Impossible DLC size for payload of %d bytes with tx_data_length of %d" % (len(data), self.params.tx_data_length )) if fdlen >= 2 and fdlen <= 8: return fdlen elif fdlen == 12: return 9 elif fdlen == 16: return 10 elif fdlen == 20: return 11 elif fdlen == 24: return 12 elif fdlen == 32: return 13 elif fdlen == 48: return 14 elif fdlen == 64: return 15 raise ValueError("Impossible DLC size for payload of %d bytes with tx_data_length of %d" % (len(data), self.params.tx_data_length)) def get_nearest_can_fd_size(self, size): if size <= 8: return size if size<=12: return 12 if size<=16: return 16 if size<=20: return 20 if size<=24: return 24 if size<=32: return 32 if size<=48: return 48 if size<=64: return 64 raise ValueError("Impossible data size for CAN FD : %d " % (size)) def make_flow_control(self, flow_status=PDU.FlowStatus.ContinueToSend, blocksize=None, stmin=None): if blocksize is None: blocksize = self.params.blocksize if stmin is None: stmin = self.params.stmin data = PDU.craft_flow_control_data(flow_status, blocksize, stmin) return self.make_tx_msg(self.address.get_tx_arbitraton_id(), self.address.tx_payload_prefix + data) def request_wait_flow_control(self): self.must_wait_for_flow_control = True def stop_sending(self): self.empty_tx_buffer() self.tx_state = self.TxState.IDLE self.tx_frame_length = 0 self.timer_rx_fc.stop() self.timer_tx_stmin.stop() self.remote_blocksize = None self.tx_block_counter = 0 self.tx_seqnum = 0 self.wft_counter = 0 self.tx_standby_msg = None def stop_receiving(self): self.actual_rxdl = None self.rx_state = self.RxState.IDLE self.empty_rx_buffer() self.stop_sending_flow_control() self.timer_rx_cf.stop() # Init the reception of a multi-pdu frame. def start_reception_after_first_frame_if_valid(self, pdu): self.empty_rx_buffer() if pdu.rx_dl not in [8,12,16,20,24,32,48,64]: self.trigger_error(isotp.errors.InvalidCanFdFirstFrameRXDL("Received a FirstFrame with a RX_DL value of %d which is invalid according to ISO-15765-2" % (pdu.rx_dl))) self.stop_receiving() return self.actual_rxdl = pdu.rx_dl if pdu.length > self.params.max_frame_size: self.trigger_error(isotp.errors.FrameTooLongError("Received a Frist Frame with a length of %d bytes, but params.max_frame_size is set to %d bytes. Ignoring" % (pdu.length, self.params.max_frame_size))) self.request_tx_flowcontrol(PDU.FlowStatus.Overflow) self.rx_state = self.RxState.IDLE else: self.rx_state = self.RxState.WAIT_CF self.rx_frame_length = pdu.length self.append_rx_data(pdu.data) self.request_tx_flowcontrol(PDU.FlowStatus.ContinueToSend) self.start_rx_cf_timer() self.last_seqnum = 0 self.rx_block_counter = 0 def trigger_error(self, error): if self.error_handler is not None: if hasattr(self.error_handler, '__call__') and isinstance(error, isotp.errors.IsoTpError): self.error_handler(error) else: self.logger.warning('Given error handler is not a callable object.') self.logger.warning(str(error)) # Clears everything within the layer.
[docs] def reset(self): """ Reset the layer: Empty all buffers, set the internal state machines to Idle """ while not self.tx_queue.empty(): self.tx_queue.get() while not self.rx_queue.empty(): self.rx_queue.get() self.stop_sending() self.stop_receiving() self.rate_limiter.reset()
# Gives a time to pass to time.sleep() based on the state of the FSM. Avoid using too much CPU
[docs] def sleep_time(self): """ Returns a value in seconds that can be passed to ``time.sleep()`` when the stack is processed in a different thread. The value will change according to the internal state machine state, sleeping longer while idle and shorter when active. """ key = (self.rx_state, self.tx_state) if key in self.timings: return self.timings[key] else: return 0.001
[docs]class CanStack(TransportLayer): """ The IsoTP transport using `python-can <https://python-can.readthedocs.io>`__ as CAN layer. python-can must be installed in order to use this class. All parameters except the ``bus`` parameter will be given to the :class:`TransportLayer<isotp.TransportLayer>` constructor :param bus: A python-can bus object implementing ``recv`` and ``send`` :type bus: BusABC :param address: The address information of CAN messages. Includes the addressing mode, txid/rxid, source/target address and address extension. See :class:`isotp.Address<isotp.Address>` for more details. :type address: isotp.Address :param error_handler: A function to be called when an error has been detected. An :class:`isotp.protocol.IsoTpError<isotp.protocol.IsoTpError>` (inheriting Exception class) will be given as sole parameter :type error_handler: Callable :param params: List of parameters for the transport layer :type params: dict """ def _tx_canbus_3plus(self, msg): self.bus.send(can.Message(arbitration_id=msg.arbitration_id, data = msg.data, is_extended_id=msg.is_extended_id, is_fd=msg.is_fd, bitrate_switch=msg.bitrate_switch)) def _tx_canbus_3minus(self, msg): self.bus.send(can.Message(arbitration_id=msg.arbitration_id, data = msg.data, extended_id=msg.is_extended_id, is_fd=msg.is_fd, bitrate_switch=msg.bitrate_switch)) def rx_canbus(self): msg = self.bus.recv(self.timeout) if msg is not None: return CanMessage(arbitration_id=msg.arbitration_id, data=msg.data, extended_id=msg.is_extended_id, is_fd=msg.is_fd, bitrate_switch=msg.bitrate_switch) def __init__(self, bus, timeout=0.0, *args, **kwargs): global can import can # Backward compatibility stuff. message_input_args = can.Message.__init__.__code__.co_varnames[:can.Message.__init__.__code__.co_argcount] if 'is_extended_id' in message_input_args: self.tx_canbus = self._tx_canbus_3plus else: self.tx_canbus = self._tx_canbus_3minus self.set_bus(bus) self.timeout = timeout TransportLayer.__init__(self, rxfn=self.rx_canbus, txfn=self.tx_canbus, *args, **kwargs) def set_bus(self, bus): if not isinstance(bus, can.BusABC): raise ValueError('bus must be a python-can BusABC object') self.bus=bus