import struct
from typing import Optional, cast
from udsoncan.Request import Request
from udsoncan.Response import Response
from udsoncan.exceptions import *
from udsoncan.BaseService import BaseService, BaseResponseData, BaseSubfunction
from udsoncan.ResponseCode import ResponseCode
import udsoncan.tools as tools
class AuthenticationReturnParameter:
    """
    Values of the ``authenticationReturnParameter`` byte that the server places in a positive
    response to an Authentication request (see ``Authentication.ResponseData.return_value``).

    The constants are plain integers so that they compare directly against the raw byte
    extracted from a response payload.
    """
    RequestAccepted = 0x00
    GeneralReject = 0x01

    # Answers describing which authentication concept the server is configured for.
    # NOTE(review): APCE / ACR presumably stand for "Authentication with PKI Certificate Exchange"
    # and "Authentication with Challenge-Response" (ISO 14229-1:2020) - confirm against the standard.
    AuthenticationConfiguration_APCE = 0x02
    AuthenticationConfiguration_ACR_with_asymmetric_cryptography = 0x03
    AuthenticationConfiguration_ACR_with_symmetric_cryptography = 0x04

    # Outcomes of the individual authentication steps.
    DeAuthentication_successful = 0x10
    CertificateVerified_OwnershipVerificationNecessary = 0x11
    OwnershipVerified_AuthenticationComplete = 0x12
    CertificateVerified = 0x13
class Authentication(BaseService):
    """
    Request builder and response parser for the Authentication service (service ID 0x29).

    ``make_request`` serialises the parameters that belong to a given authentication task
    (subfunction) and ``interpret_response`` decodes the matching positive response into a
    :class:`Authentication.ResponseData` instance.

    Variable-length parameters are encoded on the wire as a 2-byte big-endian length
    followed by the raw bytes; the algorithm indicator is a fixed 16-byte field.
    """
    _sid = 0x29

    supported_negative_response = [ResponseCode.SubFunctionNotSupported,
                                   ResponseCode.IncorrectMessageLengthOrInvalidFormat,
                                   ResponseCode.ConditionsNotCorrect,
                                   ResponseCode.RequestSequenceError,
                                   ResponseCode.CertificateVerificationFailed_InvalidTimePeriod,
                                   ResponseCode.CertificateVerificationFailed_InvalidSignature,
                                   ResponseCode.CertificateVerificationFailed_InvalidChainOfTrust,
                                   ResponseCode.CertificateVerificationFailed_InvalidType,
                                   ResponseCode.CertificateVerificationFailed_InvalidFormat,
                                   ResponseCode.CertificateVerificationFailed_InvalidContent,
                                   ResponseCode.CertificateVerificationFailed_InvalidScope,
                                   ResponseCode.CertificateVerificationFailed_InvalidCertificate,
                                   ResponseCode.OwnershipVerificationFailed,
                                   ResponseCode.ChallengeCalculationFailed,
                                   ResponseCode.SettingAccessRightsFailed,
                                   ResponseCode.SessionKeyCreationDerivationFailed,
                                   ResponseCode.ConfigurationDataUsageFailed,
                                   ResponseCode.DeAuthenticationFailed]

    class ResponseData(BaseResponseData):
        """
        .. data:: authentication_task_echo

            Authentication Task echoed back by the server

        .. data:: return_value

            :ref:`AuthenticationReturnParameter<AuthenticationReturnParameter>` authenticationReturnParameter as defined in
            B.5. (ISO 14229-1:2020)

        .. data:: challenge_server

            The challenge contains vehicle manufacturer specific formatted server data (possibly containing randomized
            information) or is a random number.

        .. data:: ephemeral_public_key_server

            Ephemeral public key generated by the server for Diffie-Hellman key agreement.

        .. data:: certificate_server

            The Certificate to verify.

        .. data:: proof_of_ownership_server

            Proof of Ownership to be verified by the client.

        .. data:: session_key_info

            If present, this value shall contain session key information, e.g. the encrypted session key(s) for securing
            further communication in the actual session and/or proof value(s) (e.g. a hash value) for the validation of the
            session key(s) on the client side.

        .. data:: algorithm_indicator

            Indicates the algorithm used in the generating and verifying Proof of Ownership (POWN), which further determines
            the parameters used in the algorithm and possibly the session key creation mode. This field is a 16 byte value
            containing the BER encoded OID value of the algorithm used. The value is left aligned and right padded with
            zero up to 16 bytes.

        .. data:: needed_additional_parameter

            Indicate what additional parameters, if needed, are expected by the server.
        """
        authentication_task_echo: int
        return_value: int
        challenge_server: Optional[bytes]
        ephemeral_public_key_server: Optional[bytes]
        certificate_server: Optional[bytes]
        proof_of_ownership_server: Optional[bytes]
        session_key_info: Optional[bytes]
        algorithm_indicator: Optional[bytes]
        needed_additional_parameter: Optional[bytes]

        def __init__(self, authentication_task_echo: int,
                     return_value: int,
                     challenge_server: Optional[bytes] = None,
                     ephemeral_public_key_server: Optional[bytes] = None,
                     certificate_server: Optional[bytes] = None,
                     proof_of_ownership_server: Optional[bytes] = None,
                     session_key_info: Optional[bytes] = None,
                     algorithm_indicator: Optional[bytes] = None,
                     needed_additional_parameter: Optional[bytes] = None
                     ):
            """
            Store the decoded response fields.

            Only ``authentication_task_echo`` and ``return_value`` are mandatory; every other
            field defaults to ``None`` and is only set for the subfunctions that carry it.
            """
            super().__init__(Authentication)

            self.authentication_task_echo = authentication_task_echo
            self.return_value = return_value
            self.challenge_server = challenge_server
            self.ephemeral_public_key_server = ephemeral_public_key_server
            self.certificate_server = certificate_server
            self.proof_of_ownership_server = proof_of_ownership_server
            self.session_key_info = session_key_info
            self.algorithm_indicator = algorithm_indicator
            self.needed_additional_parameter = needed_additional_parameter

    class InterpretedResponse(Response):
        """Typing helper: a ``Response`` whose ``service_data`` is an ``Authentication.ResponseData``."""
        service_data: "Authentication.ResponseData"

    class AuthenticationTask(BaseSubfunction):
        """Subfunction (authentication task) identifiers accepted by ``make_request`` (0 to 8)."""
        __pretty_name__ = 'authentication task'

        deAuthenticate = 0
        verifyCertificateUnidirectional = 1
        verifyCertificateBidirectional = 2
        proofOfOwnership = 3
        transmitCertificate = 4
        requestChallengeForAuthentication = 5
        verifyProofOfOwnershipUnidirectional = 6
        verifyProofOfOwnershipBidirectional = 7
        authenticationConfiguration = 8

    @staticmethod
    def _append_byes_parameter(data: bytes, parameter: Optional[bytes], parameter_name: str) -> bytes:
        """
        Append a length-prefixed parameter to a request payload.

        :param data: Payload built so far
        :param parameter: Bytes to append. ``None`` is encoded like an empty parameter (length field of 0, no content)
        :param parameter_name: Human readable name used in error messages
        :return: ``data`` followed by a 2-byte big-endian length and the parameter bytes

        :raises ValueError: If ``parameter`` is not a bytes object or its length is rejected by
            ``tools.validate_int`` for the range 0 to 0xFFFF
        """
        if parameter is None:
            # Absent parameters are still announced on the wire, with a zero length.
            return data + struct.pack('>H', 0)

        if not isinstance(parameter, bytes):
            raise ValueError(f'{parameter_name} must be a bytes object')
        # The length field is only 2 bytes wide.
        tools.validate_int(len(parameter), min=0, max=0xFFFF, name=f'{parameter_name} length')
        return data + struct.pack('>H', len(parameter)) + parameter

    @staticmethod
    def _validate_algorithm_indicator(algorithm_indicator: Optional[bytes]) -> bytes:
        """
        Check that the algorithm indicator is a bytes object of exactly 16 bytes and return it.

        :raises ValueError: If the value is not a bytes object or is not 16 bytes long
        """
        if not isinstance(algorithm_indicator, bytes) or len(algorithm_indicator) != 16:
            raise ValueError(f'Algorithm Indicator must be a bytes object of length 16, got {algorithm_indicator!r}')
        return algorithm_indicator

    @classmethod
    def make_request(cls, authentication_task: int,
                     communication_configuration: Optional[int] = None,
                     certificate_client: Optional[bytes] = None,
                     challenge_client: Optional[bytes] = None,
                     algorithm_indicator: Optional[bytes] = None,
                     certificate_evaluation_id: Optional[int] = None,
                     certificate_data: Optional[bytes] = None,
                     proof_of_ownership_client: Optional[bytes] = None,
                     ephemeral_public_key_client: Optional[bytes] = None,
                     additional_parameter: Optional[bytes] = None) -> Request:
        """
        Generates a request for Authentication

        :param authentication_task: Service subfunction. Allowed values are from 0 to 8.
        :type authentication_task: int

        :param communication_configuration: Optional Configuration information about how to proceed with security
            in further diagnostic communication after the Authentication (vehicle manufacturer specific).
            Allowed values are from 0 to 255.
        :type communication_configuration: int

        :param certificate_client: Optional The Certificate to verify.
        :type certificate_client: bytes or None

        :param challenge_client: Optional The challenge contains vehicle manufacturer specific formatted client data
            (likely containing randomized information) or is a random number.
        :type challenge_client: bytes or None

        :param algorithm_indicator: Optional Indicates the algorithm used in the generating and verifying Proof of
            Ownership (POWN), which further determines the parameters used in the algorithm and possibly the session key
            creation mode. This field is a 16 byte value containing the BER encoded OID value of the algorithm used.
            The value is left aligned and right padded with zero up to 16 bytes.
        :type algorithm_indicator: bytes or None

        :param certificate_evaluation_id: Optional unique ID to identify the evaluation type of the transmitted
            certificate. The value of this parameter is vehicle manufacturer specific. Subsequent diagnostic requests with
            the same evaluationTypeId will overwrite the certificate data of the previous requests.
            Allowed values are from 0 to 0xFFFF.
        :type certificate_evaluation_id: int or None

        :param certificate_data: Optional The Certificate to verify.
        :type certificate_data: bytes or None

        :param proof_of_ownership_client: Optional Proof of Ownership of the previous given challenge to be verified by
            the server.
        :type proof_of_ownership_client: bytes or None

        :param ephemeral_public_key_client: Optional Ephemeral public key generated by the client for Diffie-Hellman key
            agreement.
        :type ephemeral_public_key_client: bytes or None

        :param additional_parameter: Optional additional parameter is provided to the server if the server indicates
            as neededAdditionalParameter.
        :type additional_parameter: bytes or None

        :raises ValueError: If parameters are out of range, missing or wrong type
        """
        tools.validate_int(authentication_task, min=0, max=8, name='Authentication Task')

        task = Authentication.AuthenticationTask
        append = Authentication._append_byes_parameter

        data: Optional[bytes] = None
        if authentication_task in (task.deAuthenticate, task.authenticationConfiguration):
            # Service ID + Subfunction, no param
            pass
        elif authentication_task in (task.verifyCertificateUnidirectional,
                                     task.verifyCertificateBidirectional,
                                     task.requestChallengeForAuthentication):
            tools.validate_int(communication_configuration, min=0, max=0xFF, name='Communication Configuration')
            # communicationConfiguration
            data = struct.pack('B', cast(int, communication_configuration))

            if authentication_task == task.requestChallengeForAuthentication:
                # algorithmIndicator
                data += Authentication._validate_algorithm_indicator(algorithm_indicator)
            else:
                # certificateClient
                data = append(data, certificate_client, 'Certificate Client')
                # challengeClient
                data = append(data, challenge_client, 'Challenge Client')
        elif authentication_task == task.proofOfOwnership:
            # proofOfOwnershipClient
            data = append(b'', proof_of_ownership_client, 'Proof Of Ownership Client')
            # ephemeralPublicKeyClient
            data = append(data, ephemeral_public_key_client, 'Ephemeral Public Key Client')
        elif authentication_task == task.transmitCertificate:
            tools.validate_int(certificate_evaluation_id, min=0, max=0xFFFF, name='Certificate Evaluation ID')
            # certificateEvaluationId
            data = struct.pack('>H', cast(int, certificate_evaluation_id))
            # certificateData
            data = append(data, certificate_data, 'Certificate Data')
        elif authentication_task in (task.verifyProofOfOwnershipUnidirectional,
                                     task.verifyProofOfOwnershipBidirectional):
            # algorithmIndicator
            data = Authentication._validate_algorithm_indicator(algorithm_indicator)
            # proofOfOwnershipClient
            data = append(data, proof_of_ownership_client, 'Proof Of Ownership Client')
            # challengeClient
            data = append(data, challenge_client, 'Challenge Client')
            # additionalParameter
            data = append(data, additional_parameter, 'Additional Parameter')

        return Request(service=cls, subfunction=authentication_task, data=data)

    @staticmethod
    def _extract_byes_parameter(response: Response, data: bytes, parameter_name: str) -> bytes:
        """
        Read one length-prefixed parameter from the start of ``data``.

        :param response: Response being parsed; only used to build the exception on failure
        :param data: Remaining payload, starting at the 2-byte big-endian length field
        :param parameter_name: Human readable name used in error messages
        :return: The parameter content, without its length prefix

        :raises InvalidResponseException: If ``data`` is too short to hold the length field or the announced content
        """
        if data is None or len(data) < 2:
            raise InvalidResponseException(response, f'Not enough data for length of {parameter_name}')

        parameter_length, = struct.unpack('>H', data[:2])
        end = 2 + parameter_length
        if len(data) < end:
            raise InvalidResponseException(response, f'Not enough data for {parameter_name}.'
                                                     f' Parsed length is: {parameter_length}, rest of payload length'
                                                     f' is {len(data) - 2}')
        return data[2:end]

    @classmethod
    def interpret_response(cls, response: Response) -> InterpretedResponse:
        """
        Populates the response ``service_data`` property with an instance of
        :class:`Authentication.ResponseData<udsoncan.services.Authentication.ResponseData>`

        :param response: The received response to interpret
        :type response: :ref:`Response<Response>`

        :raises InvalidResponseException: If response data length is not as expected
        """
        if response.data is None:
            raise InvalidResponseException(response, "No data in response")
        if len(response.data) < 2:
            raise InvalidResponseException(response, "Response data must be at least 2 bytes")

        payload: bytes = response.data
        subfunction = payload[0]
        return_value = payload[1]
        offset = 2  # first byte after the subfunction echo and the return parameter

        def read_parameter(parameter_name: str) -> bytes:
            """Extract the next length-prefixed parameter and move ``offset`` past it."""
            nonlocal offset
            value = Authentication._extract_byes_parameter(response, payload[offset:], parameter_name)
            offset += 2 + len(value)
            return value

        task = Authentication.AuthenticationTask

        challenge_server: Optional[bytes] = None
        ephemeral_public_key_server: Optional[bytes] = None
        certificate_server: Optional[bytes] = None
        proof_of_ownership_server: Optional[bytes] = None
        session_key_info: Optional[bytes] = None
        algorithm_indicator: Optional[bytes] = None
        needed_additional_parameter: Optional[bytes] = None

        # The order of the read_parameter() calls below is the wire order of each response.
        if subfunction in (task.deAuthenticate, task.transmitCertificate, task.authenticationConfiguration):
            pass  # no more parameters
        elif subfunction == task.verifyCertificateUnidirectional:
            challenge_server = read_parameter('Challenge Server')
            ephemeral_public_key_server = read_parameter('Ephemeral Public Key Server')
        elif subfunction == task.verifyCertificateBidirectional:
            challenge_server = read_parameter('Challenge Server')
            certificate_server = read_parameter('Certificate Server')
            proof_of_ownership_server = read_parameter('Proof Of Ownership Server')
            ephemeral_public_key_server = read_parameter('Ephemeral Public Key Server')
        elif subfunction == task.proofOfOwnership:
            session_key_info = read_parameter('Session Key Info')
        elif subfunction in (task.requestChallengeForAuthentication,
                             task.verifyProofOfOwnershipUnidirectional,
                             task.verifyProofOfOwnershipBidirectional):
            if len(payload) - offset < 16:  # algorithmIndicator is exactly 16 bytes
                raise InvalidResponseException(response, f'Not enough data for Algorithm Indicator (16 bytes), rest of'
                                                         f' payload length is {len(payload) - offset}')
            algorithm_indicator = payload[offset:(offset + 16)]
            offset += len(algorithm_indicator)

            if subfunction == task.requestChallengeForAuthentication:
                challenge_server = read_parameter('Challenge Server')
                needed_additional_parameter = read_parameter('Needed Additional Parameter')
            else:
                if subfunction == task.verifyProofOfOwnershipBidirectional:
                    proof_of_ownership_server = read_parameter('Proof Of Ownership Server')
                # verifyProofOfOwnershipBidirectional or verifyProofOfOwnershipUnidirectional
                session_key_info = read_parameter('Session Key Info')
        else:
            raise InvalidResponseException(response, "Unknown Subfunction")

        # Anything left after the last expected parameter means the payload is malformed.
        if len(payload) > offset:
            raise InvalidResponseException(response, "Response data had more bytes than expected")

        response.service_data = cls.ResponseData(
            authentication_task_echo=subfunction,
            return_value=return_value,
            challenge_server=challenge_server,
            ephemeral_public_key_server=ephemeral_public_key_server,
            certificate_server=certificate_server,
            proof_of_ownership_server=proof_of_ownership_server,
            session_key_info=session_key_info,
            algorithm_indicator=algorithm_indicator,
            needed_additional_parameter=needed_additional_parameter
        )

        return cast(Authentication.InterpretedResponse, response)