Hi,
I am trying to read CAN messages from an ECU and perform UDS services in parallel with one PCAN USB hardware.
I have initialized the channel using PCAN_UDS. I am able to get either CAN communication or UDS service responses, but not both at the same time.
Can we get both kinds of communication (CAN and UDS) at the same time using a single PCAN USB channel? If yes, what changes do I need to make?
Below is the UDS message configuration:
self.__config = uds_msgconfig()
self.__config.can_id = -1
self.__config.can_msgtype = PCANTP_CAN_MSGTYPE_STANDARD
self.__config.nai.protocol = PUDS_MSGPROTOCOL_ISO_15765_2_11B_NORMAL
self.__config.nai.target_type = PCANTP_ISOTP_ADDRESSING_FUNCTIONAL
self.__config.type = PUDS_MSGTYPE_USDT
self.__config.nai.source_addr = PUDS_ISO_15765_4_ADDR_TEST_EQUIPMENT
self.__config.nai.target_addr = PUDS_ISO_15765_4_ADDR_OBD_FUNCTIONAL
UDS Initialize:
result = self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
Note: I am using python.
Receive can data continuously and uds service response parallelly using single pcan usb hardware
-
- Posts: 6
- Joined: Fri 21. Apr 2023, 13:04
-
- Software Development
- Posts: 305
- Joined: Mon 9. Sep 2013, 12:21
Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware
Hello,
You must initialize your device with PCAN-UDS and use PCAN-ISO-TP functions when you want to communicate with CAN/CANFD frames or ISOTP messages.
Please check one of these samples included in the PCAN-UDS package:
* 13_server_uds_and_can / 14_client_uds_and_can
* 08_read_write_uds_and_isotp
Best regards,
Fabrice
-
- Posts: 6
- Joined: Fri 21. Apr 2023, 13:04
Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware
Hi,
I have tried your suggestion, but the issue is still there. I am able to perform only one task at a time: either reading CAN data or running UDS services.
Please help me.
Below is my code:
Code: Select all
self.__mObjPCAN_UDS = PCAN_UDS_2013()
self.__mObjPCAN_ISO_TP_2016 = PCAN_ISO_TP_2016()
self.__config = uds_msgconfig()
self.__config.can_id = -1
self.__config.can_msgtype = PCANTP_CAN_MSGTYPE_STANDARD
self.__config.nai.protocol = PUDS_MSGPROTOCOL_ISO_15765_2_11B_NORMAL
self.__config.nai.target_type = PCANTP_ISOTP_ADDRESSING_FUNCTIONAL
self.__config.type = PUDS_MSGTYPE_USDT
self.__config.nai.source_addr = PUDS_ISO_15765_4_ADDR_TEST_EQUIPMENT
self.__config.nai.target_addr = PUDS_ISO_15765_4_ADDR_OBD_FUNCTIONAL
self.m_ReceiveEvent = c_void_p(windll.kernel32.CreateEventA(None, 0,0,None))
result = self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
Code: Select all
## Function for reading CAN messages on normal CAN devices
##
def ReadMessage(self):
    rx_msg = cantp_msg()
    timestamp = TPCANTimestamp()
    res = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(rx_msg, PCANTP_MSGTYPE_ANY)
    result = self.__mObjPCAN_ISO_TP_2016.Read_2016(self.m_PcanHandle,rx_msg,timestamp)
    # print("Result : ",result)
    if result.value == PCANTP_STATUS_OK.value:
        # We show the received message
        #
        try:
            # data = result[1:]
            readData = dict()
            readData["direction"] = "Rx"
            readData["msg"] = self.__Convert(rx_msg)
            readData["time"] = timestamp
            # print("readData : ",readData)
            self.__sdt.AddMessageData(readData)
        except Exception as e:
            self.PrintException()
    return result.value
Code: Select all
def TesterPresent(self):
    try:
        # self.m_Terminated = True
        out_msg_request = uds_msg()
        response_count = c_uint32(0)
        request = uds_msg()
        response = uds_msg()
        confirmation = uds_msg()
        status = self.__mObjPCAN_UDS.SvcTesterPresent_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_TP_ZSUBF)
        if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
            status = self.__mObjPCAN_UDS.WaitForServiceFunctional_2013(self.m_PcanHandle, request, 1, True, response, response_count,confirmation)
        # print(" UDS_SvcTesterPresent_2013: %i" %(status.value))
        if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
            # self.ReadData()
            return self.__display_uds_msg("TesterPresent",confirmation, response, False)
        else:
            # self.ReadData()
            return self.__display_uds_msg("TesterPresent",request, None, False)
    except:
        self.PrintException()
-
- Software Development
- Posts: 305
- Joined: Mon 9. Sep 2013, 12:21
Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware
Can you specify the CAN ID of the CAN frames you are trying to read?
If the CAN ID matches one of the CAN IDs used by UDS, the frame will be consumed by the UDS API and you will not be able to fetch it with PCAN-ISO-TP.
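A quick way to check this up front is to compare each raw CAN ID against the 11-bit diagnostic IDs defined by ISO 15765-4 (0x7DF for functional requests, 0x7E0 to 0x7E7 for physical requests, 0x7E8 to 0x7EF for responses). The sketch below assumes the channel only uses those default 11-bit IDs and that no custom mappings were added. The helper name `is_claimed_by_uds` is made up for illustration and is not part of the PCAN API, and the IDs in the loop are example values.

```python
# Sketch only: assumes the default ISO 15765-4 11-bit diagnostic CAN IDs.
# Custom mappings added to the channel are NOT covered by this check.
FUNCTIONAL_REQUEST_ID = 0x7DF
PHYSICAL_REQUEST_IDS = range(0x7E0, 0x7E8)   # tester -> ECU #1..#8
RESPONSE_IDS = range(0x7E8, 0x7F0)           # ECU #1..#8 -> tester


def is_claimed_by_uds(can_id):
    """Return True if can_id is in the assumed UDS/OBD ID set (hypothetical helper)."""
    return (can_id == FUNCTIONAL_REQUEST_ID
            or can_id in PHYSICAL_REQUEST_IDS
            or can_id in RESPONSE_IDS)


for can_id in (0x14, 0x584, 0x7E8):
    owner = "UDS API" if is_claimed_by_uds(can_id) else "free for raw CAN reads"
    print("0x%03X: %s" % (can_id, owner))
```

An ID outside this set should still be readable through PCAN-ISO-TP, provided the rest of the read loop is correct.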
Best regards,
Fabrice
-
- Posts: 6
- Joined: Fri 21. Apr 2023, 13:04
Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware
Hi,
CAN IDs are 0x14 and 0x584.
Below is my code snippet:
Code: Select all
from PCAN_UDS_2013 import *
from PCAN_ISO_TP_2016 import *
# sys and threading are used below (sys.exc_info, threading.Thread)
import sys
import threading
import time
import linecache
try:
    import win32event
    WINDOWS_EVENT_SUPPORT = True
except ImportError:
    WINDOWS_EVENT_SUPPORT = False

class PCAN(object):
    def __init__(self):
        self.m_objPCANBasic = PCANBasic()
        self.__mObjPCAN_UDS = PCAN_UDS_2013()
        self.__mObjPCAN_ISO_TP_2016 = PCAN_ISO_TP_2016()
        self.__config = uds_msgconfig()
        self.__config.can_id = -1
        self.__config.can_msgtype = PCANTP_CAN_MSGTYPE_STANDARD
        self.__config.nai.protocol = PUDS_MSGPROTOCOL_ISO_15765_2_11B_NORMAL
        self.__config.nai.target_type = PCANTP_ISOTP_ADDRESSING_FUNCTIONAL
        self.__config.type = PUDS_MSGTYPE_USDT
        self.__config.nai.source_addr = PUDS_ISO_15765_4_ADDR_TEST_EQUIPMENT
        self.__config.nai.target_addr = PUDS_ISO_15765_4_ADDR_OBD_FUNCTIONAL
        self.m_BAUDRATES = {'1 MBit/sec':PCAN_BAUD_1M, '800 kBit/sec':PCAN_BAUD_800K, '500 kBit/sec':PCAN_BAUD_500K, '250 kBit/sec':PCAN_BAUD_250K,
                            '125 kBit/sec':PCAN_BAUD_125K, '100 kBit/sec':PCAN_BAUD_100K, '95,238 kBit/sec':PCAN_BAUD_95K, '83,333 kBit/sec':PCAN_BAUD_83K,
                            '50 kBit/sec':PCAN_BAUD_50K, '47,619 kBit/sec':PCAN_BAUD_47K, '33,333 kBit/sec':PCAN_BAUD_33K, '20 kBit/sec':PCAN_BAUD_20K,
                            '10 kBit/sec':PCAN_BAUD_10K, '5 kBit/sec':PCAN_BAUD_5K}
        self.m_HWTYPES = {'ISA-82C200':PCAN_TYPE_ISA, 'ISA-SJA1000':PCAN_TYPE_ISA_SJA, 'ISA-PHYTEC':PCAN_TYPE_ISA_PHYTEC, 'DNG-82C200':PCAN_TYPE_DNG,
                          'DNG-82C200 EPP':PCAN_TYPE_DNG_EPP, 'DNG-SJA1000':PCAN_TYPE_DNG_SJA, 'DNG-SJA1000 EPP':PCAN_TYPE_DNG_SJA_EPP}
        self.m_IOPORTS = {'0100':0x100, '0120':0x120, '0140':0x140, '0200':0x200, '0220':0x220, '0240':0x240, '0260':0x260, '0278':0x278,
                          '0280':0x280, '02A0':0x2A0, '02C0':0x2C0, '02E0':0x2E0, '02E8':0x2E8, '02F8':0x2F8, '0300':0x300, '0320':0x320,
                          '0340':0x340, '0360':0x360, '0378':0x378, '0380':0x380, '03BC':0x3BC, '03E0':0x3E0, '03E8':0x3E8, '03F8':0x3F8}
        self.m_INTERRUPTS = {'3':3, '4':4, '5':5, '7':7, '9':9, '10':10, '11':11, '12':12, '15':15}
        #self.__sdt = SDT()
        self.__currentTime = None
        self.__wakeupStatus = None
        self.__connectStatus = None
        if WINDOWS_EVENT_SUPPORT:
            self.m_ReadThread = None
            self.m_Terminated = False
            # self.m_ReceiveEvent = win32event.CreateEvent(None, 0, 0, None)
            self.m_ReceiveEvent = c_void_p(windll.kernel32.CreateEventA(None, 0,0,None))

    @property
    def WakeupStatus(self):
        return str(self.__wakeupStatus)

    @property
    def ConnectStatus(self):
        return str(self.__connectStatus)

    def GetChannels(self):
        try:
            channels = []
            result = self.m_objPCANBasic.GetValue(PCAN_NONEBUS, PCAN_ATTACHED_CHANNELS)
            if (result[0] == PCAN_ERROR_OK):
                # Include only connectable channels
                #
                for channel in result[1]:
                    if (channel.channel_condition & PCAN_CHANNEL_AVAILABLE):
                        channels.append(self.FormatChannelName(channel.channel_handle, channel.device_features & FEATURE_FD_CAPABLE))
            return channels
        except Exception as e:
            self.PrintException()

    def Initialize(self,hardware,channel,baudrate,hardware_type,IO_Port,interrupt):
        # startIndex = hardware.index("(") + 1
        # strChannel = hardware[startIndex:startIndex+3]
        # strChannel = strChannel.replace("h", "")
        if hardware.upper() == "PCAN_USB":
            if channel == 1:
                self.m_PcanHandle = PCAN_USBBUS1.value
            elif channel == 2:
                self.m_PcanHandle = PCAN_USBBUS2.value
            elif channel == 3:
                self.m_PcanHandle = PCAN_USBBUS3.value
            elif channel == 4:
                self.m_PcanHandle = PCAN_USBBUS4.value
            elif channel == 5:
                self.m_PcanHandle = PCAN_USBBUS5.value
            elif channel == 6:
                self.m_PcanHandle = PCAN_USBBUS6.value
            elif channel == 7:
                self.m_PcanHandle = PCAN_USBBUS7.value
            elif channel == 8:
                self.m_PcanHandle = PCAN_USBBUS8.value
        baudrate = self.m_BAUDRATES[baudrate]
        hwtype = self.m_HWTYPES[hardware_type]
        ioport = int(IO_Port,16)
        interrupt = int(self.m_INTERRUPTS[interrupt])
        result = self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
        # result = self.m_objPCANBasic.Initialize(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
        # result = self.__mObjPCAN_ISO_TP_2016.Initialize_2016(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
        if result != PCAN_ERROR_OK:
            if result != PCAN_ERROR_CAUTION:
                error = self.GetFormatedError(result)
                print("Error!", error)
                return {"status":"failed","Comment": error}
            else:
                print('The bitrate being used is different than the given one')
                result = PCAN_ERROR_OK
                return {"status":"failed","Comment": 'The bitrate being used is different than the given one'}
        else:
            print("{} has been initialized successfully.".format(hardware))
            return {"status":"success","Comment":""}
        pass

    def UnInitialize(self):
        try:
            self.m_Terminated = True
            # Releases a current connected PCAN-Basic channel
            #
            self.__mObjPCAN_UDS.Uninitialize_2013(self.m_PcanHandle)
            #self.m_objPCANBasic.Uninitialize(self.m_PcanHandle)
        except Exception as e:
            self.PrintException()

    def TesterPresent(self):
        try:
            # self.m_Terminated = True
            out_msg_request = uds_msg()
            response_count = c_uint32(0)
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            status = self.__mObjPCAN_UDS.SvcTesterPresent_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_TP_ZSUBF)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForServiceFunctional_2013(self.m_PcanHandle, request, 1, True, response, response_count,confirmation)
            # print(" UDS_SvcTesterPresent_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                # self.ReadData()
                return self.__display_uds_msg("TesterPresent",confirmation, response, False)
            else:
                # self.ReadData()
                return self.__display_uds_msg("TesterPresent",request, None, False)
        except:
            self.PrintException()

    def ReadDataByIdentifier(self):
        try:
            data_identifier = (c_uint16 * 2) (self.__mObjPCAN_UDS.PUDS_SVC_PARAM_DI_ADSDID, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_DI_ECUMDDID )
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical ReadDataByIdentifier message
            # print("\n\nSends a physical ReadDataByIdentifier message: ")
            status = self.__mObjPCAN_UDS.SvcReadDataByIdentifier_2013(self.m_PcanHandle, self.__config, request, data_identifier, 2)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcReadDataByIdentifier_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("ReadDataByIdentifier",confirmation, response, False)
            else:
                return self.__display_uds_msg("ReadDataByIdentifier",request, None, False)
        except:
            self.PrintException()

    def ClearDiagnosticInformation(self):
        try:
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical ClearDiagnosticInformation message with memory selection parameter (2020)
            # print("\n\nSends a physical ClearDiagnosticInformation message: ")
            status = self.__mObjPCAN_UDS.SvcClearDiagnosticInformation_2013(self.m_PcanHandle, self.__config, request, 0xF1A2B3)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcClearDiagnosticInformation_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("ClearDiagnosticInformation",confirmation, response, False)
            else:
                return self.__display_uds_msg("ClearDiagnosticInformation",request, None, False)
        except:
            self.PrintException()

    def ReadDTCInformation(self):
        try:
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical ReadDTCInformation message
            # print("\n\nSends a physical ReadDTCInformation message: ")
            status = self.__mObjPCAN_UDS.SvcReadDTCInformation_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_RDTCI_RDTCBSM, 0xF1)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcReadDTCInformation_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("ReadDTCInformation",confirmation, response, False)
            else:
                return self.__display_uds_msg("ReadDTCInformation",request, None, False)
        except:
            self.PrintException()

    def RoutineControl(self):
        try:
            routine_control_option_record = create_string_buffer(10)
            routine_control_option_record_size = 10
            request = uds_msg()
            response = uds_msg()
            confirmation = uds_msg()
            # Sends a physical RoutineControl message
            # print("\n\nSends a physical RoutineControl message: ")
            for i in range(routine_control_option_record_size):
                routine_control_option_record[i] = self.__getCChar(ord('A') + i)
            status = self.__mObjPCAN_UDS.SvcRoutineControl_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_RC_RRR, 0xF1A2, routine_control_option_record, routine_control_option_record_size)
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
            # print(" UDS_SvcRoutineControl_2013: %i" %(status.value))
            if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
                return self.__display_uds_msg("RoutineControl",confirmation, response, False)
            else:
                return self.__display_uds_msg("RoutineControl",request, None, False)
        except:
            self.PrintException()

    def __display_uds_msg(self,service,request, response, no_response_expected):
        """
        A function that displays UDS Request and Response messages (and count error if no response)
        parameters:
        request: Request message
        response: Received response message
        no_response_expected: if no response is expected, do not increment error counter
        """
        # if request != None and request.msg.msgdata.isotp:
        #     print("\nUDS request from 0x%04X (to 0x%04X, with extension address 0x%02X) - result: %i - %s" %(
        #         request.msg.msgdata.isotp.contents.netaddrinfo.source_addr,
        #         request.msg.msgdata.isotp.contents.netaddrinfo.target_addr,
        #         request.msg.msgdata.isotp.contents.netaddrinfo.extension_addr,
        #         request.msg.msgdata.any.contents.netstatus,
        #         "ERROR !!!" if request.msg.msgdata.any.contents.netstatus != PCANTP_NETSTATUS_OK.value else "OK !"))
        #     # display data
        #     s = "\t-> Length: {x1}, Data= ".format(x1=format(request.msg.msgdata.any.contents.length, "d"))
        #     for i in range(request.msg.msgdata.any.contents.length):
        #         s += "{x1} ".format(x1=format(request.msg.msgdata.any.contents.data[i], "02X"))
        #     print(s)
        if response != None and response.msg.msgdata.isotp:
            print("\nUDS RESPONSE from 0x%04X (to 0x%04X, with extension address 0x%02X) - result: %i - %s" %(
                response.msg.msgdata.isotp.contents.netaddrinfo.source_addr, response.msg.msgdata.isotp.contents.netaddrinfo.target_addr,
                response.msg.msgdata.isotp.contents.netaddrinfo.extension_addr, response.msg.msgdata.any.contents.netstatus,
                ("ERROR !!!" if response.msg.msgdata.any.contents.netstatus != PCANTP_NETSTATUS_OK.value else "OK !")))
            # display data
            # s = "\t-> Length: {x1}, Data= ".format(x1=format(response.msg.msgdata.any.contents.length, "d"))
            s = ""
            for i in range(response.msg.msgdata.any.contents.length):
                s += "{x1} ".format(x1=format(response.msg.msgdata.any.contents.data[i], "02X"))
            print(s)
        elif not no_response_expected:
            s = "ERROR: NO UDS RESPONSE"
        return {"Service":service,"Response":s}

    def __getCChar(self,c):
        """
        Get a character adapted to a C string buffer depending on Python version
        """
        return c if sys.version_info.major >= 3 else chr(c)

    ## Connection Methods
    ## Help Function used to get an error as text
    ##
    def GetFormatedError(self, error):
        # Gets the text using the GetErrorText API function
        # If the function success, the translated error is returned. If it fails,
        # a text describing the current error is returned.
        #
        stsReturn = self.m_objPCANBasic.GetErrorText(error, 0)
        if stsReturn[0] != PCAN_ERROR_OK:
            return "An error occurred. Error-code's text ({0:X}h) couldn't be retrieved".format(error)
        else:
            return stsReturn[1]

    ## Function for reading CAN messages on normal CAN devices
    ##
    def ReadMessage(self):
        # We execute the "Read" function of the PCANBasic
        #
        rx_msg = cantp_msg()
        timestamp = TPCANTimestamp()
        res = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(rx_msg, PCANTP_MSGTYPE_ANY)
        result = self.__mObjPCAN_ISO_TP_2016.Read_2016(self.m_PcanHandle,rx_msg,timestamp)
        # print("Result : ",result)
        if result.value == PCANTP_STATUS_OK.value:
            # We show the received message
            #
            try:
                # data = result[1:]
                readData = dict()
                readData["direction"] = "Rx"
                readData["msg"] = self.__Convert(rx_msg)
                readData["time"] = timestamp
                # print("readData : ",readData)
                #self.__sdt.AddMessageData(readData)
            except Exception as e:
                self.PrintException()
        return result.value

    def ReadMessages(self):
        stsResult = PCAN_ERROR_OK
        # We read at least one time the queue looking for messages.
        # If a message is found, we look again trying to find more.
        # If the queue is empty or an error occurr, we get out from
        # the dowhile statement.
        #
        while (not (stsResult & PCAN_ERROR_QRCVEMPTY)):
            stsResult = self.ReadMessage()
            if stsResult == PCAN_ERROR_ILLOPERATION:
                break

    ## Thread-Function used for reading PCAN-Basic messages
    ##
    def Read(self):
        try:
            self.m_Terminated = False
            # Configures the Receive-Event.
            #
            stsResult = self.__mObjPCAN_ISO_TP_2016.SetValue_2016(self.m_PcanHandle, PCAN_RECEIVE_EVENT, self.m_ReceiveEvent,sizeof(self.m_ReceiveEvent))
            # self.ReadMessages()
            if stsResult.value != PCANTP_STATUS_OK.value:
                print ("Error: " + self.GetFormatedError(stsResult))
            else:
                while not self.m_Terminated:
                    self.ReadMessages()
                    # if win32event.WaitForSingleObject(self.m_ReceiveEvent, 50) == win32event.WAIT_OBJECT_0:
                # Resets the Event-handle configuration
                #
                self.__mObjPCAN_ISO_TP_2016.SetValue_2016(self.m_PcanHandle, PCAN_RECEIVE_EVENT, self.m_ReceiveEvent,0)
        except Exception as e:
            self.PrintException()
            print ("Error occurred while processing CAN data")

    #def stop(self):
    #    self.__sdt.StopLog()
    #    self.__sdt.StopTrace()

    def ReadData(self):
        t1 = threading.Thread(None,self.Read)
        t1.start()

    def WriteData(self, data):
        pass

    def Start(self,msgs):
        t = threading.Thread(None,self.__startECU,args=(msgs,))
        t.start()

    def __startECU(self,msgs):
        try:
            while True:
                for msg in msgs:
                    # print(msg)
                    self.m_objPCANBasic.Write(self.m_PcanHandle, msg)
        except Exception as e:
            self.PrintException()

    def Wakeup(self,msg,cycleTime,action):
        try:
            if action.upper() in ["ON","TRUE","START"]:
                self.__wakeup = True
                t = threading.Thread(target=self.__WakeupThread,args=(msg,cycleTime))
                t.start()
            else:
                self.__wakeup = False
        except Exception as e:
            self.PrintException()

    def __WakeupThread(self,msg,cycleTime):
        try:
            while self.__wakeup:
                try:
                    res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
                    if res.value == PCANTP_STATUS_OK.value:
                        self.__wakeupStatus = True
                        data = dict()
                        data["direction"] = "Tx"
                        data["msg"] = self.__Convert(msg)
                        data["time"] = (time.time())/10
                        #self.__sdt.AddMessageData(data)
                    else:
                        self.__wakeupStatus = False
                except Exception as e:
                    self.PrintException()
                    self.__wakeupStatus = False
                time.sleep(cycleTime/1000)
        except Exception as e:
            self.PrintException()

    def Connect(self,msg,cycleTime,action):
        try:
            if action.upper() in ["ON","TRUE","START"]:
                self.__connect = True
                t = threading.Thread(target=self.__ConnectThread,args=(msg,cycleTime))
                t.start()
            else:
                self.__connect = False
        except Exception as e:
            self.PrintException()

    def __ConnectThread(self,msg,cycleTime):
        try:
            while self.__connect:
                try:
                    res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
                    if res.value == PCANTP_STATUS_OK.value:
                        self.__connectStatus = True
                        data = dict()
                        data["direction"] = "Tx"
                        data["msg"] = self.__Convert(msg)
                        data["time"] = (time.time())/10
                        #self.__sdt.AddMessageData(data)
                    else:
                        self.__connectStatus = False
                    time.sleep(cycleTime/1000)
                except Exception as e:
                    self.PrintException()
                    self.__connectStatus = False
        except Exception as e:
            self.PrintException()

    def SetRequestMode(self,msg):
        try:
            res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
            if res.value == PCANTP_STATUS_OK.value:
                data = dict()
                data["direction"] = "Tx"
                data["msg"] = self.__Convert(msg)
                data["time"] = (time.time())/10
                # print(data)
                #self.__sdt.AddMessageData(data)
                return True
            else:
                return False
        except Exception as e:
            self.PrintException()

    def InitMsg(self,frame_id,length,data=None):
        tx_msg = cantp_msg()
        try:
            if msg is None:
                return cantp_msg()
            else:
                status = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(tx_msg,PCANTP_MSGTYPE_CAN)
                status = self.__mObjPCAN_ISO_TP_2016.MsgDataInit_2016(tx_msg,frame_id,PCANTP_CAN_MSGTYPE_STANDARD,length,None,None)
                if data is not None:
                    if type(data) is str:
                        data_bytes = data.split(" ")
                    elif type(data) is list:
                        data_bytes = data
                    for i in range(msg.length):
                        tx_msg.msgdata.any.contents.data[i] = int(data_bytes[i], 16)
                return tx_msg
        except Exception as e:
            self.PrintException()
            return tx_msg

    def __Convert(self,cantp_msg):
        canMsg = TPCANMsg()
        try:
            canMsg.ID = cantp_msg.can_info.can_id
            canMsg.MSGTYPE = cantp_msg.can_info.can_msgtype
            canMsg.LEN = cantp_msg.msgdata.can.contents.length
            for i in range(cantp_msg.msgdata.can.contents.length):
                canMsg.DATA[i] = cantp_msg.msgdata.can.contents.data[i]
            return canMsg
        except Exception as e:
            self.PrintException()
            return canMsg

    def PrintException(self):
        exc_type, exc_obj, tb = sys.exc_info()
        f = tb.tb_frame
        lineno = tb.tb_lineno
        filename = f.f_code.co_filename
        linecache.checkcache(filename)
        line = linecache.getline(filename, lineno, f.f_globals)
        print('EXCEPTION IN ({}, LINE {} "{}"): {}'.format(filename, lineno, line.strip(), exc_obj))

if __name__ == "__main__":
    pcan = PCAN()
    pcan.Initialize("PCAN_USB",1,"500 kBit/sec","ISA-82C200","0100","3")
    wakeup_msg = pcan.InitMsg(0x14,3)
    print(pcan.Wakeup(wakeup_msg,100,"ON"))
    connect_msg = pcan.InitMsg(0x584,8)
    print(pcan.Connect(connect_msg,1000,"ON"))
    pcan.ReadData()
    print(pcan.TesterPresent())
    print(pcan.ReadDTCInformation())
    print(pcan.ReadDataByIdentifier())
    print(pcan.RoutineControl())
    print(pcan.ClearDiagnosticInformation())
    time.sleep(10)
    #pcan.stop()
    pcan.UnInitialize()
-
- Software Development
- Posts: 305
- Joined: Mon 9. Sep 2013, 12:21
Re: Receive can data continuously and uds service response parallelly using single pcan usb hardware
Hello,
Your code is a patchwork of excerpts copied and pasted from various API samples!
For instance, you are comparing PCANBasic status values with cantp_status variables... This is obviously wrong and leads to misinterpreted results.
There are also runtime exceptions where the wrong variables are used (check your function with tx_msg and msg)...
Double-check your code; the forum is not a place where we debug your code!
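The status mix-up can be reproduced without any hardware. In the sketch below all constants are stand-ins rather than imports from the PCAN modules: `0x20` for `PCAN_ERROR_QRCVEMPTY` follows PCANBasic, while the value `7` for `PCANTP_STATUS_NO_MESSAGE` is an assumption that should be checked against the PCAN-ISO-TP header. The function names are hypothetical. It shows why a read loop that tests an ISO-TP status against a PCANBasic flag never notices an empty queue.

```python
# Stand-in constants; the real ones come from PCANBasic.py and PCAN_ISO_TP_2016.py.
PCAN_ERROR_QRCVEMPTY = 0x20        # PCANBasic: receive queue is empty
PCANTP_STATUS_OK = 0               # PCAN-ISO-TP: success
PCANTP_STATUS_NO_MESSAGE = 7       # PCAN-ISO-TP: nothing to read (assumed value)


def queue_empty_wrong(isotp_status):
    # Bug pattern: tests an ISO-TP status against a PCANBasic bit flag.
    return bool(isotp_status & PCAN_ERROR_QRCVEMPTY)


def queue_empty_right(isotp_status):
    # Compares the ISO-TP status with the ISO-TP constant of the same family.
    return isotp_status == PCANTP_STATUS_NO_MESSAGE


def drain(read_once, is_empty, max_reads=100):
    """Call read_once() until is_empty(status) is true; return the number of reads."""
    reads = 0
    while reads < max_reads:
        reads += 1
        if is_empty(read_once()):
            break
    return reads


def make_reader():
    # Simulated channel: two frames available, then the queue stays empty.
    statuses = iter([PCANTP_STATUS_OK, PCANTP_STATUS_OK])
    return lambda: next(statuses, PCANTP_STATUS_NO_MESSAGE)


print(drain(make_reader(), queue_empty_right))  # stops on the third read
print(drain(make_reader(), queue_empty_wrong))  # only stops at the max_reads guard
```

Without the `max_reads` guard, the second loop would spin forever, which is one way a reader thread can starve everything else.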
Finally, your code shows that you want to use UDS and read some non-UDS frames with ISOTP.
This is exactly what is done in sample 10_client_uds_and_isotp: be careful about how the ISOTP event is set (use PCANTP_PARAMETER_RECEIVE_EVENT).
Note: the previously advised sample 14_client_uds_and_can shows how to use UDS and read any CAN frames (including UDS frames via the parameter PCANTP_PARAMETER_KEEP_HIGHER_LAYER_MESSAGES); this does not exactly correspond to what you want.
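The general shape of such a reader (wait for the receive event, then drain the queue until it reports no message, while UDS requests run on another thread) can be sketched with standard-library stand-ins. Here `threading.Event` takes the place of the event registered through `PCANTP_PARAMETER_RECEIVE_EVENT` and a `queue.Queue` takes the place of the channel's receive queue. Neither is the PCAN API, the class name `FrameReader` is hypothetical, and the real samples remain the reference.

```python
import queue
import threading


class FrameReader(object):
    """Hypothetical reader thread; Event and Queue stand in for the PCAN channel."""

    def __init__(self):
        self.rx_queue = queue.Queue()            # stand-in for the receive queue
        self.receive_event = threading.Event()   # stand-in for the receive event
        self.terminated = threading.Event()
        self.received = []
        self.thread = threading.Thread(target=self._run)

    def _run(self):
        while not self.terminated.is_set():
            # Block with a timeout instead of polling in a tight loop.
            if not self.receive_event.wait(timeout=0.05):
                continue
            self.receive_event.clear()
            self._drain()
        self._drain()  # pick up frames that arrived just before stop()

    def _drain(self):
        # Read until the queue is empty (the "no message" condition).
        while True:
            try:
                self.received.append(self.rx_queue.get_nowait())
            except queue.Empty:
                return

    def feed(self, can_id, data):
        # Simulates the driver delivering a frame and signalling the event.
        self.rx_queue.put((can_id, data))
        self.receive_event.set()

    def start(self):
        self.thread.start()

    def stop(self):
        self.terminated.set()
        self.receive_event.set()   # wake the thread so it exits promptly
        self.thread.join()


reader = FrameReader()
reader.start()
reader.feed(0x14, b"\x01\x02\x03")   # raw CAN traffic arrives...
reader.feed(0x584, b"\x00" * 8)      # ...while the main thread would run UDS services
reader.stop()
print(["0x%03X" % can_id for can_id, _ in reader.received])
```

Waiting on the event with a timeout keeps the reader from spinning, so the thread issuing UDS requests is not starved.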
Best regards,
Fabrice