Code: Select all
from PCAN_UDS_2013 import *
from PCAN_ISO_TP_2016 import *
from PCAN_ISO_TP_2016 import *
import time
import linecache
try:
import win32event
WINDOWS_EVENT_SUPPORT = True
except ImportError:
WINDOWS_EVENT_SUPPORT = False
class PCAN(object):
def __init__(self):
self.m_objPCANBasic = PCANBasic()
self.__mObjPCAN_UDS = PCAN_UDS_2013()
self.__mObjPCAN_ISO_TP_2016 = PCAN_ISO_TP_2016()
self.__config = uds_msgconfig()
self.__config.can_id = -1
self.__config.can_msgtype = PCANTP_CAN_MSGTYPE_STANDARD
self.__config.nai.protocol = PUDS_MSGPROTOCOL_ISO_15765_2_11B_NORMAL
self.__config.nai.target_type = PCANTP_ISOTP_ADDRESSING_FUNCTIONAL
self.__config.type = PUDS_MSGTYPE_USDT
self.__config.nai.source_addr = PUDS_ISO_15765_4_ADDR_TEST_EQUIPMENT
self.__config.nai.target_addr = PUDS_ISO_15765_4_ADDR_OBD_FUNCTIONAL
self.m_BAUDRATES = {'1 MBit/sec':PCAN_BAUD_1M, '800 kBit/sec':PCAN_BAUD_800K, '500 kBit/sec':PCAN_BAUD_500K, '250 kBit/sec':PCAN_BAUD_250K,
'125 kBit/sec':PCAN_BAUD_125K, '100 kBit/sec':PCAN_BAUD_100K, '95,238 kBit/sec':PCAN_BAUD_95K, '83,333 kBit/sec':PCAN_BAUD_83K,
'50 kBit/sec':PCAN_BAUD_50K, '47,619 kBit/sec':PCAN_BAUD_47K, '33,333 kBit/sec':PCAN_BAUD_33K, '20 kBit/sec':PCAN_BAUD_20K,
'10 kBit/sec':PCAN_BAUD_10K, '5 kBit/sec':PCAN_BAUD_5K}
self.m_HWTYPES = {'ISA-82C200':PCAN_TYPE_ISA, 'ISA-SJA1000':PCAN_TYPE_ISA_SJA, 'ISA-PHYTEC':PCAN_TYPE_ISA_PHYTEC, 'DNG-82C200':PCAN_TYPE_DNG,
'DNG-82C200 EPP':PCAN_TYPE_DNG_EPP, 'DNG-SJA1000':PCAN_TYPE_DNG_SJA, 'DNG-SJA1000 EPP':PCAN_TYPE_DNG_SJA_EPP}
self.m_IOPORTS = {'0100':0x100, '0120':0x120, '0140':0x140, '0200':0x200, '0220':0x220, '0240':0x240, '0260':0x260, '0278':0x278,
'0280':0x280, '02A0':0x2A0, '02C0':0x2C0, '02E0':0x2E0, '02E8':0x2E8, '02F8':0x2F8, '0300':0x300, '0320':0x320,
'0340':0x340, '0360':0x360, '0378':0x378, '0380':0x380, '03BC':0x3BC, '03E0':0x3E0, '03E8':0x3E8, '03F8':0x3F8}
self.m_INTERRUPTS = {'3':3, '4':4, '5':5, '7':7, '9':9, '10':10, '11':11, '12':12, '15':15}
#self.__sdt = SDT()
self.__currentTime = None
self.__wakeupStatus = None
self.__connectStatus = None
if WINDOWS_EVENT_SUPPORT:
self.m_ReadThread = None
self.m_Terminated = False
# self.m_ReceiveEvent = win32event.CreateEvent(None, 0, 0, None)
self.m_ReceiveEvent = c_void_p(windll.kernel32.CreateEventA(None, 0,0,None))
@property
def WakeupStatus(self):
return str(self.__wakeupStatus)
@property
def ConnectStatus(self):
return str(self.__connectStatus)
def GetChannels(self):
try:
channels = []
result = self.m_objPCANBasic.GetValue(PCAN_NONEBUS, PCAN_ATTACHED_CHANNELS)
if (result[0] == PCAN_ERROR_OK):
# Include only connectable channels
#
for channel in result[1]:
if (channel.channel_condition & PCAN_CHANNEL_AVAILABLE):
channels.append(self.FormatChannelName(channel.channel_handle, channel.device_features & FEATURE_FD_CAPABLE))
return channels
except Exception as e:
self.PrintException()
def Initialize(self,hardware,channel,baudrate,hardware_type,IO_Port,interrupt):
# startIndex = hardware.index("(") + 1
# strChannel = hardware[startIndex:startIndex+3]
# strChannel = strChannel.replace("h", "")
if hardware.upper() == "PCAN_USB":
if channel == 1:
self.m_PcanHandle = PCAN_USBBUS1.value
elif channel == 2:
self.m_PcanHandle = PCAN_USBBUS2.value
elif channel == 3:
self.m_PcanHandle = PCAN_USBBUS3.value
elif channel == 4:
self.m_PcanHandle = PCAN_USBBUS4.value
elif channel == 5:
self.m_PcanHandle = PCAN_USBBUS5.value
elif channel == 6:
self.m_PcanHandle = PCAN_USBBUS6.value
elif channel == 7:
self.m_PcanHandle = PCAN_USBBUS7.value
elif channel == 8:
self.m_PcanHandle = PCAN_USBBUS8.value
baudrate = self.m_BAUDRATES[baudrate]
hwtype = self.m_HWTYPES[hardware_type]
ioport = int(IO_Port,16)
interrupt = int(self.m_INTERRUPTS[interrupt])
result = self.__mObjPCAN_UDS.Initialize_2013(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
# result = self.m_objPCANBasic.Initialize(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
# result = self.__mObjPCAN_ISO_TP_2016.Initialize_2016(self.m_PcanHandle,baudrate,hwtype,ioport,interrupt)
if result != PCAN_ERROR_OK:
if result != PCAN_ERROR_CAUTION:
error = self.GetFormatedError(result)
print("Error!", error)
return {"status":"failed","Comment": error}
else:
print('The bitrate being used is different than the given one')
result = PCAN_ERROR_OK
return {"status":"failed","Comment": 'The bitrate being used is different than the given one'}
else:
print("{} has been initialized successfully.".format(hardware))
return {"status":"success","Comment":""}
pass
def UnInitialize(self):
try:
self.m_Terminated = True
# Releases a current connected PCAN-Basic channel
#
self.__mObjPCAN_UDS.Uninitialize_2013(self.m_PcanHandle)
#self.m_objPCANBasic.Uninitialize(self.m_PcanHandle)
except Exception as e:
self.PrintException()
def TesterPresent(self):
try:
# self.m_Terminated = True
out_msg_request = uds_msg()
response_count = c_uint32(0)
request = uds_msg()
response = uds_msg()
confirmation = uds_msg()
status = self.__mObjPCAN_UDS.SvcTesterPresent_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_TP_ZSUBF)
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
status = self.__mObjPCAN_UDS.WaitForServiceFunctional_2013(self.m_PcanHandle, request, 1, True, response, response_count,confirmation)
# print(" UDS_SvcTesterPresent_2013: %i" %(status.value))
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
# self.ReadData()
return self.__display_uds_msg("TesterPresent",confirmation, response, False)
else:
# self.ReadData()
return self.__display_uds_msg("TesterPresent",request, None, False)
except:
self.PrintException()
def ReadDataByIdentifier(self):
try:
data_identifier = (c_uint16 * 2) (self.__mObjPCAN_UDS.PUDS_SVC_PARAM_DI_ADSDID, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_DI_ECUMDDID )
request = uds_msg()
response = uds_msg()
confirmation = uds_msg()
# Sends a physical ReadDataByIdentifier message
# print("\n\nSends a physical ReadDataByIdentifier message: ")
status = self.__mObjPCAN_UDS.SvcReadDataByIdentifier_2013(self.m_PcanHandle, self.__config, request, data_identifier, 2)
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
# print(" UDS_SvcReadDataByIdentifier_2013: %i" %(status.value))
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
return self.__display_uds_msg("ReadDataByIdentifier",confirmation, response, False)
else:
return self.__display_uds_msg("ReadDataByIdentifier",request, None, False)
except:
self.PrintException()
def ClearDiagnosticInformation(self):
try:
request = uds_msg()
response = uds_msg()
confirmation = uds_msg()
# Sends a physical ClearDiagnosticInformation message with memory selection parameter (2020)
# print("\n\nSends a physical ClearDiagnosticInformation message: ")
status = self.__mObjPCAN_UDS.SvcClearDiagnosticInformation_2013(self.m_PcanHandle, self.__config, request, 0xF1A2B3)
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
# print(" UDS_SvcClearDiagnosticInformation_2013: %i" %(status.value))
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
return self.__display_uds_msg("ClearDiagnosticInformation",confirmation, response, False)
else:
return self.__display_uds_msg("ClearDiagnosticInformation",request, None, False)
except:
self.PrintException()
def ReadDTCInformation(self):
try:
request = uds_msg()
response = uds_msg()
confirmation = uds_msg()
# Sends a physical ReadDTCInformation message
# print("\n\nSends a physical ReadDTCInformation message: ")
status = self.__mObjPCAN_UDS.SvcReadDTCInformation_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_RDTCI_RDTCBSM, 0xF1)
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
# print(" UDS_SvcReadDTCInformation_2013: %i" %(status.value))
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
return self.__display_uds_msg("ReadDTCInformation",confirmation, response, False)
else:
return self.__display_uds_msg("ReadDTCInformation",request, None, False)
except:
self.PrintException()
def RoutineControl(self):
try:
routine_control_option_record = create_string_buffer(10)
routine_control_option_record_size = 10
request = uds_msg()
response = uds_msg()
confirmation = uds_msg()
# Sends a physical RoutineControl message
# print("\n\nSends a physical RoutineControl message: ")
for i in range(routine_control_option_record_size):
routine_control_option_record[i] = self.__getCChar(ord('A') + i)
status = self.__mObjPCAN_UDS.SvcRoutineControl_2013(self.m_PcanHandle, self.__config, request, self.__mObjPCAN_UDS.PUDS_SVC_PARAM_RC_RRR, 0xF1A2, routine_control_option_record, routine_control_option_record_size)
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
status = self.__mObjPCAN_UDS.WaitForService_2013(self.m_PcanHandle, request, response, confirmation)
# print(" UDS_SvcRoutineControl_2013: %i" %(status.value))
if self.__mObjPCAN_UDS.StatusIsOk_2013(status, PUDS_STATUS_OK, False):
return self.__display_uds_msg("RoutineControl",confirmation, response, False)
else:
return self.__display_uds_msg("RoutineControl",request, None, False)
except:
self.PrintException()
def __display_uds_msg(self,service,request, response, no_response_expected):
"""
A function that displays UDS Request and Response messages (and count error if no response)
parameters:
request: Request message
response: Received response message
no_response_expected: if no response is expected, do not increment error counter
"""
# if request != None and request.msg.msgdata.isotp:
# print("\nUDS request from 0x%04X (to 0x%04X, with extension address 0x%02X) - result: %i - %s" %(
# request.msg.msgdata.isotp.contents.netaddrinfo.source_addr,
# request.msg.msgdata.isotp.contents.netaddrinfo.target_addr,
# request.msg.msgdata.isotp.contents.netaddrinfo.extension_addr,
# request.msg.msgdata.any.contents.netstatus,
# "ERROR !!!" if request.msg.msgdata.any.contents.netstatus != PCANTP_NETSTATUS_OK.value else "OK !"))
# # display data
# s = "\t-> Length: {x1}, Data= ".format(x1=format(request.msg.msgdata.any.contents.length, "d"))
# for i in range(request.msg.msgdata.any.contents.length):
# s += "{x1} ".format(x1=format(request.msg.msgdata.any.contents.data[i], "02X"))
# print(s)
if response != None and response.msg.msgdata.isotp:
print("\nUDS RESPONSE from 0x%04X (to 0x%04X, with extension address 0x%02X) - result: %i - %s" %(
response.msg.msgdata.isotp.contents.netaddrinfo.source_addr, response.msg.msgdata.isotp.contents.netaddrinfo.target_addr,
response.msg.msgdata.isotp.contents.netaddrinfo.extension_addr, response.msg.msgdata.any.contents.netstatus,
("ERROR !!!" if response.msg.msgdata.any.contents.netstatus != PCANTP_NETSTATUS_OK.value else "OK !")))
# display data
# s = "\t-> Length: {x1}, Data= ".format(x1=format(response.msg.msgdata.any.contents.length, "d"))
s = ""
for i in range(response.msg.msgdata.any.contents.length):
s += "{x1} ".format(x1=format(response.msg.msgdata.any.contents.data[i], "02X"))
print(s)
elif not no_response_expected:
s = "ERROR: NO UDS RESPONSE"
return {"Service":service,"Response":s}
def __getCChar(self,c):
"""
Get a character adapted to a C string buffer depending on Python version
"""
return c if sys.version_info.major >= 3 else chr(c)
## Connection Methods
## Help Function used to get an error as text
##
def GetFormatedError(self, error):
# Gets the text using the GetErrorText API function
# If the function success, the translated error is returned. If it fails,
# a text describing the current error is returned.
#
stsReturn = self.m_objPCANBasic.GetErrorText(error, 0)
if stsReturn[0] != PCAN_ERROR_OK:
return "An error occurred. Error-code's text ({0:X}h) couldn't be retrieved".format(error)
else:
return stsReturn[1]
## Function for reading CAN messages on normal CAN devices
##
def ReadMessage(self):
# We execute the "Read" function of the PCANBasic
#
rx_msg = cantp_msg()
timestamp = TPCANTimestamp()
res = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(rx_msg, PCANTP_MSGTYPE_ANY)
result = self.__mObjPCAN_ISO_TP_2016.Read_2016(self.m_PcanHandle,rx_msg,timestamp)
# print("Result : ",result)
if result.value == PCANTP_STATUS_OK.value:
# We show the received message
#
try:
# data = result[1:]
readData = dict()
readData["direction"] = "Rx"
readData["msg"] = self.__Convert(rx_msg)
readData["time"] = timestamp
# print("readData : ",readData)
#self.__sdt.AddMessageData(readData)
except Exception as e:
self.PrintException()
return result.value
def ReadMessages(self):
stsResult = PCAN_ERROR_OK
# We read at least one time the queue looking for messages.
# If a message is found, we look again trying to find more.
# If the queue is empty or an error occurr, we get out from
# the dowhile statement.
#
while (not (stsResult & PCAN_ERROR_QRCVEMPTY)):
stsResult = self.ReadMessage()
if stsResult == PCAN_ERROR_ILLOPERATION:
break
## Thread-Function used for reading PCAN-Basic messages
##
def Read(self):
try:
self.m_Terminated = False
# Configures the Receive-Event.
#
stsResult = self.__mObjPCAN_ISO_TP_2016.SetValue_2016(self.m_PcanHandle, PCAN_RECEIVE_EVENT, self.m_ReceiveEvent,sizeof(self.m_ReceiveEvent))
# self.ReadMessages()
if stsResult.value != PCANTP_STATUS_OK.value:
print ("Error: " + self.GetFormatedError(stsResult))
else:
while not self.m_Terminated:
self.ReadMessages()
# if win32event.WaitForSingleObject(self.m_ReceiveEvent, 50) == win32event.WAIT_OBJECT_0:
# Resets the Event-handle configuration
#
self.__mObjPCAN_ISO_TP_2016.SetValue_2016(self.m_PcanHandle, PCAN_RECEIVE_EVENT, self.m_ReceiveEvent,0)
except Exception as e:
self.PrintException()
print ("Error occurred while processing CAN data")
#def stop(self):
# self.__sdt.StopLog()
# self.__sdt.StopTrace()
def ReadData(self):
t1 = threading.Thread(None,self.Read)
t1.start()
def WriteData(self, data):
pass
def Start(self,msgs):
t = threading.Thread(None,self.__startECU,args=(msgs,))
t.start()
def __startECU(self,msgs):
try:
while True:
for msg in msgs:
# print(msg)
self.m_objPCANBasic.Write(self.m_PcanHandle, msg)
except Exception as e:
self.PrintException()
def Wakeup(self,msg,cycleTime,action):
try:
if action.upper() in ["ON","TRUE","START"]:
self.__wakeup = True
t = threading.Thread(target=self.__WakeupThread,args=(msg,cycleTime))
t.start()
else:
self.__wakeup = False
except Exception as e:
self.PrintException()
def __WakeupThread(self,msg,cycleTime):
try:
while self.__wakeup:
try:
res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
if res.value == PCANTP_STATUS_OK.value:
self.__wakeupStatus = True
data = dict()
data["direction"] = "Tx"
data["msg"] = self.__Convert(msg)
data["time"] = (time.time())/10
#self.__sdt.AddMessageData(data)
else:
self.__wakeupStatus = False
except Exception as e:
self.PrintException()
self.__wakeupStatus = False
time.sleep(cycleTime/1000)
except Exception as e:
self.PrintException()
def Connect(self,msg,cycleTime,action):
try:
if action.upper() in ["ON","TRUE","START"]:
self.__connect = True
t = threading.Thread(target=self.__ConnectThread,args=(msg,cycleTime))
t.start()
else:
self.__connect = False
except Exception as e:
self.PrintException()
def __ConnectThread(self,msg,cycleTime):
try:
while self.__connect:
try:
res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
if res.value == PCANTP_STATUS_OK.value:
self.__connectStatus = True
data = dict()
data["direction"] = "Tx"
data["msg"] = self.__Convert(msg)
data["time"] = (time.time())/10
#self.__sdt.AddMessageData(data)
else:
self.__connectStatus = False
time.sleep(cycleTime/1000)
except Exception as e:
self.PrintException()
self.__connectStatus = False
except Exception as e:
self.PrintException()
def SetRequestMode(self,msg):
try:
res = self.__mObjPCAN_ISO_TP_2016.Write_2016(self.m_PcanHandle, msg)
if res.value == PCANTP_STATUS_OK.value:
data = dict()
data["direction"] = "Tx"
data["msg"] = self.__Convert(msg)
data["time"] = (time.time())/10
# print(data)
#self.__sdt.AddMessageData(data)
return True
else:
return False
except Exception as e:
self.PrintException()
def InitMsg(self,frame_id,length,data=None):
tx_msg = cantp_msg()
try:
if msg is None:
return cantp_msg()
else:
status = self.__mObjPCAN_ISO_TP_2016.MsgDataAlloc_2016(tx_msg,PCANTP_MSGTYPE_CAN)
status = self.__mObjPCAN_ISO_TP_2016.MsgDataInit_2016(tx_msg,frame_id,PCANTP_CAN_MSGTYPE_STANDARD,length,None,None)
if data is not None:
if type(data) is str:
data_bytes = data.split(" ")
elif type(data) is list:
data_bytes = data
for i in range(msg.length):
tx_msg.msgdata.any.contents.data[i] = int(data_bytes[i], 16)
return tx_msg
except Exception as e:
self.PrintException()
return tx_msg
def __Convert(self,cantp_msg):
canMsg = TPCANMsg()
try:
canMsg.ID = cantp_msg.can_info.can_id
canMsg.MSGTYPE = cantp_msg.can_info.can_msgtype
canMsg.LEN = cantp_msg.msgdata.can.contents.length
for i in range(cantp_msg.msgdata.can.contents.length):
canMsg.DATA[i] = cantp_msg.msgdata.can.contents.data[i]
return canMsg
except Exception as e:
self.PrintException()
return canMsg
def PrintException(self):
exc_type, exc_obj, tb = sys.exc_info()
f = tb.tb_frame
lineno = tb.tb_lineno
filename = f.f_code.co_filename
linecache.checkcache(filename)
line = linecache.getline(filename, lineno, f.f_globals)
print('EXCEPTION IN ({}, LINE {} "{}"): {}'.format(filename, lineno, line.strip(), exc_obj))
if __name__ == "__main__":
pcan = PCAN()
pcan.Initialize("PCAN_USB",1,"500 kBit/sec","ISA-82C200","0100","3")
wakeup_msg = pcan.InitMsg(0x14,3)
print(pcan.Wakeup(wakeup_msg,100,"ON"))
connect_msg = pcan.InitMsg(0x584,8)
print(pcan.Connect(connect_msg,1000,"ON"))
pcan.ReadData()
print(pcan.TesterPresent())
print(pcan.ReadDTCInformation())
print(pcan.ReadDataByIdentifier())
print(pcan.RoutineControl())
print(pcan.ClearDiagnosticInformation())
time.sleep(10)
#pcan.stop()
pcan.UnInitialize()