# Source code for udsoncan.services.Authentication

import struct
from typing import Optional, cast

from udsoncan.Request import Request
from udsoncan.Response import Response
from udsoncan.exceptions import *
from udsoncan.BaseService import BaseService, BaseResponseData, BaseSubfunction
from udsoncan.ResponseCode import ResponseCode
import udsoncan.tools as tools


class AuthenticationReturnParameter:
    """
    Integer constants for the ``authenticationReturnParameter`` value
    (defined in B.5 of ISO 14229-1:2020).

    The constants are plain class attributes, so they compare directly
    against the ``return_value`` integer parsed from a response.
    """

    # --- values 0x00 to 0x04 ---
    RequestAccepted = 0x00
    GeneralReject = 0x01
    AuthenticationConfiguration_APCE = 0x02
    AuthenticationConfiguration_ACR_with_asymmetric_cryptography = 0x03
    AuthenticationConfiguration_ACR_with_symmetric_cryptography = 0x04

    # --- values 0x10 to 0x13 ---
    DeAuthentication_successful = 0x10
    CertificateVerified_OwnershipVerificationNecessary = 0x11
    OwnershipVerified_AuthenticationComplete = 0x12
    CertificateVerified = 0x13


class Authentication(BaseService):
    """
    Authentication service (service ID 0x29).

    Provides :meth:`make_request` to build a request for one of the
    authentication tasks listed in :class:`AuthenticationTask`, and
    :meth:`interpret_response` to parse the matching response payload into a
    :class:`ResponseData` instance.

    Variable-length parameters are encoded on the wire as a 2-byte big-endian
    length followed by that many bytes; the algorithm indicator is a fixed
    16-byte field.
    """
    _sid = 0x29

    supported_negative_response = [ResponseCode.SubFunctionNotSupported,
                                   ResponseCode.IncorrectMessageLengthOrInvalidFormat,
                                   ResponseCode.ConditionsNotCorrect,
                                   ResponseCode.RequestSequenceError,
                                   ResponseCode.CertificateVerificationFailed_InvalidTimePeriod,
                                   ResponseCode.CertificateVerificationFailed_InvalidSignature,
                                   ResponseCode.CertificateVerificationFailed_InvalidChainOfTrust,
                                   ResponseCode.CertificateVerificationFailed_InvalidType,
                                   ResponseCode.CertificateVerificationFailed_InvalidFormat,
                                   ResponseCode.CertificateVerificationFailed_InvalidContent,
                                   ResponseCode.CertificateVerificationFailed_InvalidScope,
                                   ResponseCode.CertificateVerificationFailed_InvalidCertificate,
                                   ResponseCode.OwnershipVerificationFailed,
                                   ResponseCode.ChallengeCalculationFailed,
                                   ResponseCode.SettingAccessRightsFailed,
                                   ResponseCode.SessionKeyCreationDerivationFailed,
                                   ResponseCode.ConfigurationDataUsageFailed,
                                   ResponseCode.DeAuthenticationFailed]

    class ResponseData(BaseResponseData):
        """
        .. data:: authentication_task_echo

            Authentication Task echoed back by the server

        .. data:: return_value

            :ref:`AuthenticationReturnParameter<AuthenticationReturnParameter>`
            authenticationReturnParameter as defined in B.5. (ISO 14229-1:2020)

        .. data:: challenge_server

            The challenge contains vehicle manufacturer specific formatted server data
            (eventually containing randomized information) or is a random number.

        .. data:: ephemeral_public_key_server

            Ephemeral public key generated by the server for Diffie-Hellman key agreement.

        .. data:: certificate_server

            The Certificate to verify.

        .. data:: proof_of_ownership_server

            Proof of Ownership to be verified by the client.

        .. data:: session_key_info

            If present, this value shall contain session key information, e.g. the encrypted
            session key(s) for securing further communication in the actual session and/or
            proof value(s) (e.g. a hash value) for the validation of the session key(s) on
            the client side.

        .. data:: algorithm_indicator

            Indicates the algorithm used in the generating and verifying Proof of Ownership
            (POWN), which further determines the parameters used in the algorithm and possibly
            the session key creation mode. This field is a 16 byte value containing the BER
            encoded OID value of the algorithm used. The value is left aligned and right
            padded with zero up to 16 bytes.

        .. data:: needed_additional_parameter

            Indicate what additional parameters, if needed, are expected by the server.
        """
        authentication_task_echo: int
        return_value: int
        challenge_server: Optional[bytes]
        ephemeral_public_key_server: Optional[bytes]
        certificate_server: Optional[bytes]
        proof_of_ownership_server: Optional[bytes]
        session_key_info: Optional[bytes]
        algorithm_indicator: Optional[bytes]
        needed_additional_parameter: Optional[bytes]

        def __init__(self,
                     authentication_task_echo: int,
                     return_value: int,
                     challenge_server: Optional[bytes] = None,
                     ephemeral_public_key_server: Optional[bytes] = None,
                     certificate_server: Optional[bytes] = None,
                     proof_of_ownership_server: Optional[bytes] = None,
                     session_key_info: Optional[bytes] = None,
                     algorithm_indicator: Optional[bytes] = None,
                     needed_additional_parameter: Optional[bytes] = None
                     ):
            super().__init__(Authentication)

            self.authentication_task_echo = authentication_task_echo
            self.return_value = return_value
            self.challenge_server = challenge_server
            self.ephemeral_public_key_server = ephemeral_public_key_server
            self.certificate_server = certificate_server
            self.proof_of_ownership_server = proof_of_ownership_server
            self.session_key_info = session_key_info
            self.algorithm_indicator = algorithm_indicator
            self.needed_additional_parameter = needed_additional_parameter

    class InterpretedResponse(Response):
        service_data: "Authentication.ResponseData"

    class AuthenticationTask(BaseSubfunction):
        __pretty_name__ = 'authentication task'

        deAuthenticate = 0
        verifyCertificateUnidirectional = 1
        verifyCertificateBidirectional = 2
        proofOfOwnership = 3
        transmitCertificate = 4
        requestChallengeForAuthentication = 5
        verifyProofOfOwnershipUnidirectional = 6
        verifyProofOfOwnershipBidirectional = 7
        authenticationConfiguration = 8

    @staticmethod
    def _append_byes_parameter(data: bytes, parameter: Optional[bytes], parameter_name: str) -> bytes:
        """
        Returns ``data`` extended with a length-prefixed parameter.

        The parameter is written as a 2-byte big-endian length followed by its content.
        A ``None`` parameter is written as a zero length with no content.

        :param data: Payload built so far
        :param parameter: Parameter content to append, or None
        :param parameter_name: Human readable name used in error messages

        :raises ValueError: If ``parameter`` is not a bytes object or is longer than 0xFFFF bytes
        """
        if parameter is None:
            # Absent parameters are still announced on the wire, with a length of zero.
            return data + struct.pack('>H', 0)

        if not isinstance(parameter, bytes):
            raise ValueError(f'{parameter_name} must be a bytes object')

        # The length field is 16 bits wide, so the content cannot exceed 0xFFFF bytes.
        tools.validate_int(len(parameter), min=0, max=0xFFFF, name=f'{parameter_name} length')
        return data + struct.pack('>H', len(parameter)) + parameter

    @staticmethod
    def _validate_algorithm_indicator(algorithm_indicator: Optional[bytes]) -> bytes:
        """
        Checks that the algorithm indicator is a bytes object of exactly 16 bytes and returns it.

        :raises ValueError: If the value is not a bytes object or its length is not 16
        """
        if not isinstance(algorithm_indicator, bytes) or len(algorithm_indicator) != 16:
            raise ValueError(
                f'Algorithm Indicator must be a bytes object of length 16, got {algorithm_indicator!r}')
        return algorithm_indicator

    @classmethod
    def make_request(cls,
                     authentication_task: int,
                     communication_configuration: Optional[int] = None,
                     certificate_client: Optional[bytes] = None,
                     challenge_client: Optional[bytes] = None,
                     algorithm_indicator: Optional[bytes] = None,
                     certificate_evaluation_id: Optional[int] = None,
                     certificate_data: Optional[bytes] = None,
                     proof_of_ownership_client: Optional[bytes] = None,
                     ephemeral_public_key_client: Optional[bytes] = None,
                     additional_parameter: Optional[bytes] = None) -> Request:
        """
        Generates a request for Authentication

        :param authentication_task: Service subfunction. Allowed values are from 0 to 8.
        :type authentication_task: int

        :param communication_configuration: Optional Configuration information about how to proceed
            with security in further diagnostic communication after the Authentication
            (vehicle manufacturer specific). Allowed values are from 0 to 255.
        :type communication_configuration: int

        :param certificate_client: Optional The Certificate to verify.
        :type certificate_client: bytes or None

        :param challenge_client: Optional The challenge contains vehicle manufacturer specific
            formatted client data (likely containing randomized information) or is a random number.
        :type challenge_client: bytes or None

        :param algorithm_indicator: Optional Indicates the algorithm used in the generating and
            verifying Proof of Ownership (POWN), which further determines the parameters used in the
            algorithm and possibly the session key creation mode. This field is a 16 byte value
            containing the BER encoded OID value of the algorithm used. The value is left aligned
            and right padded with zero up to 16 bytes.
        :type algorithm_indicator: bytes or None

        :param certificate_evaluation_id: Optional unique ID to identify the evaluation type of the
            transmitted certificate. The value of this parameter is vehicle manufacturer specific.
            Subsequent diagnostic requests with the same evaluationTypeId will overwrite the
            certificate data of the previous requests. Allowed values are from 0 to 0xFFFF.
        :type certificate_evaluation_id: int or None

        :param certificate_data: Optional The Certificate to verify.
        :type certificate_data: bytes or None

        :param proof_of_ownership_client: Optional Proof of Ownership of the previous given
            challenge to be verified by the server.
        :type proof_of_ownership_client: bytes or None

        :param ephemeral_public_key_client: Optional Ephemeral public key generated by the client
            for Diffie-Hellman key agreement.
        :type ephemeral_public_key_client: bytes or None

        :param additional_parameter: Optional additional parameter is provided to the server if the
            server indicates as neededAdditionalParameter.
        :type additional_parameter: bytes or None

        :raises ValueError: If parameters are out of range, missing or wrong type
        """
        tools.validate_int(authentication_task, min=0, max=8, name='Authentication Task')

        task = Authentication.AuthenticationTask
        append = Authentication._append_byes_parameter
        data: Optional[bytes] = None

        if authentication_task in (task.deAuthenticate, task.authenticationConfiguration):
            # Service ID + Subfunction, no param
            pass

        elif authentication_task in (task.verifyCertificateUnidirectional,
                                     task.verifyCertificateBidirectional,
                                     task.requestChallengeForAuthentication):
            tools.validate_int(communication_configuration, min=0, max=0xFF, name='Communication Configuration')
            # communicationConfiguration
            data = struct.pack('B', communication_configuration)

            if authentication_task in (task.verifyCertificateUnidirectional,
                                       task.verifyCertificateBidirectional):
                # certificateClient
                data = append(data, certificate_client, 'Certificate Client')
                # challengeClient
                data = append(data, challenge_client, 'Challenge Client')
            else:
                # requestChallengeForAuthentication: algorithmIndicator
                data += Authentication._validate_algorithm_indicator(algorithm_indicator)

        elif authentication_task == task.proofOfOwnership:
            data = b''
            # proofOfOwnershipClient
            data = append(data, proof_of_ownership_client, 'Proof Of Ownership Client')
            # ephemeralPublicKeyClient
            data = append(data, ephemeral_public_key_client, 'Ephemeral Public Key Client')

        elif authentication_task == task.transmitCertificate:
            tools.validate_int(certificate_evaluation_id, min=0, max=0xFFFF, name='Certificate Evaluation ID')
            # certificateEvaluationId
            data = struct.pack('>H', certificate_evaluation_id)
            # certificateData
            data = append(data, certificate_data, 'Certificate Data')

        elif authentication_task in (task.verifyProofOfOwnershipUnidirectional,
                                     task.verifyProofOfOwnershipBidirectional):
            # algorithmIndicator
            data = Authentication._validate_algorithm_indicator(algorithm_indicator)
            # proofOfOwnershipClient
            data = append(data, proof_of_ownership_client, 'Proof Of Ownership Client')
            # challengeClient
            data = append(data, challenge_client, 'Challenge Client')
            # additionalParameter
            data = append(data, additional_parameter, 'Additional Parameter')

        return Request(service=cls, subfunction=authentication_task, data=data)

    @staticmethod
    def _extract_byes_parameter(response: Response, data: bytes, parameter_name: str) -> bytes:
        """
        Reads one length-prefixed parameter from the start of ``data``.

        The first 2 bytes are a big-endian length; the returned value is the
        content that follows, without the length prefix.

        :param response: The response being parsed, attached to any raised exception
        :param data: Remaining payload, starting at the length prefix
        :param parameter_name: Human readable name used in error messages

        :raises InvalidResponseException: If ``data`` is too short for the length
            prefix or for the announced content length
        """
        if data is None or len(data) < 2:
            raise InvalidResponseException(response, f'Not enough data for length of {parameter_name}')

        parameter_length, = struct.unpack('>H', data[:2])
        end = 2 + parameter_length
        if len(data) < end:
            raise InvalidResponseException(response,
                                           f'Not enough data for {parameter_name}.'
                                           f' Parsed length is: {parameter_length}, rest of payload length'
                                           f' is {len(data) - 2}')
        return data[2:end]

    @classmethod
    def interpret_response(cls, response: Response) -> InterpretedResponse:
        """
        Populates the response ``service_data`` property with an instance of
        :class:`Authentication.ResponseData<udsoncan.services.Authentication.ResponseData>`

        :param response: The received response to interpret
        :type response: :ref:`Response<Response>`

        :raises InvalidResponseException: If response data length is not as expected
        """
        if response.data is None:
            raise InvalidResponseException(response, "No data in response")

        payload = response.data
        if len(payload) < 2:
            raise InvalidResponseException(response, "Response data must be at least 2 bytes")

        subfunction = payload[0]
        return_value = payload[1]
        offset = 2  # Bytes consumed so far: subfunction echo + return value

        def take(parameter_name: str) -> bytes:
            # Reads the next length-prefixed parameter and advances past prefix and content.
            nonlocal offset
            value = Authentication._extract_byes_parameter(response, payload[offset:], parameter_name)
            offset += 2 + len(value)
            return value

        challenge_server: Optional[bytes] = None
        ephemeral_public_key_server: Optional[bytes] = None
        certificate_server: Optional[bytes] = None
        proof_of_ownership_server: Optional[bytes] = None
        session_key_info: Optional[bytes] = None
        algorithm_indicator: Optional[bytes] = None
        needed_additional_parameter: Optional[bytes] = None

        task = Authentication.AuthenticationTask

        if subfunction in (task.deAuthenticate,
                           task.transmitCertificate,
                           task.authenticationConfiguration):
            pass  # no more parameters

        elif subfunction == task.verifyCertificateUnidirectional:
            challenge_server = take('Challenge Server')
            ephemeral_public_key_server = take('Ephemeral Public Key Server')

        elif subfunction == task.verifyCertificateBidirectional:
            challenge_server = take('Challenge Server')
            certificate_server = take('Certificate Server')
            proof_of_ownership_server = take('Proof Of Ownership Server')
            ephemeral_public_key_server = take('Ephemeral Public Key Server')

        elif subfunction == task.proofOfOwnership:
            session_key_info = take('Session Key Info')

        elif subfunction in (task.requestChallengeForAuthentication,
                             task.verifyProofOfOwnershipUnidirectional,
                             task.verifyProofOfOwnershipBidirectional):
            if len(payload) - offset < 16:
                # algorithmIndicator is exactly 16 bytes
                raise InvalidResponseException(response,
                                               f'Not enough data for Algorithm Indicator (16 bytes), rest of'
                                               f' payload length is {len(payload) - offset}')
            algorithm_indicator = payload[offset:(offset + 16)]
            offset += len(algorithm_indicator)

            if subfunction == task.requestChallengeForAuthentication:
                challenge_server = take('Challenge Server')
                needed_additional_parameter = take('Needed Additional Parameter')
            else:
                if subfunction == task.verifyProofOfOwnershipBidirectional:
                    proof_of_ownership_server = take('Proof Of Ownership Server')
                # verifyProofOfOwnershipBidirectional or verifyProofOfOwnershipUnidirectional
                session_key_info = take('Session Key Info')

        else:
            raise InvalidResponseException(response, "Unknown Subfunction")

        # Every byte must be accounted for by the fields of the echoed task.
        if len(payload) > offset:
            raise InvalidResponseException(response, "Response data had more bytes than expected")

        response.service_data = cls.ResponseData(
            authentication_task_echo=subfunction,
            return_value=return_value,
            challenge_server=challenge_server,
            ephemeral_public_key_server=ephemeral_public_key_server,
            certificate_server=certificate_server,
            proof_of_ownership_server=proof_of_ownership_server,
            session_key_info=session_key_info,
            algorithm_indicator=algorithm_indicator,
            needed_additional_parameter=needed_additional_parameter
        )

        return cast(Authentication.InterpretedResponse, response)