Source code for udsoncan.services.InputOutputControlByIdentifier
import struct
import math
from udsoncan.Request import Request
from udsoncan.Response import Response
from udsoncan import IOMasks, IOValues, DidCodec, CodecDefinition, IOConfig, IOConfigEntry
from udsoncan.exceptions import *
from udsoncan.BaseService import BaseService, BaseSubfunction, BaseResponseData
from udsoncan.ResponseCode import ResponseCode
import udsoncan.tools as tools
from udsoncan.common.dids import make_did_codec_from_definition, fetch_codec_definition_from_config
from typing import Optional, Any, Union, List, Dict, cast
class InputOutputControlByIdentifier(BaseService):
_sid = 0x2F
_use_subfunction = False
class ControlParam(BaseSubfunction):
"""
InputOutputControlByIdentifier defined control parameters as defined by ISO-14229:2006, Annex E
"""
__pretty_name__ = 'control parameter'
returnControlToECU = 0
resetToDefault = 1
freezeCurrentState = 2
shortTermAdjustment = 3
supported_negative_response = [ResponseCode.IncorrectMessageLengthOrInvalidFormat,
ResponseCode.ConditionsNotCorrect,
ResponseCode.RequestOutOfRange,
ResponseCode.SecurityAccessDenied
]
[docs] class ResponseData(BaseResponseData):
"""
.. data:: did_echo
DID echoed back by the server
.. data:: control_param_echo
control_param echoed back by the server
.. data:: decoded_data
Value processed by the :ref:`DidCodec<DidCodec>`.decode() method
"""
did_echo: int
control_param_echo: Optional[int]
decoded_data: Optional[Any]
def __init__(self, did_echo: int, control_param_echo: Optional[int] = None, decoded_data: Optional[Any] = None):
super().__init__(InputOutputControlByIdentifier)
self.did_echo = did_echo
self.control_param_echo = control_param_echo
self.decoded_data = decoded_data
class InterpretedResponse(Response):
service_data: "InputOutputControlByIdentifier.ResponseData"
[docs] @classmethod
def make_request(cls,
did: int,
control_param: Optional[int] = None,
values: Optional[Union[List[Any], Dict[str, Any], IOValues]] = None,
masks: Optional[Union[List[str], Dict[str, bool], IOMasks, bool]] = None,
ioconfig: Optional[IOConfig] = None
) -> Request:
"""
Generates a request for InputOutputControlByIdentifier
:param did: Data identifier to represent the IO
:type did: int
:param control_param: Optional parameter that can be a value defined in :class:`InputOutputControlByIdentifier.ControlParam<udsoncan.services.InputOutputControlByIdentifier.ControlParam>`
:type control_param: int
:param values: Optional values to send to the server. This parameter will be given to :ref:`DidCodec<DidCodec>`.encode() method.
It can be:
- A list for positional arguments
- A dict for named arguments
- An instance of :ref:`IOValues<IOValues>` for mixed arguments
:type values: list, dict, :ref:`IOValues<IOValues>`
:param masks: Optional mask record for composite values. The mask definition must be included in ``ioconfig``
It can be:
- A list naming the bit mask to set
- A dict with the mask name as a key and a boolean as the value (True to set the mask, False to clear it)
- An instance of :ref:`IOMask<IOMask>`
- A boolean value to set all masks to the same value.
:type masks: list, dict, :ref:`IOMask<IOMask>`, bool
:param ioconfig: Definition of DID codecs. Dictionary mapping a DID (int) to a valid :ref:`DidCodec<DidCodec>` class or pack/unpack string.
It is possible to use composite :ref:`DidCodec<DidCodec>` by specifying a dict with entries : codec, mask, mask_size.
:type ioconfig: dict[int] = :ref:`DidCodec<DidCodec>`, dict
:raises ValueError: If parameters are out of range, missing or wrong type
:raises ConfigError: If given DID is not defined within ioconfig
"""
tools.validate_int(did, min=0, max=0xffff, name='DID')
if control_param is not None:
if not isinstance(control_param, int):
raise ValueError("control_param must be a valid integer")
if control_param < 0 or control_param > 3:
raise ValueError(
'control_param must either be returnControlToECU(0), resetToDefault(1), freezeCurrentState(2), shortTermAdjustment(3). %d given.' % control_param)
if values is not None:
if isinstance(values, list):
values = IOValues(*values)
if isinstance(values, dict):
values = IOValues(**values)
if not isinstance(values, IOValues):
raise ValueError("values must be an instance of IOValues")
if masks is not None:
if isinstance(masks, list):
masks = IOMasks(*masks)
if isinstance(masks, dict):
masks = IOMasks(**masks)
if not isinstance(masks, IOMasks) and not isinstance(masks, bool):
raise ValueError("masks must be an instance of IOMask or a boolean value")
if values is None and masks is not None:
raise ValueError('An IOValue must be given if a IOMask is provided.')
if ioconfig is None:
raise ValueError("ioconfig must be provided")
request = Request(service=cls)
request.data = bytes()
# IO dids are defined in client config.
ioconfig_validated = tools.check_io_config(did, ioconfig)
io_config_entry = tools.fetch_io_entry_from_config(did, ioconfig_validated)
request.data += struct.pack('>H', did)
# This parameter is optional according to standard
if control_param is not None:
request.data += struct.pack('B', control_param)
# tools.
codec = make_did_codec_from_definition(io_config_entry) # Get IO codec from config
if values is not None:
request.data += codec.encode(*values.args, **values.kwargs)
if masks is not None: # Skip the masks byte if none is given.
if isinstance(masks, bool):
byte: bytes = b'\xFF' if masks == True else b'\x00'
if 'mask_size' in io_config_entry and io_config_entry['mask_size'] is not None:
mask_size = io_config_entry['mask_size']
assert mask_size is not None # mypy nitpick
request.data += (byte * mask_size)
else:
raise ConfigError(
'mask_size', msg='Given mask is boolean value, indicating that all mask should be set to same value, but no mask_size is defined in configuration. Cannot guess how many bits to set.')
elif isinstance(masks, IOMasks):
if 'mask' not in io_config_entry:
raise ConfigError('mask', msg='Cannot apply given mask. Input/Output configuration does not define their position (and size).')
masks_config = io_config_entry['mask']
if masks_config is None:
masks_config = {}
given_masks = masks.get_dict()
numeric_val: int = 0
for mask_name in given_masks:
if mask_name not in masks_config:
raise ConfigError('mask_size', msg='Cannot set mask bit for mask %s. The configuration does not define its position' % (mask_name))
if given_masks[mask_name] == True:
numeric_val |= masks_config[mask_name]
size = math.ceil(math.log(numeric_val + 1, 2) / 8.0)
if 'mask_size' in io_config_entry:
mask_size = io_config_entry['mask_size']
if mask_size is not None:
size = mask_size
request.data += numeric_val.to_bytes(size, 'big')
return request
[docs] @classmethod
def interpret_response(cls,
response: Response,
control_param: Optional[int] = None,
tolerate_zero_padding: bool = True,
ioconfig: Optional[IOConfig] = None
) -> InterpretedResponse:
"""
Populates the response ``service_data`` property with an instance of :class:`InputOutputControlByIdentifier.ResponseData<udsoncan.services.InputOutputControlByIdentifier.ResponseData>`
:param response: The received response to interpret
:type response: :ref:`Response<Response>`
:param control_param: Same optional control parameter value given to make_request()
:type control_param: int
:param tolerate_zero_padding: Ignore trailing zeros in the response data avoiding raising false :class:`InvalidResponseException<udsoncan.exceptions.InvalidResponseException>`.
:type tolerate_zero_padding: bool
:param ioconfig: Definition of DID codecs. Dictionary mapping a DID (int) to a valid :ref:`DidCodec<DidCodec>` class or pack/unpack string.
It is possible to use composite DidCodec by specifying a dict with entries : codec, mask, mask_size.
:type ioconfig: dict[int] = :ref:`DidCodec<DidCodec>`, dict
:raises ValueError: If parameters are out of range, missing or wrong type
:raises ConfigError: If DID echoed back by the server is not in the ``ioconfig`` definition
:raises InvalidResponseException: If response data is incomplete or if DID data does not match codec length.
"""
if response.data is None:
raise InvalidResponseException(response, "No data in response")
if ioconfig is None:
raise ValueError("IoConfig must be defined")
min_response_size = 2 if control_param is not None else 1 # Spec specifies that if first byte is a ControlParameter, it must be echoed back by the server
if len(response.data) < min_response_size:
raise InvalidResponseException(response, "Response must be at least %d bytes long" % min_response_size)
response.service_data = cls.ResponseData(
did_echo=struct.unpack(">H", response.data[0:2])[0]
)
did = response.service_data.did_echo
ioconfig_validated = tools.check_io_config(did, ioconfig) # IO DIDs are defined in client config.
io_config_entry = tools.fetch_io_entry_from_config(did, ioconfig_validated) # Get requested did definition (given or default)
codec = make_did_codec_from_definition(io_config_entry) # Get IO codec from config
next_byte = 2
if control_param is not None:
if len(response.data) < next_byte:
raise InvalidResponseException(
response, 'Response should include an echo of the InputOutputControlParameter (0x%02x)' % control_param)
response.service_data.control_param_echo = response.data[next_byte]
next_byte += 1
if len(response.data) >= next_byte:
remaining_data = response.data[next_byte:]
try:
payload_size = len(codec)
except DidCodec.ReadAllRemainingData:
payload_size = len(response.data) - next_byte
if len(remaining_data) > payload_size:
if remaining_data[payload_size:] == b'\x00' * (len(remaining_data) - payload_size):
if tolerate_zero_padding:
remaining_data = remaining_data[0:payload_size]
try:
response.service_data.decoded_data = codec.decode(remaining_data)
except Exception as e:
raise InvalidResponseException(response, 'Response from server could not be decoded. Exception is : %s' % e)
return cast(InputOutputControlByIdentifier.InterpretedResponse, response)