Examples

Basic transmission with python-can

import isotp
import logging
import time

from can.interfaces.vector import VectorBus

def my_error_handler(error):
   logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

bus = VectorBus(channel=0, bitrate=500000)
addr = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456)

stack = isotp.CanStack(bus, address=addr, error_handler=my_error_handler)
stack.send(b'Hello, this is a long payload sent in small chunks')

while stack.transmitting():
   stack.process()
   time.sleep(stack.sleep_time())

print("Payload transmission done.")

bus.shutdown()

Threaded reception with python-can

import isotp
import logging
import time
import threading

from can.interfaces.socketcan import SocketcanBus

class ThreadedApp:
   def __init__(self):
      self.exit_requested = False
      self.bus = SocketcanBus(channel='vcan0')
      addr = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456)
      self.stack = isotp.CanStack(self.bus, address=addr, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.isAlive():
         self.thread.join()

   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state

   def shutdown(self):
      self.stop()
      self.bus.shutdown()

if __name__ == '__main__':
   app = ThreadedApp()
   app.start()

   print('Waiting for payload - maximum 5 sec')
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         payload = app.stack.recv()
         print("Received payload : %s" % (payload))
         break
      time.sleep(0.2)

   print("Exiting")
   app.shutdown()

Different type of addresses

import isotp

isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456)
isotp.Address(isotp.AddressingMode.Normal_29bits, rxid=0x123456, txid=0x789ABC)
isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x11, target_address=0x22)
isotp.Address(isotp.AddressingMode.Extended_11bits, rxid=0x123, txid=0x456, source_address=0x55, target_address=0xAA)
isotp.Address(isotp.AddressingMode.Extended_29bits, rxid=0x123456, txid=0x789ABC, source_address=0x55, target_address=0xAA)
isotp.Address(isotp.AddressingMode.Mixed_11bits, rxid=0x123, txid=0x456, address_extension=0x99)
isotp.Address(isotp.AddressingMode.Mixed_29bits, source_address=0x11, target_address=0x22, address_extension=0x99)

Sending with functional addressing (broadcast)

import isotp
import time

from can.interfaces.vector import VectorBus

bus = VectorBus(channel=0, bitrate=500000)
addr = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x123, txid=0x456)
stack = isotp.CanStack(bus, address=addr)
stack.send(b'Hello', isotp.TargetAddressType.Functional) # Payload must fit a Single Frame. Functional addressing only works with Single Frames

while stack.transmitting():
   stack.process()
   time.sleep(stack.sleep_time())

bus.shutdown()

Defining custom rxfn and txfn

In this example, we see how to configure a TransportLayer to interact with a hardware different than python-can with a fictive API.

import isotp

def my_rxfn():
    # All my_hardware_something and get_something() function are fictive of course.
    msg = my_hardware_api_recv()
    return isotp.CanMesage(arbitration_id=msg.get_id(), data=msg.get_data(), dlc=msg.get_dlc(), extended_id=msg.is_extended_id())


def my_txfn(isotp_msg):
    # all set_something functions and my_hardware_something are fictive.
    msg = my_hardware_api_make_msg()
    msg.set_id(isotp_msg.arbitration_id)
    msg.set_data(isotp_msg.data)
    msg.set_dlc(isotp_msg.dlc)
    msg.set_extended_id(isotp_msg.is_extended_id)
    my_hardware_api_send(msg)

addr = isotp.Address(isotp.AddressingMode.Normal_29bits, txid=0x123456, rxid = 0x123457)
layer = isotp.TransportLayer(rxfn=my_rxfn, txfn=my_txfn, address=addr)

# ... rest of programs
# ...

my_hardware_close()

Defining partial rxfn and txfn

If your hardware API requires some sort of handle to be given to its functions, you will need a way to pass this handle from your app down to rxfn and txfn. The TransportLayer will call rxfn and txfn with no additional parameters, which might be an issue.

A clean way to overcome this limitation is to use a functools.partial function.

import isotp
from functools import partial   # Allow partial functions

# hardware_handle is passed through partial func
def my_rxfn(hardware_handle):
    msg = my_hardware_api_recv(hardware_handle)
    return isotp.CanMesage(arbitration_id=msg.get_id(), data=msg.get_data(), dlc=msg.get_dlc(), extended_id=msg.is_extended_id())

# hardware_handle is passed through partial func
def my_txfn(hardware_handle, isotp_msg):
    # all set_something functions and my_hardware_something are fictive.
    msg = my_hardware_api_make_msg()
    msg.set_id(isotp_msg.arbitration_id)
    msg.set_data(isotp_msg.data)
    msg.set_dlc(isotp_msg.dlc)
    msg.set_extended_id(isotp_msg.is_extended_id)
    my_hardware_api_send(hardware_handle, msg)

hardware_handle = my_hardware_open()    # Fictive handle mechanism
addr = isotp.Address(isotp.AddressingMode.Normal_29bits, txid=0x123456, rxid = 0x123457)
# This is where the magic happens
layer = isotp.TransportLayer(rxfn=partial(my_rxfn, hardware_handle), txfn=partial(my_txfn, hardware_handle), address=addr)

# ... rest of programs
# ...

my_hardware_close()