Receive can data continuously and uds service response parallelly using single pcan usb hardware

A free API for the communication with control devices according to UDS (ISO 14229-1)
Post Reply
sanjeev.palla
Posts: 6
Joined: Fri 21. Apr 2023, 13:04

Receive can data continuously and uds service response parallelly using single pcan usb hardware

Post by sanjeev.palla » Fri 21. Apr 2023, 14:24

Hi,
I am trying to read CAN messages from ECU and perform UDS services parallelly with one PCAN USB hardware.
I have initialized the channel using PCAN_UDS. I am able get either CAN communication or UDS service response but not both at a time.
Can we get both communications(CAN and UDS) at a time using single PCAN USB channel? If yes, What changes do I need to do?

Below is UDS msg config:
self.__config = uds_msgconfig()
self.__config.can_id = -1
self.__config.can_msgtype = PCANTP_CAN_MSGTYPE_STANDARD
self.__config.nai.protocol = PUDS_MSGPROTOCOL_ISO_15765_2_11B_NORMAL
self.__config.nai.target_type = PCANTP_ISOTP_ADDRESSING_FUNCTIONAL
self.__config.type = PUDS_MSGTYPE_USDT
self.__config.nai.source_addr = PUDS_ISO_15765_4_ADDR_TEST_EQUIPMENT
self.__config.nai.target_addr = PUDS_ISO_15765_4_ADDR_OBD_FUNCTIONAL

UDS Initialize:
result = self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)

Note: I am using python.

F.Vergnaud
Software Development
Software Development
Posts: 305
Joined: Mon 9. Sep 2013, 12:21

Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware

Post by F.Vergnaud » Fri 21. Apr 2023, 14:30

Hello,

You must initialize your device with PCAN-UDS and use PCAN-ISO-TP functions when you want to communicate with CAN/CANFD frames or ISOTP messages.
Please check one of these samples included in the PCAN-UDS package:
* 13_server_uds_and_can / 14_client_uds_and_can
* 08_read_write_uds_and_isotp
Best regards,
Fabrice

sanjeev.palla
Posts: 6
Joined: Fri 21. Apr 2023, 13:04

Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware

Post by sanjeev.palla » Thu 27. Apr 2023, 13:21

Hi,

I have tried your suggestion but still issue is there. I am able perform only task at a time either read CAN data or UDS services.

Below is my code:

Code: Select all

	self.__mObjPCAN_UDS = PCAN_UDS_2013()
        self.__mObjPCAN_ISO_TP_2016 = PCAN_ISO_TP_2016()
        self.__config = uds_msgconfig()
        self.__config.can_id = -1
        self.__config.can_msgtype = PCANTP_CAN_MSGTYPE_STANDARD
        self.__config.nai.protocol = PUDS_MSGPROTOCOL_ISO_15765_2_11B_NORMAL
        self.__config.nai.target_type = PCANTP_ISOTP_ADDRESSING_FUNCTIONAL
        self.__config.type = PUDS_MSGTYPE_USDT
        self.__config.nai.source_addr = PUDS_ISO_15765_4_ADDR_TEST_EQUIPMENT
        self.__config.nai.target_addr = PUDS_ISO_15765_4_ADDR_OBD_FUNCTIONAL
        self.m_ReceiveEvent = c_void_p(windll.kernel32.CreateEventA(None, 0,0,None))
        result =  self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)

Code: Select all

## Function for reading CAN messages on normal CAN devices
    ##
    def ReadMessage(self):
        
        rx_msg = cantp_msg()
        timestamp = TPCANTimestamp()
        res = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(rx_msg, PCANTP_MSGTYPE_ANY)
        result = self.__mObjPCAN_ISO_TP_2016.Read_2016(self.m_PcanHandle,rx_msg,timestamp)
        # print("Result : ",result)
        if result.value == PCANTP_STATUS_OK.value:
            # We show the received message
            #
            try:
                # data = result[1:]
                readData = dict()
                readData["direction"] = "Rx"
                readData["msg"] = self.__Convert(rx_msg)
                readData["time"] = timestamp
                # print("readData : ",readData)
                self.__sdt.AddMessageData(readData)
            except Exception as e:
                self.PrintException()
            
        return result.value
    

Code: Select all

def TesterPresent(self):
        try:
            # self.m_Terminated = True
            out_msg_request = uds_msg()
            response_count = c_uint32(0)
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            status = self.__mObjPCAN_UDS.SvcTesterPresent_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_TP_ZSUBF)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForServiceFunctional_2013(self.m_PcanHandle, request, 1, True,  response, response_count,confirmation)
                # print(" UDS_SvcTesterPresent_2013: %i" %(status.value))
                if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                    # self.ReadData()
                    return self.__display_uds_msg("TesterPresent",confirmation, response, False)
                else:
                    # self.ReadData()
                    return self.__display_uds_msg("TesterPresent",request, None, False)
        except:
            self.PrintException()
Please help me.

F.Vergnaud
Software Development
Software Development
Posts: 305
Joined: Mon 9. Sep 2013, 12:21

Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware

Post by F.Vergnaud » Thu 27. Apr 2023, 15:36

Can you specify the CAN ID of the CAN frames you are trying to read?
If the CAN ID matches CAN IDs used by UDS then the frame will be consummed by UDS API and you will not be able to fetch it with PCAN-ISO-TP.
Best regards,
Fabrice

sanjeev.palla
Posts: 6
Joined: Fri 21. Apr 2023, 13:04

Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware

Post by sanjeev.palla » Thu 27. Apr 2023, 15:55

HI,

CAN IDs are 0x14 and 0x584.


Below is my code snippet:

Code: Select all

from PCAN_UDS_2013 import *
from PCAN_ISO_TP_2016 import *
from PCAN_ISO_TP_2016 import *
import time
import linecache

try:
    import win32event
    WINDOWS_EVENT_SUPPORT = True
except ImportError:     
    WINDOWS_EVENT_SUPPORT = False

class PCAN(object):

    def __init__(self):
        self.m_objPCANBasic = PCANBasic()
        self.__mObjPCAN_UDS = PCAN_UDS_2013()
        self.__mObjPCAN_ISO_TP_2016 = PCAN_ISO_TP_2016()
        self.__config = uds_msgconfig()
        self.__config.can_id = -1
        self.__config.can_msgtype = PCANTP_CAN_MSGTYPE_STANDARD
        self.__config.nai.protocol = PUDS_MSGPROTOCOL_ISO_15765_2_11B_NORMAL
        self.__config.nai.target_type = PCANTP_ISOTP_ADDRESSING_FUNCTIONAL
        self.__config.type = PUDS_MSGTYPE_USDT
        self.__config.nai.source_addr = PUDS_ISO_15765_4_ADDR_TEST_EQUIPMENT
        self.__config.nai.target_addr = PUDS_ISO_15765_4_ADDR_OBD_FUNCTIONAL
        self.m_BAUDRATES = {'1 MBit/sec':PCAN_BAUD_1M, '800 kBit/sec':PCAN_BAUD_800K, '500 kBit/sec':PCAN_BAUD_500K, '250 kBit/sec':PCAN_BAUD_250K,
                            '125 kBit/sec':PCAN_BAUD_125K, '100 kBit/sec':PCAN_BAUD_100K, '95,238 kBit/sec':PCAN_BAUD_95K, '83,333 kBit/sec':PCAN_BAUD_83K,
                            '50 kBit/sec':PCAN_BAUD_50K, '47,619 kBit/sec':PCAN_BAUD_47K, '33,333 kBit/sec':PCAN_BAUD_33K, '20 kBit/sec':PCAN_BAUD_20K,
                            '10 kBit/sec':PCAN_BAUD_10K, '5 kBit/sec':PCAN_BAUD_5K}

        self.m_HWTYPES = {'ISA-82C200':PCAN_TYPE_ISA, 'ISA-SJA1000':PCAN_TYPE_ISA_SJA, 'ISA-PHYTEC':PCAN_TYPE_ISA_PHYTEC, 'DNG-82C200':PCAN_TYPE_DNG,
                         'DNG-82C200 EPP':PCAN_TYPE_DNG_EPP, 'DNG-SJA1000':PCAN_TYPE_DNG_SJA, 'DNG-SJA1000 EPP':PCAN_TYPE_DNG_SJA_EPP}

        self.m_IOPORTS = {'0100':0x100, '0120':0x120, '0140':0x140, '0200':0x200, '0220':0x220, '0240':0x240, '0260':0x260, '0278':0x278, 
                          '0280':0x280, '02A0':0x2A0, '02C0':0x2C0, '02E0':0x2E0, '02E8':0x2E8, '02F8':0x2F8, '0300':0x300, '0320':0x320,
                          '0340':0x340, '0360':0x360, '0378':0x378, '0380':0x380, '03BC':0x3BC, '03E0':0x3E0, '03E8':0x3E8, '03F8':0x3F8}

        self.m_INTERRUPTS = {'3':3, '4':4, '5':5, '7':7, '9':9, '10':10, '11':11, '12':12, '15':15}
        
        #self.__sdt = SDT()
        self.__currentTime = None
        self.__wakeupStatus = None
        self.__connectStatus = None
        if WINDOWS_EVENT_SUPPORT:
            self.m_ReadThread = None
            self.m_Terminated = False
            # self.m_ReceiveEvent = win32event.CreateEvent(None, 0, 0, None)
            self.m_ReceiveEvent = c_void_p(windll.kernel32.CreateEventA(None, 0,0,None))

    @property
    def WakeupStatus(self):
        return str(self.__wakeupStatus)
    
    @property
    def ConnectStatus(self):
        return str(self.__connectStatus)
    
    def GetChannels(self):
        try:
            channels = []
            result =  self.m_objPCANBasic.GetValue(PCAN_NONEBUS, PCAN_ATTACHED_CHANNELS)
            if  (result[0] == PCAN_ERROR_OK):
                # Include only connectable channels
                #
                for channel in result[1]:
                    if  (channel.channel_condition & PCAN_CHANNEL_AVAILABLE):
                        channels.append(self.FormatChannelName(channel.channel_handle, channel.device_features & FEATURE_FD_CAPABLE))
            return channels
        except Exception as e:
            self.PrintException()

    def Initialize(self,hardware,channel,baudrate,hardware_type,IO_Port,interrupt):

        # startIndex = hardware.index("(") + 1
        # strChannel = hardware[startIndex:startIndex+3]
        # strChannel = strChannel.replace("h", "")
        if hardware.upper() == "PCAN_USB":
            if channel == 1:
                self.m_PcanHandle = PCAN_USBBUS1.value
            elif channel == 2:
                self.m_PcanHandle = PCAN_USBBUS2.value
            elif channel == 3:
                self.m_PcanHandle = PCAN_USBBUS3.value
            elif channel == 4:
                self.m_PcanHandle = PCAN_USBBUS4.value
            elif channel == 5:
                self.m_PcanHandle = PCAN_USBBUS5.value
            elif channel == 6:
                self.m_PcanHandle = PCAN_USBBUS6.value
            elif channel == 7:
                self.m_PcanHandle = PCAN_USBBUS7.value
            elif channel == 8:
                self.m_PcanHandle = PCAN_USBBUS8.value

        baudrate = self.m_BAUDRATES[baudrate]
        hwtype = self.m_HWTYPES[hardware_type]
        ioport = int(IO_Port,16)
        interrupt = int(self.m_INTERRUPTS[interrupt])
        
        result =  self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
        # result =  self.m_objPCANBasic.Initialize(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
        # result =  self.__mObjPCAN_ISO_TP_2016.Initialize_2016(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
        if result != PCAN_ERROR_OK:
            if result != PCAN_ERROR_CAUTION:
                error = self.GetFormatedError(result)
                print("Error!", error)
                return {"status":"failed","Comment": error}
            else:
                print('The bitrate being used is different than the given one')
                result = PCAN_ERROR_OK
                return {"status":"failed","Comment": 'The bitrate being used is different than the given one'}
        else:
            print("{} has been initialized successfully.".format(hardware))
            return {"status":"success","Comment":""}
        pass

    
    def UnInitialize(self):
        try:
            self.m_Terminated = True
            # Releases a current connected PCAN-Basic channel
            #
            self.__mObjPCAN_UDS.Uninitialize_2013(self.m_PcanHandle)
            #self.m_objPCANBasic.Uninitialize(self.m_PcanHandle)
        except Exception as e:
            self.PrintException()
    
    def TesterPresent(self):
        try:
            # self.m_Terminated = True
            out_msg_request = uds_msg()
            response_count = c_uint32(0)
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            status = self.__mObjPCAN_UDS.SvcTesterPresent_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_TP_ZSUBF)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForServiceFunctional_2013(self.m_PcanHandle, request, 1, True,  response, response_count,confirmation)
                # print(" UDS_SvcTesterPresent_2013: %i" %(status.value))
                if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                    # self.ReadData()
                    return self.__display_uds_msg("TesterPresent",confirmation, response, False)
                else:
                    # self.ReadData()
                    return self.__display_uds_msg("TesterPresent",request, None, False)
        except:
            self.PrintException()


    def ReadDataByIdentifier(self):
        
        try:
            data_identifier = (c_uint16 * 2) (self.__mObjPCAN_UDS.PUDS_SVC_PARAM_DI_ADSDID, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_DI_ECUMDDID )
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical ReadDataByIdentifier message
            # print("\n\nSends a physical ReadDataByIdentifier message: ")
            status = self.__mObjPCAN_UDS.SvcReadDataByIdentifier_2013(self.m_PcanHandle, self.__config, request, data_identifier, 2)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcReadDataByIdentifier_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("ReadDataByIdentifier",confirmation, response, False)
            else:
                return self.__display_uds_msg("ReadDataByIdentifier",request, None, False)

        except:
            self.PrintException()

    def ClearDiagnosticInformation(self):
        try:
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical ClearDiagnosticInformation message with memory selection parameter (2020)
            # print("\n\nSends a physical ClearDiagnosticInformation message: ")
            status = self.__mObjPCAN_UDS.SvcClearDiagnosticInformation_2013(self.m_PcanHandle, self.__config, request, 0xF1A2B3)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcClearDiagnosticInformation_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("ClearDiagnosticInformation",confirmation, response, False)
            else:
                return self.__display_uds_msg("ClearDiagnosticInformation",request, None, False)
        except:
            self.PrintException()

    def ReadDTCInformation(self):
        
        try:
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical ReadDTCInformation message
            # print("\n\nSends a physical ReadDTCInformation message: ")
            status = self.__mObjPCAN_UDS.SvcReadDTCInformation_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_RDTCI_RDTCBSM, 0xF1)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcReadDTCInformation_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("ReadDTCInformation",confirmation, response, False)
            else:
                return self.__display_uds_msg("ReadDTCInformation",request, None, False)
        except:
            self.PrintException()

    def RoutineControl(self):
        
        try:
            routine_control_option_record = create_string_buffer(10)
            routine_control_option_record_size = 10
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            
            # Sends a physical RoutineControl message
            # print("\n\nSends a physical RoutineControl message: ")
            for i in range(routine_control_option_record_size):
                routine_control_option_record[i] = self.__getCChar(ord('A') + i)
            
            status = self.__mObjPCAN_UDS.SvcRoutineControl_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_RC_RRR, 0xF1A2, routine_control_option_record, routine_control_option_record_size)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcRoutineControl_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("RoutineControl",confirmation, response, False)
            else:
                return self.__display_uds_msg("RoutineControl",request, None, False)

        except:
            self.PrintException()
    
    def __display_uds_msg(self,service,request, response, no_response_expected):
        """
        A function that displays UDS Request and Response messages (and count error if no response)

        parameters:
        request: Request message
        response: Received response message
        no_response_expected: if no response is expected, do not increment error counter
        """
        
        # if request != None and request.msg.msgdata.isotp:
        #         print("\nUDS request from 0x%04X (to 0x%04X, with extension address 0x%02X) - result: %i - %s" %(
        #             request.msg.msgdata.isotp.contents.netaddrinfo.source_addr,
        #             request.msg.msgdata.isotp.contents.netaddrinfo.target_addr,
        #             request.msg.msgdata.isotp.contents.netaddrinfo.extension_addr, 
        #             request.msg.msgdata.any.contents.netstatus,
        #             "ERROR !!!" if request.msg.msgdata.any.contents.netstatus != PCANTP_NETSTATUS_OK.value else "OK !"))
        #         # display data
        #         s = "\t-> Length: {x1}, Data= ".format(x1=format(request.msg.msgdata.any.contents.length, "d"))
        #         for i in range(request.msg.msgdata.any.contents.length): 
        #             s += "{x1} ".format(x1=format(request.msg.msgdata.any.contents.data[i], "02X"))
            
        #         print(s)
        
        if response != None and response.msg.msgdata.isotp:
            print("\nUDS RESPONSE from 0x%04X (to 0x%04X, with extension address 0x%02X) - result: %i - %s" %(
                response.msg.msgdata.isotp.contents.netaddrinfo.source_addr, response.msg.msgdata.isotp.contents.netaddrinfo.target_addr,
                response.msg.msgdata.isotp.contents.netaddrinfo.extension_addr, response.msg.msgdata.any.contents.netstatus,
                ("ERROR !!!" if response.msg.msgdata.any.contents.netstatus != PCANTP_NETSTATUS_OK.value else "OK !")))
            # display data
            # s = "\t-> Length: {x1}, Data= ".format(x1=format(response.msg.msgdata.any.contents.length, "d"))
            s = ""
            for i in range(response.msg.msgdata.any.contents.length):
                s += "{x1} ".format(x1=format(response.msg.msgdata.any.contents.data[i], "02X"))
            
            print(s)
        
        elif not no_response_expected:
            s = "ERROR: NO UDS RESPONSE"

        return {"Service":service,"Response":s}

    def __getCChar(self,c):
        """
        Get a character adapted to a C string buffer depending on Python version
        """
        return c if sys.version_info.major >= 3 else chr(c)


    ## Connection Methods
    
    ## Help Function used to get an error as text
    ##
    def GetFormatedError(self, error):
        # Gets the text using the GetErrorText API function
        # If the function success, the translated error is returned. If it fails,
        # a text describing the current error is returned.
        #
        stsReturn = self.m_objPCANBasic.GetErrorText(error, 0)
        if stsReturn[0] != PCAN_ERROR_OK:
            return "An error occurred. Error-code's text ({0:X}h) couldn't be retrieved".format(error)
        else:
            return stsReturn[1]
    
    ## Function for reading CAN messages on normal CAN devices
    ##
    def ReadMessage(self):
        # We execute the "Read" function of the PCANBasic
        #
        rx_msg = cantp_msg()
        timestamp = TPCANTimestamp()
        res = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(rx_msg, PCANTP_MSGTYPE_ANY)
        result = self.__mObjPCAN_ISO_TP_2016.Read_2016(self.m_PcanHandle,rx_msg,timestamp)
        # print("Result : ",result)
        if result.value == PCANTP_STATUS_OK.value:
            # We show the received message
            #
            try:
                # data = result[1:]
                readData = dict()
                readData["direction"] = "Rx"
                readData["msg"] = self.__Convert(rx_msg)
                readData["time"] = timestamp
                # print("readData : ",readData)
                #self.__sdt.AddMessageData(readData)
            except Exception as e:
                self.PrintException()
            
        return result.value
    
    def ReadMessages(self):
        stsResult = PCAN_ERROR_OK

        # We read at least one time the queue looking for messages.
        # If a message is found, we look again trying to find more.
        # If the queue is empty or an error occurr, we get out from
        # the dowhile statement.
        #
        while (not (stsResult & PCAN_ERROR_QRCVEMPTY)):
            stsResult = self.ReadMessage()
            if stsResult == PCAN_ERROR_ILLOPERATION:
                break

    ## Thread-Function used for reading PCAN-Basic messages
    ##
    def Read(self):
        try:        
            self.m_Terminated = False
        
            # Configures the Receive-Event. 
            #
            stsResult = self.__mObjPCAN_ISO_TP_2016.SetValue_2016(self.m_PcanHandle, PCAN_RECEIVE_EVENT, self.m_ReceiveEvent,sizeof(self.m_ReceiveEvent))
            # self.ReadMessages()
            if stsResult.value != PCANTP_STATUS_OK.value:
                print ("Error: " + self.GetFormatedError(stsResult))                
            else:
                while not self.m_Terminated:
                    self.ReadMessages()
                    # if win32event.WaitForSingleObject(self.m_ReceiveEvent, 50) == win32event.WAIT_OBJECT_0:
                        
                
                # Resets the Event-handle configuration
                #
                self.__mObjPCAN_ISO_TP_2016.SetValue_2016(self.m_PcanHandle, PCAN_RECEIVE_EVENT, self.m_ReceiveEvent,0)
        except Exception as e:
            self.PrintException()
            print ("Error occurred while processing CAN data")

    #def stop(self):
     #   self.__sdt.StopLog()
      #  self.__sdt.StopTrace()

    def ReadData(self):
        t1 = threading.Thread(None,self.Read)
        t1.start()

    def WriteData(self, data):
        pass

    def Start(self,msgs):
        t = threading.Thread(None,self.__startECU,args=(msgs,))
        t.start()
    
    def __startECU(self,msgs):
        try:
            while True:    
                for msg in msgs:
                    # print(msg)
                    self.m_objPCANBasic.Write(self.m_PcanHandle, msg)
        except Exception as e:
            self.PrintException()

    def Wakeup(self,msg,cycleTime,action):
        try:
            if action.upper() in ["ON","TRUE","START"]:
                self.__wakeup = True
                t = threading.Thread(target=self.__WakeupThread,args=(msg,cycleTime))
                t.start()
            else:
                self.__wakeup = False
        except Exception as e:
            self.PrintException()

    def __WakeupThread(self,msg,cycleTime):
        try:
            while self.__wakeup:
                try:
                    res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
                    if res.value == PCANTP_STATUS_OK.value:
                        self.__wakeupStatus = True
                        data = dict()
                        data["direction"] = "Tx"
                        data["msg"] = self.__Convert(msg)
                        data["time"] = (time.time())/10
                        #self.__sdt.AddMessageData(data)
                    else:
                        self.__wakeupStatus = False
                except Exception as e:
                    self.PrintException()
                    self.__wakeupStatus = False
                time.sleep(cycleTime/1000)
        except Exception as e:
            self.PrintException()
    
    def Connect(self,msg,cycleTime,action):
        try:
            if action.upper() in ["ON","TRUE","START"]:
                self.__connect = True
                t = threading.Thread(target=self.__ConnectThread,args=(msg,cycleTime))
                t.start()
            else:
                self.__connect = False
        except Exception as e:
            self.PrintException()

    def __ConnectThread(self,msg,cycleTime):
        try:
            while self.__connect:
                try:
                    res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
                    if res.value == PCANTP_STATUS_OK.value:
                        self.__connectStatus = True
                        data = dict()
                        data["direction"] = "Tx"
                        data["msg"] = self.__Convert(msg)
                        data["time"] = (time.time())/10
                        #self.__sdt.AddMessageData(data)
                    else:
                        self.__connectStatus = False
                    time.sleep(cycleTime/1000)
                except Exception as e:
                    self.PrintException()
                    self.__connectStatus = False
        except Exception as e:
            self.PrintException()

    def SetRequestMode(self,msg):
        try:
            res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
            if res.value == PCANTP_STATUS_OK.value:
                data = dict()
                data["direction"] = "Tx"
                data["msg"] = self.__Convert(msg)
                data["time"] = (time.time())/10
                # print(data)
                #self.__sdt.AddMessageData(data)
                return True
            else:
                return False
        except Exception as e:
            self.PrintException()
    
    def InitMsg(self,frame_id,length,data=None):
        tx_msg = cantp_msg()
        try:            
            if msg is None:
                return cantp_msg()
            else:
                status = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(tx_msg,PCANTP_MSGTYPE_CAN)
                status = self.__mObjPCAN_ISO_TP_2016.MsgDataInit_2016(tx_msg,frame_id,PCANTP_CAN_MSGTYPE_STANDARD,length,None,None)
                if data is not None:
                    if type(data) is str:
                        data_bytes = data.split(" ")
                    elif type(data) is list:
                        data_bytes = data
                    for i in range(msg.length):
                        tx_msg.msgdata.any.contents.data[i] = int(data_bytes[i], 16)
            return tx_msg
        except Exception as e:
            self.PrintException()
            return tx_msg
    
    def __Convert(self,cantp_msg):
        canMsg = TPCANMsg()
        try:
            canMsg.ID = cantp_msg.can_info.can_id
            canMsg.MSGTYPE = cantp_msg.can_info.can_msgtype
            canMsg.LEN = cantp_msg.msgdata.can.contents.length
            for i in range(cantp_msg.msgdata.can.contents.length):
                canMsg.DATA[i] = cantp_msg.msgdata.can.contents.data[i]
            return canMsg
        except Exception as e:
            self.PrintException()
            return canMsg
    
    def PrintException(self):
        exc_type, exc_obj, tb = sys.exc_info()
        f = tb.tb_frame
        lineno = tb.tb_lineno
        filename = f.f_code.co_filename
        linecache.checkcache(filename)
        line = linecache.getline(filename, lineno, f.f_globals)
        print('EXCEPTION IN ({}, LINE {} "{}"): {}'.format(filename, lineno, line.strip(), exc_obj))
                
    

if __name__ == "__main__":
    pcan = PCAN()
    pcan.Initialize("PCAN_USB",1,"500 kBit/sec","ISA-82C200","0100","3")
    wakeup_msg = pcan.InitMsg(0x14,3)
    print(pcan.Wakeup(wakeup_msg,100,"ON"))
    connect_msg = pcan.InitMsg(0x584,8)
    print(pcan.Connect(connect_msg,1000,"ON"))
    pcan.ReadData()
    print(pcan.TesterPresent())
    print(pcan.ReadDTCInformation())
    print(pcan.ReadDataByIdentifier())
    print(pcan.RoutineControl())
    print(pcan.ClearDiagnosticInformation())
    time.sleep(10)
    #pcan.stop()
    pcan.UnInitialize()
    

F.Vergnaud
Software Development
Software Development
Posts: 305
Joined: Mon 9. Sep 2013, 12:21

Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware

Post by F.Vergnaud » Thu 27. Apr 2023, 17:45

Hello,

Your code presents copy/paste excerpt of code from various API samples!
For instance you are comparing PCANBasic status with cantp_status variables... this is obviously wrong and leads to bad interpretation.
There is also runtime exceptions where wrong variables are used (check your function with tx_msg and msg)...

Double check your code, the forum is not a place where we debug your code!

Finally your code shows that you want to use UDS and read some non-UDS frames with ISOTP.
This is exactly what is done in sample 10_client_uds_and_isotp: be careful on how the ISOTP event is set (use PCANTP_PARAMETER_RECEIVE_EVENT).

Note: advised Sample 14_client_uds_and_can shows how to use UDS and read any CAN frames (including UDS frames via parameter PCANTP_PARAMETER_KEEP_HIGHER_LAYER_MESSAGES), this does not exactly correspond to what you want.
Best regards,
Fabrice

Post Reply