Not able to write 8 bytes of data

A free API for the transfer of data packages according to ISO-TP (ISO 15765-2)
Locked
sanjeev.palla
Posts: 6
Joined: Fri 21. Apr 2023, 13:04

Not able to write 8 bytes of data

Post by sanjeev.palla » Wed 26. Apr 2023, 19:24

Hi Team,

I am trying to write 2 messages using PCAN-ISO TP. One message is having 3 bytes of data and able write the data using "Write_2016" method. I am getting response code is 0(No error).But in case of 2nd message which is having 8 bytes of data, when I am trying to write the data using "Write_2016" method, I am getting response code is 13(Invalid cantp_handle). Can someone help me why am I getting response code is 13 in case of message which is having 8 bytes of data only but not in case for 3 bytes of data?

Below is my code snippet:

Code: Select all

result =  self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
status = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(tx_msg,PCANTP_MSGTYPE_ISOTP)
status = self.__mObjPCAN_ISO_TP_2016.MsgDataInit_2016(tx_msg,msg.frame_id,PCANTP_CAN_MSGTYPE_STANDARD,msg.length,None,None)
res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
Above code is working fine for 3 bytes of data but not for 8 bytes of data.

F.Vergnaud
Software Development
Software Development
Posts: 305
Joined: Mon 9. Sep 2013, 12:21

Re: Not able to write 8 bytes of data

Post by F.Vergnaud » Thu 27. Apr 2023, 09:37

Hello,

PCAN-ISO-TP API is able to transmit an ISOTP message without any configuration if its data fits in a single CAN/CANFD frame. This is why you can successfully write a 3-byte data message.

If the data length of your message is greater than 6 or 7 on CAN, the message is segmented and the API needs extra information to start an ISOTP communication: you have to configure ISOTP mappings.
The error code returned is 13 which is PCANTP_STATUS_MAPPING_NOT_INITIALIZED (and I don't know where you found the interpretation of "Invalid cantp_handle").

If you want to write a 8-byte CAN frame instead of an ISOTP message, you just have to change PCANTP_MSGTYPE_ISOTP to PCANTP_MSGTYPE_CAN.
Otherwise please take a look at the documentation and the samples of PCAN-ISO-TP API.
Best regards,
Fabrice

sanjeev.palla
Posts: 6
Joined: Fri 21. Apr 2023, 13:04

Re: Not able to write 8 bytes of data

Post by sanjeev.palla » Thu 27. Apr 2023, 12:59

Thank you so much!!!

Now I am able to write data with 8 bytes. But my uds is not working now.

Below is the response for UDS services:
{'Service': 'TesterPresent', 'Response': 'ERROR: NO UDS RESPONSE'}
{'Service': 'ReadDTCInformation', 'Response': 'ERROR: NO UDS RESPONSE'}
{'Service': 'ReadDataByIdentifier', 'Response': 'ERROR: NO UDS RESPONSE'}
{'Service': 'RoutineControl', 'Response': 'ERROR: NO UDS RESPONSE'}
{'Service': 'ClearDiagnosticInformation', 'Response': 'ERROR: NO UDS RESPONSE'}

Below is my code snippet:

Code: Select all

result =  self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
status = self.__mObjPCAN_UDS.SvcTesterPresent_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_TP_ZSUBF)
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForServiceFunctional_2013(self.m_PcanHandle, request, 1, True,  response, response_count,confirmation)
                # print(" UDS_SvcTesterPresent_2013: %i" %(status.value))
                if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                    # self.ReadData()
                    return self.__display_uds_msg("TesterPresent",confirmation, response, False)
                else:
                    # self.ReadData()
                    return self.__display_uds_msg("TesterPresent",request, None, False)
If I don't read CAN data then I am getting response for all UDS services. I need to read CAN data as well as UDS services at a time.
Can you please help me?

F.Vergnaud
Software Development
Software Development
Posts: 305
Joined: Mon 9. Sep 2013, 12:21

Re: Not able to write 8 bytes of data

Post by F.Vergnaud » Thu 27. Apr 2023, 15:12

Your code snippet gives no information on what you're trying to do.. it doesn't even present when you write a CAN frame...
Please copy more useful extracts of code.

Can you specify the CAN IDs you are using to communicate with UDS and with ISOTP? and the confirguration of your mappings if there are any?
We have to know that the CAN frames you are sending don't interfere with UDS communication.
Best regards,
Fabrice

sanjeev.palla
Posts: 6
Joined: Fri 21. Apr 2023, 13:04

Re: Not able to write 8 bytes of data

Post by sanjeev.palla » Thu 27. Apr 2023, 15:53

Hi,

Below is my code:

Code: Select all

from PCAN_UDS_2013 import *
from PCAN_ISO_TP_2016 import *
from PCAN_ISO_TP_2016 import *
import time
import linecache

try:
    import win32event
    WINDOWS_EVENT_SUPPORT = True
except ImportError:     
    WINDOWS_EVENT_SUPPORT = False

class PCAN(object):

    def __init__(self):
        self.m_objPCANBasic = PCANBasic()
        self.__mObjPCAN_UDS = PCAN_UDS_2013()
        self.__mObjPCAN_ISO_TP_2016 = PCAN_ISO_TP_2016()
        self.__config = uds_msgconfig()
        self.__config.can_id = -1
        self.__config.can_msgtype = PCANTP_CAN_MSGTYPE_STANDARD
        self.__config.nai.protocol = PUDS_MSGPROTOCOL_ISO_15765_2_11B_NORMAL
        self.__config.nai.target_type = PCANTP_ISOTP_ADDRESSING_FUNCTIONAL
        self.__config.type = PUDS_MSGTYPE_USDT
        self.__config.nai.source_addr = PUDS_ISO_15765_4_ADDR_TEST_EQUIPMENT
        self.__config.nai.target_addr = PUDS_ISO_15765_4_ADDR_OBD_FUNCTIONAL
        self.m_BAUDRATES = {'1 MBit/sec':PCAN_BAUD_1M, '800 kBit/sec':PCAN_BAUD_800K, '500 kBit/sec':PCAN_BAUD_500K, '250 kBit/sec':PCAN_BAUD_250K,
                            '125 kBit/sec':PCAN_BAUD_125K, '100 kBit/sec':PCAN_BAUD_100K, '95,238 kBit/sec':PCAN_BAUD_95K, '83,333 kBit/sec':PCAN_BAUD_83K,
                            '50 kBit/sec':PCAN_BAUD_50K, '47,619 kBit/sec':PCAN_BAUD_47K, '33,333 kBit/sec':PCAN_BAUD_33K, '20 kBit/sec':PCAN_BAUD_20K,
                            '10 kBit/sec':PCAN_BAUD_10K, '5 kBit/sec':PCAN_BAUD_5K}

        self.m_HWTYPES = {'ISA-82C200':PCAN_TYPE_ISA, 'ISA-SJA1000':PCAN_TYPE_ISA_SJA, 'ISA-PHYTEC':PCAN_TYPE_ISA_PHYTEC, 'DNG-82C200':PCAN_TYPE_DNG,
                         'DNG-82C200 EPP':PCAN_TYPE_DNG_EPP, 'DNG-SJA1000':PCAN_TYPE_DNG_SJA, 'DNG-SJA1000 EPP':PCAN_TYPE_DNG_SJA_EPP}

        self.m_IOPORTS = {'0100':0x100, '0120':0x120, '0140':0x140, '0200':0x200, '0220':0x220, '0240':0x240, '0260':0x260, '0278':0x278, 
                          '0280':0x280, '02A0':0x2A0, '02C0':0x2C0, '02E0':0x2E0, '02E8':0x2E8, '02F8':0x2F8, '0300':0x300, '0320':0x320,
                          '0340':0x340, '0360':0x360, '0378':0x378, '0380':0x380, '03BC':0x3BC, '03E0':0x3E0, '03E8':0x3E8, '03F8':0x3F8}

        self.m_INTERRUPTS = {'3':3, '4':4, '5':5, '7':7, '9':9, '10':10, '11':11, '12':12, '15':15}
        
        #self.__sdt = SDT()
        self.__currentTime = None
        self.__wakeupStatus = None
        self.__connectStatus = None
        if WINDOWS_EVENT_SUPPORT:
            self.m_ReadThread = None
            self.m_Terminated = False
            # self.m_ReceiveEvent = win32event.CreateEvent(None, 0, 0, None)
            self.m_ReceiveEvent = c_void_p(windll.kernel32.CreateEventA(None, 0,0,None))

    @property
    def WakeupStatus(self):
        return str(self.__wakeupStatus)
    
    @property
    def ConnectStatus(self):
        return str(self.__connectStatus)
    
    def GetChannels(self):
        try:
            channels = []
            result =  self.m_objPCANBasic.GetValue(PCAN_NONEBUS, PCAN_ATTACHED_CHANNELS)
            if  (result[0] == PCAN_ERROR_OK):
                # Include only connectable channels
                #
                for channel in result[1]:
                    if  (channel.channel_condition & PCAN_CHANNEL_AVAILABLE):
                        channels.append(self.FormatChannelName(channel.channel_handle, channel.device_features & FEATURE_FD_CAPABLE))
            return channels
        except Exception as e:
            self.PrintException()

    def Initialize(self,hardware,channel,baudrate,hardware_type,IO_Port,interrupt):

        # startIndex = hardware.index("(") + 1
        # strChannel = hardware[startIndex:startIndex+3]
        # strChannel = strChannel.replace("h", "")
        if hardware.upper() == "PCAN_USB":
            if channel == 1:
                self.m_PcanHandle = PCAN_USBBUS1.value
            elif channel == 2:
                self.m_PcanHandle = PCAN_USBBUS2.value
            elif channel == 3:
                self.m_PcanHandle = PCAN_USBBUS3.value
            elif channel == 4:
                self.m_PcanHandle = PCAN_USBBUS4.value
            elif channel == 5:
                self.m_PcanHandle = PCAN_USBBUS5.value
            elif channel == 6:
                self.m_PcanHandle = PCAN_USBBUS6.value
            elif channel == 7:
                self.m_PcanHandle = PCAN_USBBUS7.value
            elif channel == 8:
                self.m_PcanHandle = PCAN_USBBUS8.value

        baudrate = self.m_BAUDRATES[baudrate]
        hwtype = self.m_HWTYPES[hardware_type]
        ioport = int(IO_Port,16)
        interrupt = int(self.m_INTERRUPTS[interrupt])
        
        result =  self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
        # result =  self.m_objPCANBasic.Initialize(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
        # result =  self.__mObjPCAN_ISO_TP_2016.Initialize_2016(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
        if result != PCAN_ERROR_OK:
            if result != PCAN_ERROR_CAUTION:
                error = self.GetFormatedError(result)
                print("Error!", error)
                return {"status":"failed","Comment": error}
            else:
                print('The bitrate being used is different than the given one')
                result = PCAN_ERROR_OK
                return {"status":"failed","Comment": 'The bitrate being used is different than the given one'}
        else:
            print("{} has been initialized successfully.".format(hardware))
            return {"status":"success","Comment":""}
        pass

    
    def UnInitialize(self):
        try:
            self.m_Terminated = True
            # Releases a current connected PCAN-Basic channel
            #
            self.__mObjPCAN_UDS.Uninitialize_2013(self.m_PcanHandle)
            #self.m_objPCANBasic.Uninitialize(self.m_PcanHandle)
        except Exception as e:
            self.PrintException()
    
    def TesterPresent(self):
        try:
            # self.m_Terminated = True
            out_msg_request = uds_msg()
            response_count = c_uint32(0)
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            status = self.__mObjPCAN_UDS.SvcTesterPresent_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_TP_ZSUBF)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForServiceFunctional_2013(self.m_PcanHandle, request, 1, True,  response, response_count,confirmation)
                # print(" UDS_SvcTesterPresent_2013: %i" %(status.value))
                if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                    # self.ReadData()
                    return self.__display_uds_msg("TesterPresent",confirmation, response, False)
                else:
                    # self.ReadData()
                    return self.__display_uds_msg("TesterPresent",request, None, False)
        except:
            self.PrintException()


    def ReadDataByIdentifier(self):
        
        try:
            data_identifier = (c_uint16 * 2) (self.__mObjPCAN_UDS.PUDS_SVC_PARAM_DI_ADSDID, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_DI_ECUMDDID )
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical ReadDataByIdentifier message
            # print("\n\nSends a physical ReadDataByIdentifier message: ")
            status = self.__mObjPCAN_UDS.SvcReadDataByIdentifier_2013(self.m_PcanHandle, self.__config, request, data_identifier, 2)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcReadDataByIdentifier_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("ReadDataByIdentifier",confirmation, response, False)
            else:
                return self.__display_uds_msg("ReadDataByIdentifier",request, None, False)

        except:
            self.PrintException()

    def ClearDiagnosticInformation(self):
        try:
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical ClearDiagnosticInformation message with memory selection parameter (2020)
            # print("\n\nSends a physical ClearDiagnosticInformation message: ")
            status = self.__mObjPCAN_UDS.SvcClearDiagnosticInformation_2013(self.m_PcanHandle, self.__config, request, 0xF1A2B3)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcClearDiagnosticInformation_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("ClearDiagnosticInformation",confirmation, response, False)
            else:
                return self.__display_uds_msg("ClearDiagnosticInformation",request, None, False)
        except:
            self.PrintException()

    def ReadDTCInformation(self):
        
        try:
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical ReadDTCInformation message
            # print("\n\nSends a physical ReadDTCInformation message: ")
            status = self.__mObjPCAN_UDS.SvcReadDTCInformation_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_RDTCI_RDTCBSM, 0xF1)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcReadDTCInformation_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("ReadDTCInformation",confirmation, response, False)
            else:
                return self.__display_uds_msg("ReadDTCInformation",request, None, False)
        except:
            self.PrintException()

    def RoutineControl(self):
        
        try:
            routine_control_option_record = create_string_buffer(10)
            routine_control_option_record_size = 10
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            
            # Sends a physical RoutineControl message
            # print("\n\nSends a physical RoutineControl message: ")
            for i in range(routine_control_option_record_size):
                routine_control_option_record[i] = self.__getCChar(ord('A') + i)
            
            status = self.__mObjPCAN_UDS.SvcRoutineControl_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_RC_RRR, 0xF1A2, routine_control_option_record, routine_control_option_record_size)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcRoutineControl_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("RoutineControl",confirmation, response, False)
            else:
                return self.__display_uds_msg("RoutineControl",request, None, False)

        except:
            self.PrintException()
    
    def __display_uds_msg(self,service,request, response, no_response_expected):
        """
        A function that displays UDS Request and Response messages (and count error if no response)

        parameters:
        request: Request message
        response: Received response message
        no_response_expected: if no response is expected, do not increment error counter
        """
        
        # if request != None and request.msg.msgdata.isotp:
        #         print("\nUDS request from 0x%04X (to 0x%04X, with extension address 0x%02X) - result: %i - %s" %(
        #             request.msg.msgdata.isotp.contents.netaddrinfo.source_addr,
        #             request.msg.msgdata.isotp.contents.netaddrinfo.target_addr,
        #             request.msg.msgdata.isotp.contents.netaddrinfo.extension_addr, 
        #             request.msg.msgdata.any.contents.netstatus,
        #             "ERROR !!!" if request.msg.msgdata.any.contents.netstatus != PCANTP_NETSTATUS_OK.value else "OK !"))
        #         # display data
        #         s = "\t-> Length: {x1}, Data= ".format(x1=format(request.msg.msgdata.any.contents.length, "d"))
        #         for i in range(request.msg.msgdata.any.contents.length): 
        #             s += "{x1} ".format(x1=format(request.msg.msgdata.any.contents.data[i], "02X"))
            
        #         print(s)
        
        if response != None and response.msg.msgdata.isotp:
            print("\nUDS RESPONSE from 0x%04X (to 0x%04X, with extension address 0x%02X) - result: %i - %s" %(
                response.msg.msgdata.isotp.contents.netaddrinfo.source_addr, response.msg.msgdata.isotp.contents.netaddrinfo.target_addr,
                response.msg.msgdata.isotp.contents.netaddrinfo.extension_addr, response.msg.msgdata.any.contents.netstatus,
                ("ERROR !!!" if response.msg.msgdata.any.contents.netstatus != PCANTP_NETSTATUS_OK.value else "OK !")))
            # display data
            # s = "\t-> Length: {x1}, Data= ".format(x1=format(response.msg.msgdata.any.contents.length, "d"))
            s = ""
            for i in range(response.msg.msgdata.any.contents.length):
                s += "{x1} ".format(x1=format(response.msg.msgdata.any.contents.data[i], "02X"))
            
            print(s)
        
        elif not no_response_expected:
            s = "ERROR: NO UDS RESPONSE"

        return {"Service":service,"Response":s}

    def __getCChar(self,c):
        """
        Get a character adapted to a C string buffer depending on Python version
        """
        return c if sys.version_info.major >= 3 else chr(c)


    ## Connection Methods
    
    ## Help Function used to get an error as text
    ##
    def GetFormatedError(self, error):
        # Gets the text using the GetErrorText API function
        # If the function success, the translated error is returned. If it fails,
        # a text describing the current error is returned.
        #
        stsReturn = self.m_objPCANBasic.GetErrorText(error, 0)
        if stsReturn[0] != PCAN_ERROR_OK:
            return "An error occurred. Error-code's text ({0:X}h) couldn't be retrieved".format(error)
        else:
            return stsReturn[1]
    
    ## Function for reading CAN messages on normal CAN devices
    ##
    def ReadMessage(self):
        # We execute the "Read" function of the PCANBasic
        #
        rx_msg = cantp_msg()
        timestamp = TPCANTimestamp()
        res = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(rx_msg, PCANTP_MSGTYPE_ANY)
        result = self.__mObjPCAN_ISO_TP_2016.Read_2016(self.m_PcanHandle,rx_msg,timestamp)
        # print("Result : ",result)
        if result.value == PCANTP_STATUS_OK.value:
            # We show the received message
            #
            try:
                # data = result[1:]
                readData = dict()
                readData["direction"] = "Rx"
                readData["msg"] = self.__Convert(rx_msg)
                readData["time"] = timestamp
                # print("readData : ",readData)
                #self.__sdt.AddMessageData(readData)
            except Exception as e:
                self.PrintException()
            
        return result.value
    
    def ReadMessages(self):
        stsResult = PCAN_ERROR_OK

        # We read at least one time the queue looking for messages.
        # If a message is found, we look again trying to find more.
        # If the queue is empty or an error occurr, we get out from
        # the dowhile statement.
        #
        while (not (stsResult & PCAN_ERROR_QRCVEMPTY)):
            stsResult = self.ReadMessage()
            if stsResult == PCAN_ERROR_ILLOPERATION:
                break

    ## Thread-Function used for reading PCAN-Basic messages
    ##
    def Read(self):
        try:        
            self.m_Terminated = False
        
            # Configures the Receive-Event. 
            #
            stsResult = self.__mObjPCAN_ISO_TP_2016.SetValue_2016(self.m_PcanHandle, PCAN_RECEIVE_EVENT, self.m_ReceiveEvent,sizeof(self.m_ReceiveEvent))
            # self.ReadMessages()
            if stsResult.value != PCANTP_STATUS_OK.value:
                print ("Error: " + self.GetFormatedError(stsResult))                
            else:
                while not self.m_Terminated:
                    self.ReadMessages()
                    # if win32event.WaitForSingleObject(self.m_ReceiveEvent, 50) == win32event.WAIT_OBJECT_0:
                        
                
                # Resets the Event-handle configuration
                #
                self.__mObjPCAN_ISO_TP_2016.SetValue_2016(self.m_PcanHandle, PCAN_RECEIVE_EVENT, self.m_ReceiveEvent,0)
        except Exception as e:
            self.PrintException()
            print ("Error occurred while processing CAN data")

    #def stop(self):
     #   self.__sdt.StopLog()
      #  self.__sdt.StopTrace()

    def ReadData(self):
        t1 = threading.Thread(None,self.Read)
        t1.start()

    def WriteData(self, data):
        pass

    def Start(self,msgs):
        t = threading.Thread(None,self.__startECU,args=(msgs,))
        t.start()
    
    def __startECU(self,msgs):
        try:
            while True:    
                for msg in msgs:
                    # print(msg)
                    self.m_objPCANBasic.Write(self.m_PcanHandle, msg)
        except Exception as e:
            self.PrintException()

    def Wakeup(self,msg,cycleTime,action):
        try:
            if action.upper() in ["ON","TRUE","START"]:
                self.__wakeup = True
                t = threading.Thread(target=self.__WakeupThread,args=(msg,cycleTime))
                t.start()
            else:
                self.__wakeup = False
        except Exception as e:
            self.PrintException()

    def __WakeupThread(self,msg,cycleTime):
        try:
            while self.__wakeup:
                try:
                    res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
                    if res.value == PCANTP_STATUS_OK.value:
                        self.__wakeupStatus = True
                        data = dict()
                        data["direction"] = "Tx"
                        data["msg"] = self.__Convert(msg)
                        data["time"] = (time.time())/10
                        #self.__sdt.AddMessageData(data)
                    else:
                        self.__wakeupStatus = False
                except Exception as e:
                    self.PrintException()
                    self.__wakeupStatus = False
                time.sleep(cycleTime/1000)
        except Exception as e:
            self.PrintException()
    
    def Connect(self,msg,cycleTime,action):
        try:
            if action.upper() in ["ON","TRUE","START"]:
                self.__connect = True
                t = threading.Thread(target=self.__ConnectThread,args=(msg,cycleTime))
                t.start()
            else:
                self.__connect = False
        except Exception as e:
            self.PrintException()

    def __ConnectThread(self,msg,cycleTime):
        try:
            while self.__connect:
                try:
                    res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
                    if res.value == PCANTP_STATUS_OK.value:
                        self.__connectStatus = True
                        data = dict()
                        data["direction"] = "Tx"
                        data["msg"] = self.__Convert(msg)
                        data["time"] = (time.time())/10
                        #self.__sdt.AddMessageData(data)
                    else:
                        self.__connectStatus = False
                    time.sleep(cycleTime/1000)
                except Exception as e:
                    self.PrintException()
                    self.__connectStatus = False
        except Exception as e:
            self.PrintException()

    def SetRequestMode(self,msg):
        try:
            res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
            if res.value == PCANTP_STATUS_OK.value:
                data = dict()
                data["direction"] = "Tx"
                data["msg"] = self.__Convert(msg)
                data["time"] = (time.time())/10
                # print(data)
                #self.__sdt.AddMessageData(data)
                return True
            else:
                return False
        except Exception as e:
            self.PrintException()
    
    def InitMsg(self,frame_id,length,data=None):
        tx_msg = cantp_msg()
        try:            
            if msg is None:
                return cantp_msg()
            else:
                status = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(tx_msg,PCANTP_MSGTYPE_CAN)
                status = self.__mObjPCAN_ISO_TP_2016.MsgDataInit_2016(tx_msg,frame_id,PCANTP_CAN_MSGTYPE_STANDARD,length,None,None)
                if data is not None:
                    if type(data) is str:
                        data_bytes = data.split(" ")
                    elif type(data) is list:
                        data_bytes = data
                    for i in range(msg.length):
                        tx_msg.msgdata.any.contents.data[i] = int(data_bytes[i], 16)
            return tx_msg
        except Exception as e:
            self.PrintException()
            return tx_msg
    
    def __Convert(self,cantp_msg):
        canMsg = TPCANMsg()
        try:
            canMsg.ID = cantp_msg.can_info.can_id
            canMsg.MSGTYPE = cantp_msg.can_info.can_msgtype
            canMsg.LEN = cantp_msg.msgdata.can.contents.length
            for i in range(cantp_msg.msgdata.can.contents.length):
                canMsg.DATA[i] = cantp_msg.msgdata.can.contents.data[i]
            return canMsg
        except Exception as e:
            self.PrintException()
            return canMsg
    
    def PrintException(self):
        exc_type, exc_obj, tb = sys.exc_info()
        f = tb.tb_frame
        lineno = tb.tb_lineno
        filename = f.f_code.co_filename
        linecache.checkcache(filename)
        line = linecache.getline(filename, lineno, f.f_globals)
        print('EXCEPTION IN ({}, LINE {} "{}"): {}'.format(filename, lineno, line.strip(), exc_obj))
                
    

if __name__ == "__main__":
    pcan = PCAN()
    pcan.Initialize("PCAN_USB",1,"500 kBit/sec","ISA-82C200","0100","3")
    wakeup_msg = pcan.InitMsg(0x14,3)
    print(pcan.Wakeup(wakeup_msg,100,"ON"))
    connect_msg = pcan.InitMsg(0x584,8)
    print(pcan.Connect(connect_msg,1000,"ON"))
    pcan.ReadData()
    print(pcan.TesterPresent())
    print(pcan.ReadDTCInformation())
    print(pcan.ReadDataByIdentifier())
    print(pcan.RoutineControl())
    print(pcan.ClearDiagnosticInformation())
    time.sleep(10)
    #pcan.stop()
    pcan.UnInitialize()
    

K.Wagner
Software Development
Software Development
Posts: 1080
Joined: Wed 22. Sep 2010, 13:36

Re: Not able to write 8 bytes of data

Post by K.Wagner » Thu 27. Apr 2023, 16:17

The actual question on how to send 8 bytes of data was answered.
Further questions should be made using another topic.
If you decide to use the same, then do not duplicate topics.

Further conversation on the actual problem in: Receive can data continuously and uds service response parallelly using single pcan usb hardware

Closed!
Best regards,
Keneth

Locked