Source code for udsoncan.services.ReadDTCInformation

import struct
from udsoncan import Dtc, check_did_config, make_did_codec_from_definition, fetch_codec_definition_from_config, latest_standard, DIDConfig
from udsoncan.Request import Request
from udsoncan.Response import Response
from udsoncan.exceptions import *
from udsoncan.BaseService import BaseService, BaseSubfunction, BaseResponseData
from udsoncan.ResponseCode import ResponseCode
import udsoncan.tools as tools

from typing import Dict, Union, Optional, cast, List


class ReadDTCInformation(BaseService):
    _sid = 0x19

    supported_negative_response = [ResponseCode.SubFunctionNotSupported,
                                   ResponseCode.IncorrectMessageLengthOrInvalidFormat,
                                   ResponseCode.RequestOutOfRange
                                   ]

[docs] class ResponseData(BaseResponseData): """ .. data:: subfunction_echo Subfunction echoed back by the server .. data:: dtcs :ref:`DTC<DTC>` instances and their status read from the server. .. data:: dtc_count Number of DTC read or available .. data:: dtc_format Integer indicating the format of the DTC. See :ref:`DTC.Format<DTC_Format>` .. data:: status_availability :ref:`Dtc.Status<DTC_Status>` indicating which status the server supports .. data:: extended_data List of bytes containing the DTC extended data .. data:: memory_selection_echo Echo of the memory selection byte """ subfunction_echo: int dtcs: List[Dtc] dtc_count: Optional[int] dtc_format: Optional[int] status_availability: Optional[Dtc.Status] extended_data: Optional[List[bytes]] memory_selection_echo: Optional[int] def __init__(self, subfunction_echo: int, dtcs: Optional[List[Dtc]] = None, dtc_count: Optional[int] = None, dtc_format: Optional[int] = None, status_availability: Optional[Dtc.Status] = None, extended_data: Optional[List[bytes]] = [], memory_selection_echo: Optional[int] = None ): super().__init__(ReadDTCInformation) self.subfunction_echo = subfunction_echo self.dtcs = dtcs if dtcs is not None else [] self.dtc_count = dtc_count self.dtc_format = dtc_format self.status_availability = status_availability self.extended_data = extended_data if extended_data is not None else [] self.memory_selection_echo = memory_selection_echo
class InterpretedResponse(Response): service_data: "ReadDTCInformation.ResponseData"
[docs] class Subfunction(BaseSubfunction): __pretty_name__ = 'subfunction' reportNumberOfDTCByStatusMask = 1 reportDTCByStatusMask = 2 reportDTCSnapshotIdentification = 3 reportDTCSnapshotRecordByDTCNumber = 4 reportDTCSnapshotRecordByRecordNumber = 5 reportDTCExtendedDataRecordByDTCNumber = 6 reportNumberOfDTCBySeverityMaskRecord = 7 reportDTCBySeverityMaskRecord = 8 reportSeverityInformationOfDTC = 9 reportSupportedDTCs = 0xA reportFirstTestFailedDTC = 0xB reportFirstConfirmedDTC = 0xC reportMostRecentTestFailedDTC = 0xD reportMostRecentConfirmedDTC = 0xE reportMirrorMemoryDTCByStatusMask = 0xF reportMirrorMemoryDTCExtendedDataRecordByDTCNumber = 0x10 reportNumberOfMirrorMemoryDTCByStatusMask = 0x11 reportNumberOfEmissionsRelatedOBDDTCByStatusMask = 0x12 reportEmissionsRelatedOBDDTCByStatusMask = 0x13 reportDTCFaultDetectionCounter = 0x14 reportDTCWithPermanentStatus = 0x15 reportDTCExtDataRecordByRecordNumber = 0x16 reportUserDefMemoryDTCByStatusMask = 0x17 reportUserDefMemoryDTCSnapshotRecordByDTCNumber = 0x18 reportUserDefMemoryDTCExtDataRecordByDTCNumber = 0x19 # todo reportSupportedDTCExtDataRecord = 0x1A reportWWHOBDDTCByMaskRecord = 0x42 reportWWHOBDDTCWithPermanentStatus = 0x55 reportDTCInformationByDTCReadinessGroupIdentifier = 0x56
@classmethod def assert_severity_mask(cls, severity_mask: Optional[int], subfunction: int) -> None: if severity_mask is None: raise ValueError('severity_mask must be provided for subfunction 0x%02x' % subfunction) tools.validate_int(severity_mask, min=0, max=0xFF, name='Severity mask') @classmethod def assert_status_mask(cls, status_mask: Optional[int], subfunction: int) -> None: if status_mask is None: raise ValueError('status_mask must be provided for subfunction 0x%02x' % subfunction) tools.validate_int(status_mask, min=0, max=0xFF, name='Status mask') @classmethod def assert_memory_selection(cls, memory_selection: Optional[int], subfunction: int) -> None: if memory_selection is None: raise ValueError('memory_selection must be provided for subfunction 0x%02x' % subfunction) tools.validate_int(memory_selection, min=0, max=0xFF, name='Memory Selection') @classmethod def assert_dtc(cls, dtc: Optional[int], subfunction: int) -> None: if dtc is None: raise ValueError('A dtc value must be provided for subfunction 0x%02x' % subfunction) tools.validate_int(dtc, min=0, max=0xFFFFFF, name='DTC') @classmethod def assert_snapshot_record_number(cls, snapshot_record_number: Optional[int], subfunction: int) -> None: if snapshot_record_number is None: raise ValueError('snapshot_record_number must be provided for subfunction 0x%02x' % subfunction) tools.validate_int(snapshot_record_number, min=0, max=0xFF, name='Snapshot record number') @classmethod def assert_extended_data_record_number(cls, extended_data_record_number: Optional[int], subfunction: int, maxval: int = 0xFF) -> None: if extended_data_record_number is None: raise ValueError('extended_data_record_number must be provided for subfunction 0x%02x' % subfunction) tools.validate_int(extended_data_record_number, min=0, max=maxval, name='Extended data record number') @classmethod def assert_extended_data_size_int_or_dict(cls, extended_data_size: Optional[Union[int, Dict[int, int]]], subfunction: int) -> None: if extended_data_size is None: raise ValueError('extended_data_size must be provided as length of data is not given by the server for subfunction %d.' % subfunction) if isinstance(extended_data_size, int): tools.validate_int(extended_data_size, min=0, max=0xFFF, name='Extended data size') if isinstance(extended_data_size, dict): for dtcid in extended_data_size: tools.validate_int(extended_data_size[dtcid], min=0, max=0xFFF, name='Extended data size for DTC=0x%06x' % dtcid) @classmethod def pack_dtc(cls, dtcid: int) -> bytes: return struct.pack('BBB', (dtcid >> 16) & 0xFF, (dtcid >> 8) & 0xFF, (dtcid >> 0) & 0xFF) @classmethod def check_subfunction_valid(cls, subfunction: int, standard_version: int = latest_standard) -> None: tools.validate_int(subfunction, min=1, max=0xFF, name='Subfunction') vlist = vars(cls) ok = True for v in vlist: if isinstance(v, int) and v == subfunction: ok = False break if not ok: raise ValueError('Unknown subfunction : 0x%02x', subfunction) # These subfunction have been added in the 2020 version of the standard subfunction2020 = [ cls.Subfunction.reportUserDefMemoryDTCByStatusMask, cls.Subfunction.reportDTCExtDataRecordByRecordNumber, cls.Subfunction.reportUserDefMemoryDTCSnapshotRecordByDTCNumber, cls.Subfunction.reportUserDefMemoryDTCExtDataRecordByDTCNumber, cls.Subfunction.reportSupportedDTCExtDataRecord, cls.Subfunction.reportWWHOBDDTCByMaskRecord, cls.Subfunction.reportWWHOBDDTCWithPermanentStatus, cls.Subfunction.reportDTCInformationByDTCReadinessGroupIdentifier, ] if subfunction in subfunction2020 and standard_version < 2020: raise NotImplementedError('The subfunction 0x%02x has been introduced in standard version 2020 but decoding is requested to be done as per %d version' % ( subfunction, standard_version))
[docs] @classmethod def make_request(cls, subfunction: int, status_mask: Optional[Union[int, Dtc.Status]] = None, severity_mask: Optional[Union[int, Dtc.Severity]] = None, dtc: Optional[Union[int, Dtc]] = None, snapshot_record_number: Optional[int] = None, extended_data_record_number: Optional[int] = None, memory_selection: Optional[int] = None, standard_version: int = latest_standard) -> Request: """ Generates a request for ReadDTCInformation. Each subfunction uses a subset of parameters. :param subfunction: The service subfunction. Values are defined in :class:`ReadDTCInformation.Subfunction<ReadDTCInformation.Subfunction>` :type subfunction: int :param status_mask: A DTC status mask used to filter DTC :type status_mask: int or :ref:`Dtc.Status <DTC_Status>` :param severity_mask: A severity mask used to filter DTC :type severity_mask: int or :ref:`Dtc.Severity <DTC_Severity>` :param dtc: A DTC mask used to filter DTC :type dtc: int or :ref:`Dtc <DTC>` :param snapshot_record_number: Snapshot record number :type snapshot_record_number: int :param extended_data_record_number: Extended data record number :type extended_data_record_number: int :param memory_selection: Memory selection for user defined memory DTC :type memory_selection: int :raises ValueError: If parameters are out of range, missing or wrong type """ # Request grouping for subfunctions that have the same request format request_subfn_no_param = [ ReadDTCInformation.Subfunction.reportSupportedDTCs, ReadDTCInformation.Subfunction.reportFirstTestFailedDTC, ReadDTCInformation.Subfunction.reportFirstConfirmedDTC, ReadDTCInformation.Subfunction.reportMostRecentTestFailedDTC, ReadDTCInformation.Subfunction.reportMostRecentConfirmedDTC, ReadDTCInformation.Subfunction.reportDTCFaultDetectionCounter, ReadDTCInformation.Subfunction.reportDTCWithPermanentStatus, # Documentation is confusing about reportDTCSnapshotIdentification subfunction. # It is presented with reportDTCSnapshotRecordByDTCNumber (2 params) but a footnote says that these 2 parameters # are not to be provided for reportDTCSnapshotIdentification. Therefore, it is the same as other no-params subfn ReadDTCInformation.Subfunction.reportDTCSnapshotIdentification ] request_subfn_status_mask = [ ReadDTCInformation.Subfunction.reportNumberOfDTCByStatusMask, ReadDTCInformation.Subfunction.reportDTCByStatusMask, ReadDTCInformation.Subfunction.reportMirrorMemoryDTCByStatusMask, ReadDTCInformation.Subfunction.reportNumberOfMirrorMemoryDTCByStatusMask, ReadDTCInformation.Subfunction.reportNumberOfEmissionsRelatedOBDDTCByStatusMask, ReadDTCInformation.Subfunction.reportEmissionsRelatedOBDDTCByStatusMask ] request_subfn_mask_record_plus_snapshot_record_number = [ ReadDTCInformation.Subfunction.reportDTCSnapshotRecordByDTCNumber ] request_subfn_mask_record_plus_snapshot_record_number_plus_memory_selection = [ ReadDTCInformation.Subfunction.reportUserDefMemoryDTCSnapshotRecordByDTCNumber ] request_subfn_snapshot_record_number = [ ReadDTCInformation.Subfunction.reportDTCSnapshotRecordByRecordNumber ] request_subfn_mask_record_plus_extdata_record_number = [ ReadDTCInformation.Subfunction.reportDTCExtendedDataRecordByDTCNumber, ReadDTCInformation.Subfunction.reportMirrorMemoryDTCExtendedDataRecordByDTCNumber ] request_subfn_mask_record_plus_extdata_record_number_plus_memory_selection = [ ReadDTCInformation.Subfunction.reportUserDefMemoryDTCExtDataRecordByDTCNumber, ] request_subfn_severity_plus_status_mask = [ ReadDTCInformation.Subfunction.reportNumberOfDTCBySeverityMaskRecord, ReadDTCInformation.Subfunction.reportDTCBySeverityMaskRecord ] request_subfn_mask_record = [ ReadDTCInformation.Subfunction.reportSeverityInformationOfDTC ] request_subfn_status_mask_plus_memory_selection = [ ReadDTCInformation.Subfunction.reportUserDefMemoryDTCByStatusMask ] cls.check_subfunction_valid(subfunction, standard_version) if status_mask is not None and isinstance(status_mask, Dtc.Status): status_mask = status_mask.get_byte_as_int() if severity_mask is not None and isinstance(severity_mask, Dtc.Severity): severity_mask = severity_mask.get_byte_as_int() if dtc is not None and isinstance(dtc, Dtc): dtc = dtc.id req = Request(service=cls, subfunction=subfunction) if subfunction in request_subfn_no_param: # Service ID + Subfunction pass elif subfunction in request_subfn_status_mask: cls.assert_status_mask(status_mask, subfunction) req.data = struct.pack('B', status_mask) elif subfunction in request_subfn_mask_record_plus_snapshot_record_number: cls.assert_dtc(dtc, subfunction) assert dtc is not None # mypy nitpick cls.assert_snapshot_record_number(snapshot_record_number, subfunction) req.data = cls.pack_dtc(dtc) + struct.pack('B', snapshot_record_number) elif subfunction in request_subfn_mask_record_plus_snapshot_record_number_plus_memory_selection: cls.assert_dtc(dtc, subfunction) cls.assert_snapshot_record_number(snapshot_record_number, subfunction) cls.assert_memory_selection(memory_selection, subfunction) assert dtc is not None # mypy nitpick req.data = cls.pack_dtc(dtc) + struct.pack('BB', snapshot_record_number, memory_selection) elif subfunction in request_subfn_snapshot_record_number: cls.assert_snapshot_record_number(snapshot_record_number, subfunction) req.data = struct.pack('B', snapshot_record_number) elif subfunction in request_subfn_mask_record_plus_extdata_record_number: cls.assert_dtc(dtc, subfunction) assert dtc is not None # mypy nitpick cls.assert_extended_data_record_number(extended_data_record_number, subfunction) req.data = cls.pack_dtc(dtc) + struct.pack('B', extended_data_record_number) elif subfunction in request_subfn_mask_record_plus_extdata_record_number_plus_memory_selection: cls.assert_dtc(dtc, subfunction) assert dtc is not None # mypy nitpick cls.assert_memory_selection(memory_selection, subfunction) cls.assert_extended_data_record_number(extended_data_record_number, subfunction) req.data = cls.pack_dtc(dtc) + struct.pack('BB', extended_data_record_number, memory_selection) elif subfunction in request_subfn_severity_plus_status_mask: cls.assert_status_mask(status_mask, subfunction) cls.assert_severity_mask(severity_mask, subfunction) req.data = struct.pack('BB', severity_mask, status_mask) elif subfunction in request_subfn_mask_record: cls.assert_dtc(dtc, subfunction) assert dtc is not None req.data = cls.pack_dtc(dtc) elif subfunction in request_subfn_status_mask_plus_memory_selection: cls.assert_memory_selection(memory_selection, subfunction) cls.assert_status_mask(status_mask, subfunction) req.data = struct.pack('BB', status_mask, memory_selection) elif subfunction == ReadDTCInformation.Subfunction.reportDTCExtDataRecordByRecordNumber: cls.assert_extended_data_record_number(extended_data_record_number, subfunction, maxval=0xEF) # Maximum specified by ISO-14229:2020 req.data = struct.pack('B', extended_data_record_number) return req
[docs] @classmethod def interpret_response(cls, response: Response, subfunction: int, extended_data_size: Optional[Union[int, Dict[int, int]]] = None, tolerate_zero_padding: bool = True, ignore_all_zero_dtc: bool = True, dtc_snapshot_did_size: int = 2, didconfig: Optional[DIDConfig] = None, standard_version: int = latest_standard) -> InterpretedResponse: """ Populates the response ``service_data`` property with an instance of :class:`ReadDTCInformation.ResponseData<udsoncan.services.ReadDTCInformation.ResponseData>` :param response: The received response to interpret :type response: :ref:`Response<Response>` :param subfunction: The service subfunction. Values are defined in :class:`ReadDTCInformation.Subfunction<udsoncan.services.ReadDTCInformation.Subfunction>` :type subfunction: int :param extended_data_size: Extended data size to expect. Extended data is implementation specific, therefore, size is not standardized :type extended_data_size: int or dict[int, int] to specify a size per extended data ID :param tolerate_zero_padding: Ignore trailing zeros in the response data avoiding raising false :class:`InvalidResponseException<udsoncan.exceptions.InvalidResponseException>`. :type tolerate_zero_padding: bool :param ignore_all_zero_dtc: Discard any DTC entries that have an ID of 0. Avoid reading extra DTCs when using a transport protocol using zero padding. :type ignore_all_zero_dtc: bool :param dtc_snapshot_did_size: Number of bytes to encode the data identifier number. Other services such as :ref:`ReadDataByIdentifier<ReadDataByIdentifier>` encode DID over 2 bytes. UDS standard does not define the size of the snapshot DID, therefore, it must be supplied. :type dtc_snapshot_did_size: int :param didconfig: Definition of DID codecs. Dictionary mapping a DID (int) to a valid :ref:`DidCodec<DidCodec>` class or pack/unpack string :type didconfig: dict[int] = :ref:`DidCodec<DidCodec>` :raises InvalidResponseException: If response length is wrong or does not match DID configuration :raises ValueError: If parameters are out of range, missing or wrong types :raises ConfigError: If the server returns a snapshot DID not defined in ``didconfig`` """ cls.check_subfunction_valid(subfunction, standard_version) # Response grouping for responses that are encoded the same way response_subfn_dtc_availability_mask_plus_dtc_record = [ ReadDTCInformation.Subfunction.reportDTCByStatusMask, ReadDTCInformation.Subfunction.reportUserDefMemoryDTCByStatusMask, ReadDTCInformation.Subfunction.reportSupportedDTCs, ReadDTCInformation.Subfunction.reportFirstTestFailedDTC, ReadDTCInformation.Subfunction.reportFirstConfirmedDTC, ReadDTCInformation.Subfunction.reportMostRecentTestFailedDTC, ReadDTCInformation.Subfunction.reportMostRecentConfirmedDTC, ReadDTCInformation.Subfunction.reportMirrorMemoryDTCByStatusMask, ReadDTCInformation.Subfunction.reportEmissionsRelatedOBDDTCByStatusMask, ReadDTCInformation.Subfunction.reportDTCWithPermanentStatus ] response_subfn_number_of_dtc = [ ReadDTCInformation.Subfunction.reportNumberOfDTCByStatusMask, ReadDTCInformation.Subfunction.reportNumberOfDTCBySeverityMaskRecord, ReadDTCInformation.Subfunction.reportNumberOfMirrorMemoryDTCByStatusMask, ReadDTCInformation.Subfunction.reportNumberOfEmissionsRelatedOBDDTCByStatusMask, ] response_subfn_dtc_availability_mask_plus_dtc_record_with_severity = [ ReadDTCInformation.Subfunction.reportDTCBySeverityMaskRecord, ReadDTCInformation.Subfunction.reportSeverityInformationOfDTC ] response_subfn_dtc_plus_fault_counter = [ ReadDTCInformation.Subfunction.reportDTCFaultDetectionCounter ] response_subfn_dtc_plus_sapshot_record = [ ReadDTCInformation.Subfunction.reportDTCSnapshotIdentification ] response_sbfn_dtc_status_snapshots_records = [ ReadDTCInformation.Subfunction.reportDTCSnapshotRecordByDTCNumber, ReadDTCInformation.Subfunction.reportUserDefMemoryDTCSnapshotRecordByDTCNumber ] response_sbfn_dtc_status_snapshots_records_record_first = [ ReadDTCInformation.Subfunction.reportDTCSnapshotRecordByRecordNumber ] response_subfn_mask_record_plus_extdata = [ ReadDTCInformation.Subfunction.reportDTCExtendedDataRecordByDTCNumber, ReadDTCInformation.Subfunction.reportMirrorMemoryDTCExtendedDataRecordByDTCNumber, ReadDTCInformation.Subfunction.reportUserDefMemoryDTCExtDataRecordByDTCNumber ] response_subfn_record_number_plus_dtc_mask_plus_extdata = [ ReadDTCInformation.Subfunction.reportDTCExtDataRecordByRecordNumber ] subfunctions_with_memory_selection = [ ReadDTCInformation.Subfunction.reportUserDefMemoryDTCByStatusMask, ReadDTCInformation.Subfunction.reportUserDefMemoryDTCSnapshotRecordByDTCNumber, ReadDTCInformation.Subfunction.reportUserDefMemoryDTCExtDataRecordByDTCNumber ] firstbyte_is_memory_selection_echo = (subfunction in subfunctions_with_memory_selection) if response.data is None: raise InvalidResponseException(response, "No data in response") if len(response.data) < 1: raise InvalidResponseException(response, 'Response must be at least 1 byte long (echo of subfunction)') response.service_data = cls.ResponseData( subfunction_echo=response.data[0], # First byte is subfunction ) # Now for each response group, we have a different decoding algorithm if subfunction in response_subfn_dtc_availability_mask_plus_dtc_record + response_subfn_dtc_availability_mask_plus_dtc_record_with_severity: if subfunction in response_subfn_dtc_availability_mask_plus_dtc_record: dtc_size = 4 # DTC ID (3) + Status (1) elif subfunction in response_subfn_dtc_availability_mask_plus_dtc_record_with_severity: dtc_size = 6 # DTC ID (3) + Status (1) + Severity (1) + FunctionalUnit (1) if firstbyte_is_memory_selection_echo: if len(response.data) < 3: raise InvalidResponseException( response, 'Response must be at least 3 bytes long (echo of subfunction, echo of MemorySelection and DTCStatusAvailabilityMask)') response.service_data.memory_selection_echo = int(response.data[1]) actual_byte = 2 else: if len(response.data) < 2: raise InvalidResponseException( response, 'Response must be at least 2 byte long (echo of subfunction and DTCStatusAvailabilityMask)') actual_byte = 1 response.service_data.status_availability = Dtc.Status.from_byte(response.data[actual_byte]) actual_byte += 1 while True: # Loop until we have read all dtcs if len(response.data) <= actual_byte: break # done elif len(response.data) < actual_byte + dtc_size: partial_dtc_length = len(response.data) - actual_byte if tolerate_zero_padding and response.data[actual_byte:] == b'\x00' * partial_dtc_length: break else: # We purposely ignore extra byte for subfunction reportSeverityInformationOfDTC as it is supposed to return 0 or 1 DTC. if subfunction != ReadDTCInformation.Subfunction.reportSeverityInformationOfDTC or actual_byte == 2: raise InvalidResponseException( response, 'Incomplete DTC record. Missing %d bytes to response to complete the record' % (dtc_size - partial_dtc_length)) else: dtc_bytes = response.data[actual_byte:actual_byte + dtc_size] if dtc_bytes == b'\x00' * dtc_size and ignore_all_zero_dtc: pass # ignore else: if subfunction in response_subfn_dtc_availability_mask_plus_dtc_record: dtc = Dtc(struct.unpack('>L', b'\x00' + dtc_bytes[0:3])[0]) dtc.status.set_byte(dtc_bytes[3]) elif subfunction in response_subfn_dtc_availability_mask_plus_dtc_record_with_severity: dtc = Dtc(struct.unpack('>L', b'\x00' + dtc_bytes[2:5])[0]) dtc.severity.set_byte(dtc_bytes[0]) dtc.functional_unit = dtc_bytes[1] dtc.status.set_byte(dtc_bytes[5]) response.service_data.dtcs.append(dtc) actual_byte += dtc_size response.service_data.dtc_count = len(response.service_data.dtcs) # The 2 following subfunction responses have different purposes but their constructions are very similar. elif subfunction in response_subfn_dtc_plus_fault_counter + response_subfn_dtc_plus_sapshot_record: dtc_size = 4 if len(response.data) < 1: raise InvalidResponseException(response, 'Response must be at least 1 byte long (echo of subfunction)') actual_byte = 1 # Increasing index dtc_map: Dict[int, Dtc] = dict() # This map is used to append snapshot to existing DTC. while True: # Loop until we have read all dtcs if len(response.data) <= actual_byte: break # done elif len(response.data) < actual_byte + dtc_size: partial_dtc_length = len(response.data) - actual_byte if tolerate_zero_padding and response.data[actual_byte:] == b'\x00' * partial_dtc_length: break else: raise InvalidResponseException( response, 'Incomplete DTC record. Missing %d bytes to response to complete the record' % (dtc_size - partial_dtc_length)) else: dtc_bytes = response.data[actual_byte:actual_byte + dtc_size] if dtc_bytes == b'\x00' * dtc_size and ignore_all_zero_dtc: pass # ignore else: dtcid = struct.unpack('>L', b'\x00' + dtc_bytes[0:3])[0] # We create the DTC or get its reference if already created. dtc_created = False if dtcid in dtc_map and subfunction in response_subfn_dtc_plus_sapshot_record: dtc = dtc_map[dtcid] else: dtc = Dtc(dtcid) dtc_map[dtcid] = dtc dtc_created = True # We either read the DTC fault counter or Snapshot record number. if subfunction in response_subfn_dtc_plus_fault_counter: dtc.fault_counter = dtc_bytes[3] elif subfunction in response_subfn_dtc_plus_sapshot_record: record_number = dtc_bytes[3] if dtc.snapshots is None: dtc.snapshots = [] dtc.snapshots.append(record_number) # Adds the DTC to the list. if dtc_created: response.service_data.dtcs.append(dtc) actual_byte += dtc_size response.service_data.dtc_count = len(response.service_data.dtcs) # This group of responses returns a number of DTCs available elif subfunction in response_subfn_number_of_dtc: if len(response.data) < 5: raise InvalidResponseException(response, 'Response must be exactly 5 bytes long ') response.service_data.status_availability = Dtc.Status.from_byte(response.data[1]) response.service_data.dtc_format = response.data[2] response.service_data.dtc_count = struct.unpack('>H', response.data[3:5])[0] # This group of responses returns DTC snapshots # Responses include a DTC, many snapshot records. For each record, we find many Data Identifiers. # We create one Dtc.Snapshot for each DID. That'll be easier to work with. # <DTC,RecordNumber1,NumberOfDid_X,DID1,DID2,...DIDX, RecordNumber2,NumberOfDid_Y,DID1,DID2,...DIDY, etc> elif subfunction in response_sbfn_dtc_status_snapshots_records: minlength = 5 if not firstbyte_is_memory_selection_echo else 6 if len(response.data) < minlength: raise InvalidResponseException(response, 'Response must be at least %d bytes long' % minlength) if firstbyte_is_memory_selection_echo: response.service_data.memory_selection_echo = response.data[1] actual_byte = 2 else: actual_byte = 1 dtc = Dtc(struct.unpack('>L', b'\x00' + response.data[actual_byte:(actual_byte + 3)])[0]) dtc.status.set_byte(response.data[actual_byte + 3]) actual_byte += 4 # Increasing index tools.validate_int(dtc_snapshot_did_size, min=1, max=8, name='dtc_snapshot_did_size') while True: # Loop until we have read all dtcs if len(response.data) <= actual_byte: break # done remaining_data = response.data[actual_byte:] if tolerate_zero_padding and remaining_data == b'\x00' * len(remaining_data): break if len(remaining_data) < 2: raise InvalidResponseException(response, 'Incomplete response from server. Missing "number of identifier" and following data') record_number = remaining_data[0] number_of_did = remaining_data[1] # Validate record number and number of DID before continuing if number_of_did == 0: raise InvalidResponseException(response, 'Server returned snapshot record #%d with no data identifier included' % (record_number)) if len(remaining_data) < 2 + dtc_snapshot_did_size: raise InvalidResponseException(response, 'Incomplete response from server. Missing DID number and associated data.') actual_byte += 2 for i in range(number_of_did): remaining_data = response.data[actual_byte:] snapshot = Dtc.Snapshot() # One snapshot per DID for convenience. snapshot.record_number = record_number # As standard does not specify the length of the DID, we craft it based on a config did = 0 for j in range(dtc_snapshot_did_size): offset = dtc_snapshot_did_size - 1 - j did |= (remaining_data[offset] << (8 * j)) # Decode the data based on DID number. snapshot.did = did didconfig = check_did_config(did, didconfig) codec_definition = fetch_codec_definition_from_config(did, didconfig) codec = make_did_codec_from_definition(codec_definition) data_offset = dtc_snapshot_did_size if len(remaining_data[data_offset:]) < len(codec): raise InvalidResponseException(response, 'Incomplete response. Data for DID 0x%04x is only %d bytes while %d bytes is expected' % ( did, len(remaining_data[data_offset:]), len(codec))) snapshot.raw_data = remaining_data[data_offset:data_offset + len(codec)] snapshot.data = codec.decode(snapshot.raw_data) dtc.snapshots.append(snapshot) actual_byte += dtc_snapshot_did_size + len(codec) response.service_data.dtcs.append(dtc) response.service_data.dtc_count = 1 # This group of responses returns DTC snapshots # Responses include a DTC, many snapshots records. For each record, we find many Data Identifiers. # We create one Dtc.Snapshot for each DID. That'll be easier to work with. # Similar to previous subfunction group, but order of information is changed. # <RecordNumber1, DTC1,NumberOfDid_X,DID1,DID2,...DIDX, RecordNumber2,DTC2, NumberOfDid_Y,DID1,DID2,...DIDY, etc> elif subfunction in response_sbfn_dtc_status_snapshots_records_record_first: tools.validate_int(dtc_snapshot_did_size, min=1, max=8, name='dtc_snapshot_did_size') if len(response.data) < 2: raise InvalidResponseException(response, 'Response must be at least 2 bytes long. Subfunction echo + RecordNumber ') actual_byte = 1 # Increasing index while True: # Loop through response data if len(response.data) <= actual_byte: break # done remaining_data = response.data[actual_byte:] record_number = remaining_data[0] # If empty response but filled with 0, it is considered ok if remaining_data == b'\x00' * len(remaining_data) and tolerate_zero_padding: break # If record number received but no DTC provided (allowed according to standard), we exit. if len(remaining_data) == 1 or tolerate_zero_padding and remaining_data[1:] == b'\x00' * len(remaining_data[1:]): break if len(remaining_data) < 5: # Partial DTC (No DTC at all is checked above) raise InvalidResponseException(response, 'Incomplete response from server. Missing "DTCAndStatusRecord" and following data') if len(remaining_data) < 6: raise InvalidResponseException(response, 'Incomplete response from server. Missing number of data identifier') # DTC decoding dtc = Dtc(struct.unpack('>L', b'\x00' + remaining_data[1:4])[0]) dtc.status.set_byte(remaining_data[4]) number_of_did = remaining_data[5] actual_byte += 6 remaining_data = response.data[actual_byte:] if number_of_did == 0: raise InvalidResponseException(response, 'Server returned snapshot record #%d with no data identifier included' % (record_number)) if len(remaining_data) < dtc_snapshot_did_size: raise InvalidResponseException(response, 'Incomplete response from server. Missing DID and associated data') # We have a DTC and 0 DID, next loop if tolerate_zero_padding and remaining_data == b'\x00' * len(remaining_data): break # For each DID for i in range(number_of_did): remaining_data = response.data[actual_byte:] snapshot = Dtc.Snapshot() # One snapshot epr DID for convenience snapshot.record_number = record_number # As standard does not specify the length of the DID, we craft it based on a config did = 0 for j in range(dtc_snapshot_did_size): offset = dtc_snapshot_did_size - 1 - j did |= (remaining_data[offset] << (8 * j)) # Decode the data based on DID number. snapshot.did = did didconfig = check_did_config(did, didconfig) codec_definition = fetch_codec_definition_from_config(did, didconfig) codec = make_did_codec_from_definition(codec_definition) data_offset = dtc_snapshot_did_size if len(remaining_data[data_offset:]) < len(codec): raise InvalidResponseException(response, 'Incomplete response. Data for DID 0x%04x is only %d bytes while %d bytes is expected' % ( did, len(remaining_data[data_offset:]), len(codec))) snapshot.raw_data = remaining_data[data_offset:data_offset + len(codec)] snapshot.data = codec.decode(snapshot.raw_data) dtc.snapshots.append(snapshot) actual_byte += dtc_snapshot_did_size + len(codec) response.service_data.dtcs.append(dtc) response.service_data.dtc_count = len(response.service_data.dtcs) # These subfunctions include DTC ExtraData. We give it raw to user. elif subfunction in response_subfn_mask_record_plus_extdata: cls.assert_extended_data_size_int_or_dict(extended_data_size, subfunction) assert extended_data_size is not None minlength = 5 if not firstbyte_is_memory_selection_echo else 6 if len(response.data) < minlength: missing_data = 'MemorySelection and DTCAndStatusRecord' if firstbyte_is_memory_selection_echo else 'DTCAndStatusRecord' raise InvalidResponseException(response, 'Incomplete response from server. Missing %s' % missing_data) if firstbyte_is_memory_selection_echo: response.service_data.memory_selection_echo = response.data[1] actual_byte = 2 else: actual_byte = 1 # DTC decoding dtc = Dtc(struct.unpack('>L', b'\x00' + response.data[actual_byte:(actual_byte + 3)])[0]) dtc.status.set_byte(response.data[actual_byte + 3]) size = cls.get_extended_data_size(dtc, extended_data_size) actual_byte = actual_byte + 4 # Increasing index while actual_byte < len(response.data): # Loop through data remaining_data = response.data[actual_byte:] record_number = remaining_data[0] if record_number == 0: if remaining_data == b'\x00' * len(remaining_data) and tolerate_zero_padding: break else: raise InvalidResponseException( response, 'Extended data record number given by the server is 0 but this value is a reserved value.') actual_byte += 1 remaining_data = response.data[actual_byte:] if len(remaining_data) < size: raise InvalidResponseException(response, 'Incomplete response from server. Length of extended data for DTC 0x%06x with record number 0x%02x is %d bytes but smaller than given data_size of %d bytes' % ( dtc.id, record_number, len(remaining_data), size)) exdata = Dtc.ExtendedData() exdata.record_number = record_number exdata.raw_data = remaining_data[0:size] dtc.extended_data.append(exdata) actual_byte += size response.service_data.dtcs.append(dtc) response.service_data.dtc_count = len(response.service_data.dtcs) elif subfunction in response_subfn_record_number_plus_dtc_mask_plus_extdata: cls.assert_extended_data_size_int_or_dict(extended_data_size, subfunction) assert extended_data_size is not None if len(response.data) < 2: raise InvalidResponseException(response, 'Incomplete response from server. Missing DTCExtDataRecordNumber') record_number = int(response.data[1]) if record_number > 0xEF: raise InvalidResponseException( response, 'Server returned a RecordNumber of %d which is out of range (00-EF) according to ISO-14229:2020', record_number) actual_byte = 2 received_dtcs = set() while True: if actual_byte == len(response.data): break remaining_data = response.data[actual_byte:] all_zero_ending = True if remaining_data == b'\x00' * len(remaining_data) else False if all_zero_ending: try: zero_dtc_size = cls.get_extended_data_size(Dtc(0), extended_data_size) except: zero_dtc_size = None all_zero_dtc_to_read = (zero_dtc_size is not None and len(remaining_data) >= zero_dtc_size + 4) and ~ignore_all_zero_dtc if ~all_zero_dtc_to_read: if tolerate_zero_padding: break else: raise InvalidResponseException(response, 'Server response ends with a sequence of zero and zero adding is not tolerated') # DTC decoding if len(response.data) - actual_byte < 4: raise InvalidResponseException(response, 'Incomplete DTCAndStatusRecord at position %d' % actual_byte) dtc = Dtc(struct.unpack('>L', b'\x00' + response.data[actual_byte:(actual_byte + 3)])[0]) if dtc.id in received_dtcs: raise InvalidResponseException( response, 'The server returned two set of data for the same DTC (0x%06X) and same record number (%d)', (dtc.id, record_number)) received_dtcs.add(dtc.id) dtc.status.set_byte(response.data[actual_byte + 3]) actual_byte += 4 size = cls.get_extended_data_size(dtc, extended_data_size) if len(response.data) - actual_byte < size: raise InvalidResponseException(response, 'Incomplete ExtendedData for DTC 0x%06X. Configuration says that there should be %d bytes of data but server only gave %d bytes.' % ( dtc.id, size, len(response.data) - actual_byte)) exdata = Dtc.ExtendedData() exdata.record_number = record_number exdata.raw_data = response.data[actual_byte:actual_byte + size] dtc.extended_data.append(exdata) response.service_data.dtcs.append(dtc) actual_byte += size response.service_data.dtc_count = len(response.service_data.dtcs) return cast(ReadDTCInformation.InterpretedResponse, response)
@classmethod def get_extended_data_size(cls, dtc: Dtc, given_size: Union[int, Dict[int, int]]) -> int: size = None if isinstance(given_size, int): size = given_size elif isinstance(given_size, dict): if dtc.id not in given_size: raise ConfigError(key=dtc.id, msg='Server returned extended data for DTC 0x%06X but no data size was specified for this DTC' % (dtc.id)) size = given_size[dtc.id] else: raise ValueError('Unsupported extended data size type.') return size