Examples
Different layers of intelligence (1 to 4)
In the following examples, we will request an ECU reset in 4 different ways. We will start by crafting a binary payload manually, then we will add a layer of interpretation each time, making the code progressively easier to read and understand.
1. Raw Connection
# Layer 1: talk to the ECU with hand-crafted bytes only.
# Sends ECU Reset (service 0x11) with subfunction = 1, followed by 3 data bytes.
my_connection.send(b'\x11\x01\x77\x88\x99')
payload = my_connection.wait_frame(timeout=1)
# The expected answer is compared byte for byte: 0x51 followed by the
# echoed subfunction.
if payload == b'\x51\x01':
    print('Success!')
else:
    print('Reset failed')
2. Request and Responses
# Layer 2: let Request/Response build and parse the payload.
req = Request(services.ECUReset, subfunction=1, data=b'\x77\x88\x99')
my_connection.send(req.get_payload())
payload = my_connection.wait_frame(timeout=1)
response = Response.from_payload(payload)
# The response must belong to the same service, be positive, and echo the
# requested subfunction in its data.
if (response.service == services.ECUReset
        and response.code == Response.Code.PositiveResponse
        and response.data == b'\x01'):
    print('Success!')
else:
    print('Reset failed')
3. Services
# Layer 3: let the service class build the request and interpret the response.
req = services.ECUReset.make_request(reset_type=1, data=b'\x77\x88\x99')
my_connection.send(req.get_payload())
payload = my_connection.wait_frame(timeout=1)
response = Response.from_payload(payload)
# interpret_response is called before response.service_data is read below.
services.ECUReset.interpret_response(response)
if (response.service == services.ECUReset
        and response.code == Response.Code.PositiveResponse
        and response.service_data.reset_type_echo == 1):
    print('Success!')
else:
    print('Reset failed')
4. Client
# Layer 4: the Client does everything; a failure is reported as an exception.
try:
    client.ecu_reset(reset_type=1, data=b'\x77\x88\x99')
except Exception:
    # `Exception` rather than a bare `except:` so that KeyboardInterrupt and
    # SystemExit are not swallowed by the example.
    print('Reset failed')
else:
    print('Success!')
Using UDS over python-can
In this example, we show how to use PythonIsoTpConnection
with a fictitious Vector interface.
Note that, in order to run this code, both python-can
and can-isotp
must be installed.
import can
from can.interfaces.vector import VectorBus
from udsoncan.connections import PythonIsoTpConnection
from udsoncan.client import Client
import udsoncan.configs
import isotp

# Refer to isotp documentation for full details about parameters
isotp_params = {
    'stmin': 32,                            # Will request the sender to wait 32ms between consecutive frames. 0-127ms or 100-900us with values from 0xF1-0xF9
    'blocksize': 8,                         # Request the sender to send 8 consecutive frames before sending a new flow control message
    'wftmax': 0,                            # Number of wait frames allowed before triggering an error
    'tx_data_length': 8,                    # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
    # Minimum length of CAN messages. When different from None, messages are padded to meet this length. Works with CAN 2.0 and CAN FD.
    'tx_data_min_length': None,
    'tx_padding': 0,                        # Will pad all transmitted CAN messages with byte 0x00.
    'rx_flowcontrol_timeout': 1000,         # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
    'rx_consecutive_frame_timeout': 1000,   # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
    'squash_stmin_requirement': False,      # When sending, respect the stmin requirement of the receiver. If set to True, go as fast as possible.
    'max_frame_size': 4095,                 # Limit the size of received frames.
    'can_fd': False,                        # Does not set the can_fd flag on the output CAN messages
    'bitrate_switch': False,                # Does not set the bitrate_switch flag on the output CAN messages
    'rate_limit_enable': False,             # Disable the rate limiter
    'rate_limit_max_bitrate': 1000000,      # Ignored when rate_limit_enable=False. Sets the max bitrate when rate_limit_enable=True
    'rate_limit_window_size': 0.2,          # Ignored when rate_limit_enable=False. Sets the averaging window size for bitrate calculation when rate_limit_enable=True
    'listen_mode': False,                   # Does not use the listen_mode, which prevents transmission.
}

uds_config = udsoncan.configs.default_client_config.copy()

bus = VectorBus(channel=0, bitrate=500000)                                          # Link Layer (CAN protocol)
notifier = can.Notifier(bus, [can.Printer()])                                       # Add a debug listener that prints all messages
tp_addr = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=0x123, rxid=0x456) # Network layer addressing scheme
# Alternative for isotp v1.x, which has no notifier support:
#stack = isotp.CanStack(bus=bus, address=tp_addr, params=isotp_params)
stack = isotp.NotifierBasedCanStack(bus=bus, notifier=notifier, address=tp_addr, params=isotp_params)  # Network/Transport layer (IsoTP protocol). Registers a new listener
conn = PythonIsoTpConnection(stack)                                                 # Interface between Application and Transport layer
with Client(conn, config=uds_config) as client:                                     # Application layer (UDS protocol)
    client.change_session(1)
    # ...
Using UDS over aioisotp
In this example, we show how to use SyncAioIsotpConnection
with a virtual can interface.
Note that, in order to run this code, both python-can
and aioisotp
must be installed.
import logging

from udsoncan.connections import SyncAioIsotpConnection
from udsoncan.client import Client

# Show debug-level log output while the example runs.
logging.basicConfig(level=logging.DEBUG)

conn = SyncAioIsotpConnection(interface="virtual", channel=0, bitrate=500000, rx_id=0x123, tx_id=0x456)
with Client(conn) as client:
    # Requests issued inside this inner block are sent under the client's
    # suppress_positive_response context manager.
    with client.suppress_positive_response:
        client.change_session(3)
        # ...
Server default address and size format
In this example, we show how the Client uses the memory location format configurations.
# Default formats used when a MemoryLocation does not specify its own.
client.config['server_address_format'] = 16
client.config['server_memorysize_format'] = 8
# Explicit declaration. Client will use these values
memloc1 = MemoryLocation(address=0x1234, memorysize=0x10, address_format=16, memorysize_format=8)
# No explicit declaration. Client will use the default values in the configuration
memloc2 = MemoryLocation(address=0x1234, memorysize=0x10)
response = client.read_memory_by_address(memloc1)
response = client.read_memory_by_address(memloc2)
Security algorithm implementation
The following example shows how to define a security algorithm in the client configuration. The algorithm XORs the seed with a pre-shared key passed as a parameter.
def myalgo(level, seed, params):
    """
    Builds the security key to unlock a security level.

    Returns the seed xor'ed with the pre-shared key.

    :param level: The security level being unlocked. Not used by this algorithm.
    :param seed: The seed bytes to transform.
    :param params: Mapping holding the pre-shared key under ``'xorkey'``.
    :return: A ``bytes`` object of the same length as ``seed``.
    """
    xorkey = bytearray(params['xorkey'])
    # The key is repeated cyclically when it is shorter than the seed.
    return bytes(
        seed_byte ^ xorkey[index % len(xorkey)]
        for index, seed_byte in enumerate(seed)
    )
# Register the algorithm with the client, along with the parameters that
# are handed to it as `params` (here, the pre-shared XOR key).
client.config['security_algo'] = myalgo
client.config['security_algo_params'] = {'xorkey': b'\x12\x34\x56\x78'}
Warning
This algorithm is not secure and is given as an example only because of its simple implementation. XOR encryption is weak on many levels; it is vulnerable to known-plaintext attacks, relatively weak against replay attacks and does not provide enough diffusion (pattern recognition is possible). If you are an ECU programmer, please do not implement this.
Reading a DID with ReadDataByIdentifier
This example shows how to configure the client with a DID configuration and request the server with ReadDataByIdentifier
import udsoncan
from udsoncan.connections import IsoTPSocketConnection
from udsoncan.client import Client
import udsoncan.configs
import struct
class MyCustomCodecThatShiftBy4(udsoncan.DidCodec):
    """Example codec: the value is stored shifted left by 4 bits in a 32-bit little-endian word."""

    def encode(self, val):
        """Return the 4-byte payload for ``val``."""
        val = (val << 4) & 0xFFFFFFFF   # Do some stuff
        return struct.pack('<L', val)   # Little endian, 32 bit value

    def decode(self, payload):
        """Return the value held in a 4-byte ``payload``."""
        val = struct.unpack('<L', payload)[0]   # decode the 32 bits value
        return val >> 4                         # Do some stuff (reversed)

    def __len__(self):
        return 4    # encoded payload is 4 bytes long.
# Work on a copy so the library's default configuration is left untouched.
config = dict(udsoncan.configs.default_client_config)
config['data_identifiers'] = {
    'default': '>H',                        # Default codec is a struct.pack/unpack string. 16 bits, big endian
    0x1234: MyCustomCodecThatShiftBy4,      # Uses own custom defined codec. Giving the class is ok
    0x1235: MyCustomCodecThatShiftBy4(),    # Same as 0x1234, giving an instance is good also
    0xF190: udsoncan.AsciiCodec(15)         # Codec that reads an ASCII string. We must tell the length of the string
}

# IsoTPSocketConnection only works with SocketCAN under Linux. Use another connection if needed.
conn = IsoTPSocketConnection('vcan0', rxid=0x123, txid=0x456)
with Client(conn, request_timeout=2, config=config) as client:
    response = client.read_data_by_identifier([0xF190])
    print(response.service_data.values[0xF190])     # This is a dict of DID:Value

    # Or, if a single DID is expected, a shortcut to read the value of the first DID
    vin = client.read_data_by_identifier_first(0xF190)
    print(vin)      # 'ABCDE0123456789' (15 chars)
InputOutputControlByIdentifier Composite DID
This example shows how the InputOutputControlByIdentifier can be used with a composite data identifier and how to build a proper ioconfig dict, which can be tricky. The example shown below corresponds to a real example provided in the ISO-14229 document.
# Example taken from UDS standard
class MyCompositeDidCodec(DidCodec):
    """Codec for a composite DID made of five fields packed into 5 bytes."""

    def encode(self, IAC_pintle, rpm, pedalA, pedalB, EGR_duty):
        """Pack the five fields; pedalA and pedalB share one byte (high/low nibble)."""
        pedal = (pedalA << 4) | pedalB
        return struct.pack('>BHBB', IAC_pintle, rpm, pedal, EGR_duty)

    def decode(self, payload):
        """Unpack a 5-byte payload into a dict keyed by field name."""
        vals = struct.unpack('>BHBB', payload)
        return {
            'IAC_pintle': vals[0],
            'rpm': vals[1],
            'pedalA': (vals[2] >> 4) & 0xF,
            'pedalB': vals[2] & 0xF,
            'EGR_duty': vals[3]
        }

    def __len__(self):
        return 5
# Maps each DID to the way its data is encoded.
ioconfig = {
    # NOTE(review): MyDidCodec is not defined in this example; presumably
    # any user-defined DidCodec subclass - confirm.
    0x132: MyDidCodec,
    0x456: '<HH',
    # Composite DID: a codec plus the mask bit assigned to each field.
    0x155: dict(
        codec=MyCompositeDidCodec,
        mask=dict(
            IAC_pintle=0x80,
            rpm=0x40,
            pedalA=0x20,
            pedalB=0x10,
            EGR_duty=0x08,
        ),
        mask_size=2,  # Mask encoded over 2 bytes
    ),
}

values = dict(IAC_pintle=0x07, rpm=0x1234, pedalA=0x4, pedalB=0x5, EGR_duty=0x99)
req = InputOutputControlByIdentifier.make_request(
    0x155,
    values=values,
    masks=['IAC_pintle', 'pedalA'],
    ioconfig=ioconfig,
)
Using J2534 PassThru Interface
This is an example for how to use J2534Connection
.
This connection requires a compatible J2534 PassThru device (such as a tactrix openport 2.0 cable), with a DLL for said device installed.
Note, this connection has been written to plug in where a standard IsoTPSocketConnection had been used (i.e. code ported from Linux to Windows). Functionality, from a high level, is identical.
from udsoncan.connections import J2534Connection
from udsoncan.client import Client

# Define the connection using the absolute path to the DLL, rxid and txid's for isotp.
# The path is a raw string so that the backslashes are not read as escape sequences.
conn = J2534Connection(windll=r'C:\Program Files (x86)\OpenECU\OpenPort 2.0\drivers\openport 2.0\op20pt32.dll',
                       rxid=0x7E8, txid=0x7E0)
conn.send(b'\x22\xf2\x00')      # Mode 22 request for DID F200
response = conn.wait_frame()    # response should = 0x62 F2 00 data data data data

with Client(conn, request_timeout=1) as client:     # Application layer (UDS protocol)
    client.change_session(1)
    # ...
Define a Dynamic DID with DynamicallyDefineDataIdentifier
# Example 1) defineByIdentifier - single value
single_did_def = DynamicDidDefinition(source_did=0x1111, position=1, memorysize=2)
client.dynamically_define_did(0x1234, single_did_def)

# Example 2) defineByIdentifier - composite value
composite_did_def = DynamicDidDefinition(source_did=0x1111, position=1, memorysize=2)
composite_did_def.add(source_did=0x2222, position=5, memorysize=4)
client.dynamically_define_did(0x1234, composite_did_def)

# Example 3) defineByMemoryAddress - single value
single_memloc = MemoryLocation(
    address=0x1111, memorysize=2, address_format=16, memorysize_format=8
)
client.dynamically_define_did(0x1234, single_memloc)

# Example 4) defineByMemoryAddress - composite value
composite_mem_def = DynamicDidDefinition(
    MemoryLocation(address=0x1111, memorysize=2, address_format=16, memorysize_format=8)
)
composite_mem_def.add(
    MemoryLocation(address=0x2222, memorysize=4, address_format=16, memorysize_format=8)
)
composite_mem_def.add(
    MemoryLocation(address=0x3333, memorysize=1, address_format=16, memorysize_format=8)
)
client.dynamically_define_did(0x1234, composite_mem_def)

# Example 5) defineByMemoryAddress - composite value and client default format
# The formats are set once on the client; the MemoryLocation objects below
# do not specify them.
client.set_config('server_address_format', 16)
client.set_config('server_memorysize_format', 8)
default_format_def = DynamicDidDefinition(MemoryLocation(address=0x1111, memorysize=2))
default_format_def.add(MemoryLocation(address=0x2222, memorysize=4))
default_format_def.add(MemoryLocation(address=0x3333, memorysize=1))
client.dynamically_define_did(0x1234, default_format_def)