import socket as socket_module
import os
import isotp.address
from typing import TYPE_CHECKING, Optional, Union
if TYPE_CHECKING:
from . import opts
# Default ``bufsize`` used by ``socket.recv()`` when the caller does not provide one.
# NOTE(review): presumably the largest ISO-TP payload that fits a 12-bit length field (4095 bytes) - confirm.
mtu = 4095
def check_support() -> None:
    """
    Verifies that the running interpreter exposes the CAN ISO-TP socket protocol.

    Returns silently when the standard ``socket`` module has a ``CAN_ISOTP`` attribute.

    :raises NotImplementedError: If ``CAN_ISOTP`` is missing. The message depends on whether ``os.name`` is ``'nt'``.
    """
    # Nothing to report when the protocol constant is available.
    if hasattr(socket_module, 'CAN_ISOTP'):
        return

    # Pick the explanation that matches the platform, then raise once.
    if os.name == 'nt':
        reason = "This module cannot be used on Windows"
    else:
        reason = "Your version of Python does not offer support for CAN ISO-TP protocol. Support have been added since Python 3.7 on Linux build > 2.6.15."
    raise NotImplementedError(reason)
class flags:
    """
    Bit flags for the ``optflag`` value of the socket general options.
    Each flag occupies a single bit, so several flags can be combined with a bitwise OR.
    """

    LISTEN_MODE = 1 << 0
    """Puts the socket in Listen mode, which prevents transmission of data"""

    EXTEND_ADDR = 1 << 1
    """When set, an address extension byte (set in socket general options) will be added to each payload sent. Unless RX_EXT_ADDR is also set, this value will be expected for reception as well"""

    TX_PADDING = 1 << 2
    """Enables padding of transmitted data with a byte set in the socket general options"""

    RX_PADDING = 1 << 3
    """Indicates that data padding is possible in reception. Must be set for CHK_PAD_LEN and CHK_PAD_DATA to have an effect"""

    CHK_PAD_LEN = 1 << 4
    """Makes the socket validate the padding length of the CAN message"""

    CHK_PAD_DATA = 1 << 5
    """Makes the socket validate the padding bytes of the CAN message"""

    HALF_DUPLEX = 1 << 6
    """Sets the socket in half duplex mode, forcing transmission and reception to happen sequentially"""

    FORCE_TXSTMIN = 1 << 7
    """Forces the socket to use the separation time set in general options, overriding the stmin value received in flow control frames."""

    FORCE_RXSTMIN = 1 << 8
    """Forces the socket to ignore any message received faster than the stmin given in the flow control frame"""

    RX_EXT_ADDR = 1 << 9
    """When set, a different extended address can be used for reception than for transmission."""

    WAIT_TX_DONE = 1 << 10
    """When set, we wait for tx completion to make sure the PDU is completely passed to the CAN netdevice queue."""
class LinkLayerProtocol:
    """
    Possible values for the ``mtu`` parameter of the link layer options.
    Each value is the size of the internal frame structure of the matching protocol.
    """

    CAN = 16
    """Size of the internal structure of a CAN 2.0 frame"""

    CAN_FD = 72
    """Size of the internal structure of a CAN FD frame"""
class socket:
    """
    A IsoTP socket wrapper for easy configuration

    :param timeout: Passed down to the socket ``settimeout`` method. Control the blocking/non-blocking behavior of the socket.
        The value is only applied when it is not ``None`` and strictly greater than 0.
    :type timeout: float | None
    """

    # Expose the helper classes as class attributes to allow the syntax isotp.socket.flags and isotp.socket.LinkLayerProtocol
    # This is a workaround for sphinx autodoc that fails to load docstring for nested-class members
    flags = flags
    LinkLayerProtocol = LinkLayerProtocol

    # Name of the network interface given to bind(). None until bind() is called.
    interface: Optional[str]
    # Address given to bind(). None until bind() is called and after close().
    address: Optional[isotp.address.AbstractAddress]
    # True once bind() succeeded. Reset to False by close().
    bound: bool
    # True once close() has been called.
    closed: bool
    # The real socket wrapped by this object.
    _socket: socket_module.socket

    def __init__(self, timeout: Optional[float] = None) -> None:
        check_support()
        from . import opts  # import only if required.
        self.interface = None
        self.address = None
        self.bound = False
        self.closed = False
        self._socket = socket_module.socket(socket_module.AF_CAN, socket_module.SOCK_DGRAM, socket_module.CAN_ISOTP)
        # A timeout of None or <= 0 is not forwarded; the socket keeps its default timeout.
        if timeout is not None and timeout > 0:
            self.settimeout(timeout)

    def settimeout(self, value: Optional[float]) -> None:
        """Forwards ``value`` to the ``settimeout`` method of the underlying socket"""
        self._socket.settimeout(value)

    def gettimeout(self) -> Optional[float]:
        """Returns the timeout reported by the ``gettimeout`` method of the underlying socket"""
        return self._socket.gettimeout()

    def send(self, data: bytes, flags: int = 0) -> int:
        """
        Sends data through the underlying socket.

        :param data: The payload to send
        :type data: bytes
        :param flags: Passed as is to the ``send`` method of the underlying socket
        :type flags: int

        :raises RuntimeError: If the socket is not bound
        :return: The value returned by the ``send`` method of the underlying socket
        :rtype: int
        """
        if not self.bound:
            raise RuntimeError("bind() must be called before using the socket")
        return self._socket.send(data, flags)

    def recv(self, bufsize: int = mtu, flags: int = 0) -> bytes:
        """
        Receives data from the underlying socket.

        :param bufsize: Passed as is to the ``recv`` method of the underlying socket. Defaults to the module level ``mtu``
        :type bufsize: int
        :param flags: Passed as is to the ``recv`` method of the underlying socket
        :type flags: int

        :raises RuntimeError: If the socket is not bound
        :return: The value returned by the ``recv`` method of the underlying socket
        :rtype: bytes
        """
        if not self.bound:
            raise RuntimeError("bind() must be called before using the socket")
        return self._socket.recv(bufsize, flags)

    def set_ll_opts(self,
                    mtu: Optional[int] = None,
                    tx_dl: Optional[int] = None,
                    tx_flags: Optional[int] = None
                    ) -> "opts.LinkLayerOpts":
        """
        Sets the link layer options. Default values are set to work with CAN 2.0. Link layer may be configured to work in CAN FD.
        Values of ``None`` will leave the parameter unchanged

        :param mtu: The internal CAN frame structure size. Possible values are defined in :class:`isotp.socket.LinkLayerProtocol<isotp.socket.LinkLayerProtocol>`
        :type mtu: int
        :param tx_dl: The CAN message payload length. For CAN 2.0, this value should be 8. For CAN FD, possible values are 8,12,16,20,24,32,48,64
        :type tx_dl: int
        :param tx_flags: Link layer flags.
        :type tx_flags: int

        :raises RuntimeError: If the socket is already bound
        :rtype: :class:`isotp.opts.LinkLayerOpts<isotp.opts.LinkLayerOpts>`
        """
        # Imported here so the name is bound in this scope; the import done in __init__ is local to __init__.
        from . import opts
        if self.bound:
            raise RuntimeError("Options must be set before calling bind()")
        return opts.LinkLayerOpts.write(self._socket,
                                        mtu=mtu,
                                        tx_dl=tx_dl,
                                        tx_flags=tx_flags)

    def set_opts(self,
                 optflag: Optional[int] = None,
                 frame_txtime: Optional[int] = None,
                 ext_address: Optional[int] = None,
                 txpad: Optional[int] = None,
                 rxpad: Optional[int] = None,
                 rx_ext_address: Optional[int] = None,
                 tx_stmin: Optional[int] = None) -> "opts.GeneralOpts":
        """
        Sets the general options of the socket. Values of ``None`` will leave the parameter unchanged

        :param optflag: A list of flags modifying the protocol behavior. Refer to :class:`socket.flags<isotp.socket.flags>`
        :type optflag: int
        :param frame_txtime: Frame transmission time (N_As/N_Ar) in nanoseconds.
        :type frame_txtime: int
        :param ext_address: The extended address to use. If not None, flags.EXTEND_ADDR will be set.
        :type ext_address: int
        :param txpad: The byte to use to pad the transmitted CAN messages. If not None, flags.TX_PADDING will be set
        :type txpad: int
        :param rxpad: The padding byte expected in the received CAN messages. If not None, flags.RX_PADDING will be set
        :type rxpad: int
        :param rx_ext_address: The extended address to use in reception. If not None, flags.RX_EXT_ADDR will be set
        :type rx_ext_address: int
        :param tx_stmin: Sets the transmit separation time (time between consecutive frame) in nanoseconds. This value will override the value received through FlowControl frame. If not None, flags.FORCE_TXSTMIN will be set
        :type tx_stmin: int

        :raises RuntimeError: If the socket is already bound
        :rtype: :class:`isotp.opts.GeneralOpts<isotp.opts.GeneralOpts>`
        """
        # Imported here so the name is bound in this scope; the import done in __init__ is local to __init__.
        from . import opts
        if self.bound:
            raise RuntimeError("Options must be set before calling bind()")
        return opts.GeneralOpts.write(self._socket,
                                      optflag=optflag,
                                      frame_txtime=frame_txtime,
                                      ext_address=ext_address,
                                      txpad=txpad,
                                      rxpad=rxpad,
                                      rx_ext_address=rx_ext_address,
                                      tx_stmin=tx_stmin
                                      )

    def set_fc_opts(self, bs: Optional[int] = None, stmin: Optional[int] = None, wftmax: Optional[int] = None) -> "opts.FlowControlOpts":
        """
        Sets the flow control options of the socket. Values of ``None`` will leave the parameter unchanged

        :param bs: The block size sent in the flow control message. Indicates the number of consecutive frame a sender can send before the socket sends a new flow control. A block size of 0 means that no additional flow control message will be sent (block size of infinity)
        :type bs: int
        :param stmin: The minimum separation time sent in the flow control message. Indicates the amount of time to wait between 2 consecutive frame. This value will be sent as is over CAN. Values from 1 to 127 means milliseconds. Values from 0xF1 to 0xF9 means 100us to 900us. 0 Means no timing requirements
        :type stmin: int
        :param wftmax: Maximum number of wait frame (flow control message with flow status=1) allowed before dropping a message. 0 means that wait frame are not allowed
        :type wftmax: int

        :raises RuntimeError: If the socket is already bound
        :rtype: :class:`isotp.opts.FlowControlOpts<isotp.opts.FlowControlOpts>`
        """
        # Imported here so the name is bound in this scope; the import done in __init__ is local to __init__.
        from . import opts
        if self.bound:
            raise RuntimeError("Options must be set before calling bind()")
        return opts.FlowControlOpts.write(self._socket, bs=bs, stmin=stmin, wftmax=wftmax)

    def get_ll_opts(self) -> "opts.LinkLayerOpts":
        """Reads the link layer options from the underlying socket"""
        from . import opts
        return opts.LinkLayerOpts.read(self._socket)

    def get_opts(self) -> "opts.GeneralOpts":
        """Reads the general options from the underlying socket"""
        from . import opts
        return opts.GeneralOpts.read(self._socket)

    def get_fc_opts(self) -> "opts.FlowControlOpts":
        """Reads the flow control options from the underlying socket"""
        from . import opts
        return opts.FlowControlOpts.read(self._socket)

    def bind(self, interface: str, address: Union[isotp.Address, isotp.AsymmetricAddress]) -> None:
        """
        Binds the socket to an address.

        :param interface: The network interface to use
        :type interface: string
        :param address: The address to bind to.
        :type address: :class:`isotp.Address<isotp.Address>`

        :raises ValueError: If ``interface`` is not a string, if ``address`` is not an ``isotp.Address`` or ``isotp.AsymmetricAddress``,
            or if an asymmetric address requires an extension byte in only one direction
        """
        if not isinstance(interface, str):
            raise ValueError("interface must be a string")

        if not isinstance(address, (isotp.Address, isotp.AsymmetricAddress)):
            raise ValueError("address must be an instance of isotp.Address or isotp.AsymmetricAddress")

        if isinstance(address, isotp.AsymmetricAddress):
            if address.requires_rx_extension_byte() != address.requires_tx_extension_byte():
                # See https://github.com/hartkopp/can-isotp/issues/62
                raise ValueError("The IsoTP socket module does not support asymmetric addresses with inconsistent address_extension byte")

        self.interface = interface
        self.address = address

        # IsoTP sockets don't provide an interface to modify the target address type. We assume physical.
        # If functional is required, the IDs can be manually crafted in Normal / extended mode
        rxid = self.address.get_rx_arbitration_id(isotp.TargetAddressType.Physical)
        txid = self.address.get_tx_arbitration_id(isotp.TargetAddressType.Physical)

        # 29-bit identifiers are masked and tagged with the extended frame flag; others are masked to the standard range.
        if self.address.is_rx_29bits():
            rxid = (rxid & socket_module.CAN_EFF_MASK) | socket_module.CAN_EFF_FLAG
        else:
            rxid = rxid & socket_module.CAN_SFF_MASK

        if self.address.is_tx_29bits():
            txid = (txid & socket_module.CAN_EFF_MASK) | socket_module.CAN_EFF_FLAG
        else:
            txid = txid & socket_module.CAN_SFF_MASK

        # When the address needs an extension byte, merge the matching flags into the current option flags
        # and write the extension bytes before binding (options cannot be changed once bound).
        if self.address.requires_tx_extension_byte() or self.address.requires_rx_extension_byte():
            o = self.get_opts()
            assert o.optflag is not None
            if self.address.requires_tx_extension_byte():
                o.optflag |= self.flags.EXTEND_ADDR
            if self.address.requires_rx_extension_byte():
                o.optflag |= self.flags.RX_EXT_ADDR
            self.set_opts(optflag=o.optflag, ext_address=self.address.get_tx_extension_byte(), rx_ext_address=self.address.get_rx_extension_byte())

        self._socket.bind((interface, rxid, txid))
        self.bound = True

    def fileno(self) -> int:
        """Returns the socket file descriptor"""
        return self._socket.fileno()

    def real_socket(self) -> socket_module.socket:
        """Return the real socket object hidden by the fake isotp socket object"""
        return self._socket

    def close(self) -> None:
        """Closes the socket"""
        self._socket.close()
        self.bound = False
        self.closed = True
        self.address = None

    def __del__(self) -> None:
        """Closes the underlying socket when this wrapper is garbage collected"""
        # ``_socket`` does not exist when __init__ raised before creating it (check_support() failure for instance)
        sock = getattr(self, '_socket', None)
        if isinstance(sock, socket_module.socket):
            sock.close()

    def __repr__(self) -> str:
        """Returns a short description: interface and address when bound, status and object id otherwise"""
        if self.bound:
            assert self.address is not None
            return "<ISO-TP Socket: %s, %s>" % (self.interface, self.address.get_content_str())
        else:
            status = "Closed" if self.closed else "Unbound"
            # %x already yields bare hex digits; hex() would duplicate the "0x" prefix
            return "<%s ISO-TP Socket at 0x%x>" % (status, id(self))