Examples
Blocking transmission with python-can
# In this example, we transmit a payload using a blocking send()
import isotp
import logging
from can.interfaces.socketcan import SocketcanBus
def my_error_handler(error):
# Called from a different thread, needs to be thread safe
logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))
bus = SocketcanBus(channel='vcan0')
addr = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456)
params = {
'blocking_send' : True
}
stack = isotp.CanStack(bus, address=addr, error_handler=my_error_handler, params=params)
try:
stack.start()
stack.send(b'Hello, this is a long payload sent in small chunks', timeout=2) # Blocking send, raise on error
print("Payload transmission successfully completed.") # Success is guaranteed because send() can raise
except isotp.BlockingSendFailure: # Happens for any kind of failure, including timeouts
print("Send failed")
finally:
stack.stop()
bus.shutdown()
Non-blocking transmission with python-can
# In this example, we transmit a payload sing a non-blocking send()
import isotp
import logging
import time
from can.interfaces.socketcan import SocketcanBus
def my_error_handler(error):
# Called from a different thread, needs to be thread safe
logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))
bus = SocketcanBus(channel='vcan0')
addr = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456)
stack = isotp.CanStack(bus, address=addr, error_handler=my_error_handler)
try:
stack.start()
stack.send(b'Hello, this is a long payload sent in small chunks') # Non-blocking send, does not raise exception.
while stack.transmitting():
time.sleep(0.005)
print("Payload transmission done.") # May have failed, use the error_handler to know
finally:
stack.stop()
bus.shutdown()
Different type of addresses
import isotp
isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456)
isotp.Address(isotp.AddressingMode.Normal_29bits, rxid=0x123456, txid=0x789ABC)
isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x11, target_address=0x22)
isotp.Address(isotp.AddressingMode.Extended_11bits, rxid=0x123, txid=0x456, source_address=0x55, target_address=0xAA)
isotp.Address(isotp.AddressingMode.Extended_29bits, rxid=0x123456, txid=0x789ABC, source_address=0x55, target_address=0xAA)
isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=0x123, txid=0x456, address_extension=0x99)
isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x11, target_address=0x22, address_extension=0x99)
# Asymmetric Addresses
isotp.AsymmetricAddress(
tx_addr=isotp.Address(isotp.AddressingMode.NormalFixed_29bits, target_address=ta, source_address=sa, tx_only=True),
rx_addr=isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=0x123, address_extension=0x99, rx_only=True) # txid is not required
)
Sending with functional addressing (broadcast)
import isotp
addr = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456)
layer = isotp.TransportLayer(rxfn=..., txfn=..., address=addr)
try:
layer.start()
layer.send(b'Hello', isotp.TargetAddressType.Functional) # Payload must fit a Single Frame. Functional addressing only works with Single Frames
while layer.transmitting():
time.sleep(0.005)
finally:
layer.stop()
bus.shutdown()
Defining custom rxfn and txfn
In this example, we see how to configure a TransportLayer
to interact with a hardware different than python-can with a fictive API.
import isotp
from typing import Optional
def my_rxfn(timeout:float) -> Optional[isotp.CanMesage]:
# All my_hardware_something and get_something() function are fictive of course.
msg = my_hardware_api_recv(timeout) # Blocking read are encouraged for better timing.
if msg is None:
return None # Return None if no message available
return isotp.CanMesage(arbitration_id=msg.get_id(), data=msg.get_data(), dlc=msg.get_dlc(), extended_id=msg.is_extended_id())
def my_txfn(isotp_msg:isotp.CanMesage):
# all set_something functions and my_hardware_something are fictive.
msg = my_hardware_api_make_msg()
msg.set_id(isotp_msg.arbitration_id)
msg.set_data(isotp_msg.data)
msg.set_dlc(isotp_msg.dlc)
msg.set_extended_id(isotp_msg.is_extended_id)
my_hardware_api_send(msg)
addr = isotp.Address(isotp.AddressingMode.Normal_29bits, txid=0x123456, rxid = 0x123457)
layer = isotp.TransportLayer(rxfn=my_rxfn, txfn=my_txfn, address=addr)
layer.start()
# ... rest of programs
# ...
layer.stop()
my_hardware_close()
Defining partial rxfn and txfn
If your hardware API requires some sort of handle to be given to its functions, you will need a way to pass this handle from your app down to rxfn
and txfn
.
The TransportLayer
will call rxfn
and txfn
with no additional parameters, which might be an issue.
A clean way to overcome this limitation is to use a functools.partial
function.
import isotp
import functools
from typing import Optional
# hardware_handle is passed through partial func
def my_rxfn(hardware_handle, timeout:float) -> Optional[isotp.CanMesage]:
msg = my_hardware_api_recv(timeout) # Blocking read are encouraged for better timing.
if msg is None:
return None # Return None if no message available
return isotp.CanMesage(arbitration_id=msg.get_id(), data=msg.get_data(), dlc=msg.get_dlc(), extended_id=msg.is_extended_id())
# hardware_handle is passed through partial func
def my_txfn(hardware_handle, isotp_msg:isotp.CanMesage):
# all set_something functions and my_hardware_something are fictive.
msg = my_hardware_api_make_msg()
msg.set_id(isotp_msg.arbitration_id)
msg.set_data(isotp_msg.data)
msg.set_dlc(isotp_msg.dlc)
msg.set_extended_id(isotp_msg.is_extended_id)
my_hardware_api_send(hardware_handle, msg)
hardware_handle = my_hardware_open() # Fictive handle mechanism
addr = isotp.Address(isotp.AddressingMode.Normal_29bits, txid=0x123456, rxid = 0x123457)
# This is where the magic happens
partial_rxfn = functools.partial(my_rxfn, hardware_handle)
partial_txfn = functools.partial(my_txfn, hardware_handle)
layer = isotp.TransportLayer(rxfn=partial_rxfn, txfn=partial_txfn, address=addr)
layer.start()
# ... rest of programs
# ...
layer.stop()
my_hardware_close()