I'm using PLinApi.py to automate the LIN communication between a Master and a Slave through a board (that works as a gateway application).
In my implementation I read the LDF file and get info about the schedule table, which uses 2 unconditional frames:
- 0x30, 1 byte length (Publisher on Master side, Subscriber on Slave side)
- 0x33, 6 bytes length (Subscriber on Master side, Publisher on Slave side)
First I connect to the HW as Master, update the Global Table with info about these frames, then create and start the schedule table. Then I connect to the HW as Slave and update the Global Table. All good at these steps.
The issue appears on the read side, where I receive the frame data with a lot of delay, sometimes with data that was updated in a previous run of the script. It looks like the queue is not flushed. With PLIN-View Pro everything works perfectly, both when loading the LDF and when creating the Global Table/Schedule table manually (like I did in the script).
I also tried to use the GUI example from the PLIN API samples (PLIN_API_Example3.pyw), but there I can read only once with Manual Read, then the GUI gets stuck.
Can you help me?
Thanks
Here is my script (the Main function will be written in the next comment - too many lines here):
Code: Select all
import sys
import time
import ldfparser
from threading import Thread
from ctypes import *
import PLinApi
class PLinApiFunc:
def __init__(self):
    """Build the wrapper; all attribute setup is delegated to initialize()."""
    self.initialize()
def initialize(self):
    """
    Create the PLinApi wrapper object and reset every LIN attribute.

    Parameters:
        None
    Returns:
        None
    Raises:
        Exception
            If the PLin-API reports that it is not loaded
    """
    # API wrapper; stored before the load check, like the attributes below
    api = PLinApi.PLinApi()
    self.m_objPLinApi = api
    if not api.isLoaded():
        raise Exception("PLin-API could not be loaded ! Exiting...")
    # default (unconnected) LIN state
    self.m_hClient = PLinApi.HLINCLIENT(0)
    self.m_hHw = PLinApi.HLINHW(0)
    self.m_HwMode = PLinApi.TLIN_HARDWAREMODE_NONE
    self.m_HwBaudrate = c_ushort(0)
    self.FRAME_FILTER_MASK = c_uint64(0xFFFFFFFFFFFFFFFF)
    self.m_lMask = self.FRAME_FILTER_MASK
    # lookup table: value left in the buffer by GetPID -> LIN frame ID
    pid_table = self.PIDs = {}
    for frame_id in range(64):
        pid = c_ubyte(frame_id)
        api.GetPID(pid)
        pid_table[pid.value] = frame_id
def uninitialize(self):
    """
    Disconnect from the hardware and unregister the client, if one exists.

    Parameters:
        None
    Returns:
        None
    """
    # nothing to release when no client handle is held
    if self.m_hClient.value == PLinApi.HLINCLIENT(0).value:
        return
    self.do_lin_disconnect()
    self.m_hHw = PLinApi.HLINHW(0)
    # unregister the application and forget its handle
    self.m_objPLinApi.RemoveClient(self.m_hClient)
    self.m_hClient = PLinApi.HLINCLIENT(0)
# *****************************************************************
# LIN functions
# *****************************************************************
def do_lin_connect(self, clientName, hwHandle, hwMode, hwBaudrate):
    """
    Connects to a LIN hardware.

    Parameters:
        clientName : str
            Name of the Client (python string)
        hwHandle : int
            LIN hardware handle (HLINHW)
        hwMode : str
            LIN hardware mode, "Master" or "Slave" (case-insensitive)
        hwBaudrate : int
            LIN hardware baudrate
    Returns:
        bool
            True if connection is successful, False otherwise
            (including when hwMode is neither Master nor Slave)
    """
    hwHandle = PLinApi.HLINHW(int(hwHandle))
    mode_name = hwMode.upper()
    if mode_name == "MASTER":
        hwMode = PLinApi.TLIN_HARDWAREMODE_MASTER
    elif mode_name == "SLAVE":
        hwMode = PLinApi.TLIN_HARDWAREMODE_SLAVE
    else:
        # Do not go on with an untranslated string as hardware mode.
        print("Connection Error: No Master or Slave hardware mode selected.")
        return False
    hwBaudrate = c_ushort(int(hwBaudrate))
    # If a connection to the same hardware channel already exists,
    # disconnect it first. Compare the raw handle values: a ctypes
    # object never compares equal to an int.
    if self.m_hHw.value == hwHandle.value:
        if not self.do_lin_disconnect():
            return False
    # register LIN client
    print("Connection status for client name <{}>:".format(clientName))
    # NOTE(review): the client is registered on every call, even when
    # m_hClient already holds a handle (both original branches did the
    # same) - confirm whether re-registering is intended.
    self.m_objPLinApi.RegisterClient(clientName, None, self.m_hClient)
    # Try to connect the client to the hardware with the local handle.
    linResult = self.m_objPLinApi.ConnectClient(self.m_hClient, hwHandle)
    if linResult == PLinApi.TLIN_ERROR_OK:
        # Connection succeeded: keep the handle.
        self.m_hHw = hwHandle
        # read hardware's current mode and baudrate
        lnMode = c_int(0)
        lnCurrBaud = c_int(0)
        linResult = self.m_objPLinApi.GetHardwareParam(
            hwHandle, PLinApi.TLIN_HARDWAREPARAM_MODE, lnMode, 0)
        linResult = self.m_objPLinApi.GetHardwareParam(
            hwHandle, PLinApi.TLIN_HARDWAREPARAM_BAUDRATE, lnCurrBaud, 0)
        # (re)initialize only when the hardware is uninitialized or runs
        # at a different baudrate
        if (lnMode.value == PLinApi.TLIN_HARDWAREMODE_NONE.value or
                lnCurrBaud.value != hwBaudrate.value):
            linResult = self.m_objPLinApi.InitializeHardware(
                self.m_hClient, self.m_hHw, hwMode, hwBaudrate)
        if linResult == PLinApi.TLIN_ERROR_OK:
            self.m_HwMode = hwMode
            self.m_HwBaudrate = hwBaudrate
            # Set the client filter with the mask.
            linResult = self.m_objPLinApi.SetClientFilter(
                self.m_hClient, self.m_hHw, self.m_lMask)
            # Read the frame table from the connected hardware.
            self.read_frame_table_from_hw()
            # Reset the last LIN error code to default.
            linResult = PLinApi.TLIN_ERROR_OK
            result = True
        else:
            # Error while initializing hardware: forget the handle.
            self.m_hHw = PLinApi.HLINHW(0)
            result = False
    else:
        # Invalid handle and/or error while connecting: forget the handle.
        self.m_hHw = PLinApi.HLINHW(0)
        result = False
    if linResult != PLinApi.TLIN_ERROR_OK:
        self.display_error(linResult)
    return result
def do_lin_disconnect(self):
    """
    Disconnects an existing connection to a LIN hardware.

    Parameters:
        None
    Returns:
        bool
            False only when DisconnectClient reports an error;
            True otherwise (also when nothing had to be disconnected)
    """
    # No hardware handle stored: nothing to do.
    if self.m_hHw.value == 0:
        return True
    # Find out which clients are connected, to decide whether the
    # hardware configuration may be reset and whether we are connected.
    other_connected = False
    self_connected = False
    capacity = c_ushort(255)
    handles = (PLinApi.HLINCLIENT * capacity.value)()
    linResult = self.m_objPLinApi.GetHardwareParam(
        self.m_hHw, PLinApi.TLIN_HARDWAREPARAM_CONNECTED_CLIENTS,
        handles, capacity)
    if linResult == PLinApi.TLIN_ERROR_OK:
        # index 0 is not inspected (as in the original loop)
        for idx in range(1, capacity.value):
            handle = handles[idx]
            if handle == 0:
                # invalid / empty entry
                continue
            if handle == self.m_hClient.value:
                self_connected = True
            else:
                other_connected = True
    # Reset the hardware configuration only when no other client uses it.
    if not other_connected:
        linResult = self.m_objPLinApi.ResetHardwareConfig(
            self.m_hClient, self.m_hHw)
    # Disconnect only if this client is actually connected.
    if not self_connected:
        return True
    linResult = self.m_objPLinApi.DisconnectClient(
        self.m_hClient, self.m_hHw)
    if linResult == PLinApi.TLIN_ERROR_OK:
        self.m_hHw = PLinApi.HLINHW(0)
        return True
    # Error while disconnecting from hardware.
    self.display_error(linResult)
    return False
def read_frame_table_from_hw(self):
    """
    Reads the frame entries 0..63 from the hardware and refreshes the
    client filter.

    Parameters:
        None
    Returns:
        list
            The TLINFrameEntry objects for which GetFrameEntry succeeded
    """
    entries = []
    for frame_id in range(64):
        # The Frame-ID must be set on the entry before it can be read.
        entry = PLinApi.TLINFrameEntry()
        entry.FrameId = c_ubyte(frame_id)
        entry.ChecksumType = PLinApi.TLIN_CHECKSUMTYPE_AUTO
        entry.Direction = PLinApi.TLIN_DIRECTION_SUBSCRIBER_AUTOLENGTH
        # default length by ID range (LIN 1.2 convention)
        if frame_id <= 0x1F:
            default_length = 2
        elif frame_id <= 0x2F:
            default_length = 4
        else:
            default_length = 8
        entry.Length = c_ubyte(default_length)
        linResult = self.m_objPLinApi.GetFrameEntry(self.m_hHw, entry)
        if linResult != PLinApi.TLIN_ERROR_OK:
            continue
        entries.append(entry)
        # Add the bit of every non-disabled frame to the client mask.
        if entry.Direction != PLinApi.TLIN_DIRECTION_DISABLED.value:
            frame_bit = c_uint64(
                (1 << frame_id) & self.FRAME_FILTER_MASK.value)
            self.m_lMask = c_uint64(self.m_lMask.value | frame_bit.value)
    # Apply the filter only with valid client and hardware handles.
    if self.m_hClient.value != 0 and self.m_hHw.value != 0:
        self.m_objPLinApi.SetClientFilter(
            self.m_hClient, self.m_hHw, self.m_lMask)
    return entries
def set_global_frame_table(self, list_exception):
    """
    Writes a disabled frame entry (enhanced checksum, default length)
    for every frame ID 0..63 that is not listed in list_exception.

    Parameters:
        list_exception : container of int
            Frame IDs that must be left untouched
    Returns:
        None
    """
    for frame_id in range(64):
        if frame_id in list_exception:
            continue
        entry = PLinApi.TLINFrameEntry()
        entry.FrameId = c_ubyte(frame_id)
        entry.ChecksumType = PLinApi.TLIN_CHECKSUMTYPE_ENHANCED
        entry.Direction = PLinApi.TLIN_DIRECTION_DISABLED
        # default length by ID range (LIN 1.2 convention)
        if frame_id <= 0x1F:
            entry.Length = c_ubyte(2)
        elif frame_id <= 0x2F:
            entry.Length = c_ubyte(4)
        else:
            entry.Length = c_ubyte(8)
        linResult = self.m_objPLinApi.SetFrameEntry(
            self.m_hClient, self.m_hHw, entry)
        if linResult != PLinApi.TLIN_ERROR_OK:
            print("Error - Global Frame Table cannot be set for id: {}".format(
                hex(frame_id)))
            self.display_error(linResult)
def get_available_hw(self):
    """
    Returns a 2D list of available hardware, one row per LIN hardware.

    Parameters:
        None
    Returns:
        res : list
            Rows of the form [type_name, device_number, channel_number,
            handle, hex(device_id & client mask)]; empty when the
            hardware list could not be read
    """
    res = []
    # First call with an empty buffer to get the number of hardware.
    lwCount = c_ushort(0)
    availableHWs = (PLinApi.HLINHW * 0)()
    linResult = self.m_objPLinApi.GetAvailableHardware(availableHWs,
                                                       0, lwCount)
    # Compare the wrapped value: a c_ushort object never equals int 0.
    if lwCount.value == 0:
        # use default value if either no hw is connected or an unexpected
        # error occured
        lwCount = c_ushort(16)
    availableHWs = (PLinApi.HLINHW * lwCount.value)()
    # buffer size is passed as twice the element count
    lwBuffSize = c_ushort(lwCount.value * 2)
    linResult = self.m_objPLinApi.GetAvailableHardware(
        availableHWs, lwBuffSize, lwCount)
    if linResult != PLinApi.TLIN_ERROR_OK:
        return res
    # Buffers reused for every hardware parameter query.
    lnHwType = c_int(0)
    lnDevNo = c_int(0)
    lnDevId = c_int(0)
    lnChannel = c_int(0)
    lnMode = c_int(0)
    for i in range(lwCount.value):
        lwHw = availableHWs[i]
        # type, device number, device id, channel number and mode
        self.m_objPLinApi.GetHardwareParam(
            lwHw, PLinApi.TLIN_HARDWAREPARAM_TYPE, lnHwType, 0)
        self.m_objPLinApi.GetHardwareParam(
            lwHw, PLinApi.TLIN_HARDWAREPARAM_DEVICE_NUMBER, lnDevNo, 0)
        self.m_objPLinApi.GetHardwareParam(
            lwHw, PLinApi.TLIN_HARDWAREPARAM_ID_NUMBER, lnDevId, 0)
        self.m_objPLinApi.GetHardwareParam(
            lwHw, PLinApi.TLIN_HARDWAREPARAM_CHANNEL_NUMBER,
            lnChannel, 0)
        self.m_objPLinApi.GetHardwareParam(
            lwHw, PLinApi.TLIN_HARDWAREPARAM_MODE, lnMode, 0)
        # translate type value to string
        if lnHwType.value == PLinApi.LIN_HW_TYPE_USB_PRO.value:
            strName = "PCAN-USB Pro"
        elif lnHwType.value == PLinApi.LIN_HW_TYPE_USB_PRO_FD.value:
            strName = "PCAN-USB Pro FD"
        elif lnHwType.value == PLinApi.LIN_HW_TYPE_PLIN_USB.value:
            strName = "PLIN-USB"
        else:
            strName = "Unknown"
        # add information to result list
        res.append([strName, lnDevNo.value, lnChannel.value,
                    lwHw, hex(lnDevId.value & self.m_lMask.value)])
    return res
def get_frame_direction_as_string(self, direction):
    """
    Returns the display name of a frame direction.

    Parameters:
        direction : PLinApi.TLINDirection or number
            The direction value to translate
    Returns:
        str or None
            The direction name, or None when the value matches none of
            the four known directions
    """
    # accept both the ctypes wrapper and a plain number
    if isinstance(direction, PLinApi.TLINDirection):
        value = direction.value
    else:
        value = int(direction)
    # ordered (constant, label) pairs; first match wins
    labels = (
        (PLinApi.TLIN_DIRECTION_DISABLED, 'Disabled'),
        (PLinApi.TLIN_DIRECTION_PUBLISHER, 'Publisher'),
        (PLinApi.TLIN_DIRECTION_SUBSCRIBER, 'Subscriber'),
        (PLinApi.TLIN_DIRECTION_SUBSCRIBER_AUTOLENGTH,
         'Subscriber Automatic Length'),
    )
    for constant, label in labels:
        if value == constant.value:
            return label
    return None
def get_frame_cst_as_string(self, checksumType):
    """
    Returns the display name of a checksum type.

    Parameters:
        checksumType : PLinApi.TLINChecksumType or number
            The checksum type value to translate
    Returns:
        str or None
            The checksum type name, or None when the value matches none
            of the four known checksum types
    """
    # Check against the checksum type wrapper (the original tested
    # TLINDirection here, a copy-paste slip from the direction helper).
    if isinstance(checksumType, PLinApi.TLINChecksumType):
        value = checksumType.value
    else:
        value = int(checksumType)
    # translate value to string
    if value == PLinApi.TLIN_CHECKSUMTYPE_AUTO.value:
        return 'Auto'
    elif value == PLinApi.TLIN_CHECKSUMTYPE_CLASSIC.value:
        return 'Classic'
    elif value == PLinApi.TLIN_CHECKSUMTYPE_CUSTOM.value:
        return 'Custom'
    elif value == PLinApi.TLIN_CHECKSUMTYPE_ENHANCED.value:
        return 'Enhanced'
    return None
def get_slot_type_as_string(self, slotType):
    """
    Returns the display name of a schedule slot type.

    Parameters:
        slotType : PLinApi.TLINSlotType or number
            The slot type value to translate
    Returns:
        str or None
            The slot type name, or None when the value matches none of
            the five known slot types
    """
    # accept both the ctypes wrapper and a plain number
    if isinstance(slotType, PLinApi.TLINSlotType):
        value = slotType.value
    else:
        value = int(slotType)
    # ordered (constant, label) pairs; first match wins
    labels = (
        (PLinApi.TLIN_SLOTTYPE_UNCONDITIONAL, 'Unconditional'),
        (PLinApi.TLIN_SLOTTYPE_EVENT, 'Event'),
        (PLinApi.TLIN_SLOTTYPE_SPORADIC, 'Sporadic'),
        (PLinApi.TLIN_SLOTTYPE_MASTER_REQUEST, 'Master Request'),
        (PLinApi.TLIN_SLOTTYPE_SLAVE_RESPONSE, 'Slave Response'),
    )
    for constant, label in labels:
        if value == constant.value:
            return label
    return None
def get_formatted_rcv_msg(self, msg, conn_mode):
    """
    Returns a printable representation of a received LIN message.

    Parameters:
        msg : TLINRcvMsg
            The received message
        conn_mode : str
            MASTER or SLAVE (case-insensitive)
    Returns:
        str
            For non-standard messages a short status text; otherwise a
            tab separated line with ID, length, data, timestamp,
            direction, checksum and error names
    """
    # Non-standard messages are reported by name only.
    if msg.Type != PLinApi.TLIN_MSGTYPE_STANDARD.value:
        status_labels = (
            (PLinApi.TLIN_MSGTYPE_BUS_SLEEP, 'Bus Sleep status message'),
            (PLinApi.TLIN_MSGTYPE_BUS_WAKEUP, 'Bus WakeUp status message'),
            (PLinApi.TLIN_MSGTYPE_AUTOBAUDRATE_TIMEOUT,
             'Auto-baudrate Timeout status message'),
            (PLinApi.TLIN_MSGTYPE_AUTOBAUDRATE_REPLY,
             'Auto-baudrate Reply status message'),
            (PLinApi.TLIN_MSGTYPE_OVERRUN, 'Bus Overrun status message'),
            (PLinApi.TLIN_MSGTYPE_QUEUE_OVERRUN,
             'Queue Overrun status message'),
        )
        for constant, label in status_labels:
            if msg.Type == constant.value:
                return label
        return 'Non standard message'

    def byte_text(raw):
        # In slave mode the bytes of subscribed frames are shown
        # bit-inverted (XOR 0xFF); everything else is shown as received.
        if conn_mode.upper() == 'SLAVE' and (
                self.get_frame_direction_as_string(msg.Direction) in
                ('Subscriber', 'Subscriber Automatic Length')):
            return hex(raw ^ int(0xFF))
        return hex(raw)

    # data bytes, space separated
    dataStr = " ".join(byte_text(msg.Data[i]) for i in range(msg.Length))
    # names of all error flags set, each followed by a comma
    error_flags = (
        (PLinApi.TLIN_MSGERROR_CHECKSUM, 'Checksum,'),
        (PLinApi.TLIN_MSGERROR_GROUND_SHORT, 'GroundShort,'),
        (PLinApi.TLIN_MSGERROR_ID_PARITY_BIT_0, 'IdParityBit0,'),
        (PLinApi.TLIN_MSGERROR_ID_PARITY_BIT_1, 'IdParityBit1,'),
        (PLinApi.TLIN_MSGERROR_INCONSISTENT_SYNCH, 'InconsistentSynch,'),
        (PLinApi.TLIN_MSGERROR_OTHER_RESPONSE, 'OtherResponse,'),
        (PLinApi.TLIN_MSGERROR_SLAVE_NOT_RESPONDING, 'SlaveNotResponding,'),
        (PLinApi.TLIN_MSGERROR_SLOT_DELAY, 'SlotDelay,'),
        (PLinApi.TLIN_MSGERROR_TIMEOUT, 'Timeout,'),
        (PLinApi.TLIN_MSGERROR_VBAT_SHORT, 'VBatShort,'),
    )
    error = "".join(
        name for flag, name in error_flags if msg.ErrorFlags & flag)
    if msg.ErrorFlags == 0:
        error = 'O.k. '
    # drop the trailing separator character
    error = error[:-1]
    # assemble the final line; FrameId carries the PID, mapped back to ID
    return str.format(
        "{0}\t {1}\t{2}\n\t\t\t{3}\t{4}\t{5}\t{6}",
        hex(self.PIDs[msg.FrameId]),
        msg.Length,
        dataStr,
        msg.TimeStamp,
        self.get_frame_direction_as_string(msg.Direction),
        hex(msg.Checksum),
        error)
def set_frame_entry(self, frameId, direction, checksumType, length):
    """
    Updates a frame entry of the global frames table.

    Parameters:
        frameId : int
            frame ID to update (e.g. 0x30)
        direction : str
            one of the values: DISABLED, PUBLISHER, SUBSCRIBER,
            SUBSCRIBER AUTOLENGTH (case-insensitive)
        checksumType : str
            one of the values: CUSTOM, CLASSIC, ENHANCED, AUTO
            (case-insensitive; ignored for DISABLED)
        length : int
            length of the frame
    Returns:
        bool
            True if frame successfully updated, False otherwise
            (including unknown direction / checksum type names)
    """
    directions = {
        'DISABLED': PLinApi.TLIN_DIRECTION_DISABLED,
        'PUBLISHER': PLinApi.TLIN_DIRECTION_PUBLISHER,
        'SUBSCRIBER': PLinApi.TLIN_DIRECTION_SUBSCRIBER,
        'SUBSCRIBER AUTOLENGTH':
            PLinApi.TLIN_DIRECTION_SUBSCRIBER_AUTOLENGTH,
    }
    checksum_types = {
        'CUSTOM': PLinApi.TLIN_CHECKSUMTYPE_CUSTOM,
        'CLASSIC': PLinApi.TLIN_CHECKSUMTYPE_CLASSIC,
        'ENHANCED': PLinApi.TLIN_CHECKSUMTYPE_ENHANCED,
        'AUTO': PLinApi.TLIN_CHECKSUMTYPE_AUTO,
    }
    # Reject unknown names instead of writing a default-initialised
    # entry (direction) or failing with UnboundLocalError (checksum).
    direction_key = direction.upper()
    if direction_key not in directions:
        print("Error - unknown frame direction: {}".format(direction))
        return False
    # initialize and set Frame entry
    lFrameEntry = PLinApi.TLINFrameEntry()
    lFrameEntry.FrameId = c_ubyte(frameId)
    lFrameEntry.Length = c_ubyte(length)
    lFrameEntry.Direction = directions[direction_key]
    if direction_key != 'DISABLED':
        # the checksum type is only applied to enabled frames
        checksum_key = checksumType.upper()
        if checksum_key not in checksum_types:
            print("Error - unknown checksum type: {}".format(checksumType))
            return False
        lFrameEntry.ChecksumType = checksum_types[checksum_key]
    if direction_key == 'PUBLISHER':
        lFrameEntry.Flags = PLinApi.FRAME_FLAG_RESPONSE_ENABLE
    res = True
    # set frame entry
    linResult = self.m_objPLinApi.SetFrameEntry(
        self.m_hClient, self.m_hHw, lFrameEntry)
    if linResult == PLinApi.TLIN_ERROR_OK:
        # update filter mask to match the new status of the frame
        lMask = c_uint64(1 << frameId)
        self.m_lMask = c_uint64(self.m_lMask.value | lMask.value)
        linResult = self.m_objPLinApi.SetClientFilter(
            self.m_hClient, self.m_hHw, self.m_lMask)
    if linResult != PLinApi.TLIN_ERROR_OK:
        self.display_error(linResult)
        res = False
    # echo the entry that was submitted
    print("+ {}\t{}\t{}\t{}".format(
        hex(lFrameEntry.FrameId),
        self.get_frame_cst_as_string(lFrameEntry.ChecksumType),
        self.get_frame_direction_as_string(lFrameEntry.Direction),
        lFrameEntry.Length))
    return res
def display_global_frames_table(self, showDisabled=False):
    """
    Prints the LIN Global Frames table read from the hardware.

    Parameters:
        showDisabled : bool
            When True, disabled frames are listed as well
    Returns:
        None
    """
    frames = self.read_frame_table_from_hw()
    # table header
    print("\n * Global Frames Table:\n")
    print(" ID\tPID\tDirection\t\tLength\tChecksum Type")
    print(" ------------------------------------------------------------")
    for frame in frames:
        # GetPID updates the buffer in place
        pid = c_ubyte(frame.FrameId)
        self.m_objPLinApi.GetPID(pid)
        # skip disabled frames unless they were requested
        disabled = (
            frame.Direction == PLinApi.TLIN_DIRECTION_DISABLED.value)
        if disabled and not showDisabled:
            continue
        print(str.format(
            " {0}\t{1}\t{2}\t\t{3}\t{4}\t",
            hex(frame.FrameId),
            hex(pid.value),
            self.get_frame_direction_as_string(frame.Direction),
            frame.Length,
            self.get_frame_cst_as_string(frame.ChecksumType)))
    print(" ------------------------------------------------------------\n")
def create_schedule_slots(self, noOfSlots):
    """
    Creates an empty array of schedule slots, to be filled with
    set_slot_attributes.

    Parameters:
        noOfSlots : int
            Number of slots in the array
    Returns:
        TLINScheduleSlot array
            A new array holding noOfSlots TLINScheduleSlot elements
    """
    return (PLinApi.TLINScheduleSlot * noOfSlots)()
def set_slot_attributes(self, scheduleArrayObj, frameId,
                        slot_type, frameDelay, slotIndex):
    """
    Builds one schedule slot and stores it in the slot array.

    Parameters:
        scheduleArrayObj : TLINScheduleSlot array
            Array created by the 'create_schedule_slots' method
        frameId : int
            ID of the frame placed in the slot
        slot_type : str
            Unconditional, Event, Sporadic, Master Request or
            Slave Response (case-insensitive); any other value leaves
            the slot type at its default
        frameDelay : int
            The delay of the frame (in ms)
        slotIndex : int
            Position of the slot inside 'scheduleArrayObj'
    Returns:
        None
    """
    slot_types = {
        'UNCONDITIONAL': PLinApi.TLIN_SLOTTYPE_UNCONDITIONAL,
        'EVENT': PLinApi.TLIN_SLOTTYPE_EVENT,
        'SPORADIC': PLinApi.TLIN_SLOTTYPE_SPORADIC,
        'MASTER REQUEST': PLinApi.TLIN_SLOTTYPE_MASTER_REQUEST,
        'SLAVE RESPONSE': PLinApi.TLIN_SLOTTYPE_SLAVE_RESPONSE,
    }
    slot = PLinApi.TLINScheduleSlot()
    slot.FrameId[0] = c_ubyte(frameId)
    slot.Delay = c_ushort(frameDelay)
    kind = slot_type.upper()
    if kind in slot_types:
        slot.Type = slot_types[kind]
    if kind in ('EVENT', 'SPORADIC'):
        slot.CountResolve = c_ubyte(0)  # TBD
    scheduleArrayObj[slotIndex] = slot
def set_schedule(self, scheduleTableId, scheduleSlotList, slotsNumber):
    """
    Writes the given slots to a schedule table of the hardware.

    Parameters:
        scheduleTableId : int
            number of the schedule table
        scheduleSlotList : TLINScheduleSlot array
            the slots prepared with 'set_slot_attributes'
        slotsNumber : int
            number of slots passed to SetSchedule
    Returns:
        None
    """
    table_id = c_ubyte(scheduleTableId)
    linResult = self.m_objPLinApi.SetSchedule(
        self.m_hClient, self.m_hHw, table_id, scheduleSlotList,
        slotsNumber)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - Schedule Table not set")
    else:
        print("Set Schedule Table Done")
    # the result code is reported in both cases
    self.display_error(linResult)
def get_schedule(self, scheduleTableId):
    """
    Reads a schedule table from the hardware and prints it.

    Parameters:
        scheduleTableId : int
            number of the schedule table
    Returns:
        None
    """
    schedule_table = (PLinApi.TLINScheduleSlot * 256)()
    count = c_int(0)
    scheduleTableId = c_ubyte(scheduleTableId)
    linResult = self.m_objPLinApi.GetSchedule(
        self.m_hHw, scheduleTableId, schedule_table,
        c_int(256), count)
    if linResult == PLinApi.TLIN_ERROR_OK:
        print("Get Schedule Table - Done")
    else:
        print("Error - Schedule Table cannot be returned")
    self.display_error(linResult)
    # Index the frame table by frame ID so that every slot gets the
    # direction/checksum of its own frame (never a stale or unbound one).
    frames_by_id = {
        frame.FrameId: frame for frame in self.read_frame_table_from_hw()}
    print("\n * Schedule Table:\n")
    print(" ID\tDelay\tSlot Type\tDirection\tChecksum Type")
    print(" ------------------------------------------------------------")
    for index in range(count.value):
        schedule_entry = schedule_table[index]
        slot_frame_id = schedule_entry.FrameId[0]
        frame = frames_by_id.get(slot_frame_id)
        if frame is None:
            # frame entry could not be read from the hardware
            direction_text = "Unknown"
            checksum_text = "Unknown"
        else:
            direction_text = self.get_frame_direction_as_string(
                frame.Direction)
            checksum_text = self.get_frame_cst_as_string(
                frame.ChecksumType)
        print(" {}\t{}\t{}\t{}\t{}\t".format(
            hex(slot_frame_id),
            schedule_entry.Delay,
            self.get_slot_type_as_string(schedule_entry.Type),
            direction_text,
            checksum_text))
    print(" ---------------------------------------------"
          "---------------\n")
def start_schedule(self, scheduleTableId):
    """
    Starts the selected schedule table.

    Parameters:
        scheduleTableId : int
            number of the schedule table
    Returns:
        None
    """
    table_id = c_ubyte(scheduleTableId)
    linResult = self.m_objPLinApi.StartSchedule(
        self.m_hClient, self.m_hHw, table_id)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - Schedule Table cannot run", linResult)
    else:
        print("Schedule Table is running")
    # the result code is reported in both cases
    self.display_error(linResult)
def suspend_schedule(self):
    """
    Suspends the running schedule table.

    Parameters:
        None
    Returns:
        None
    """
    linResult = self.m_objPLinApi.SuspendSchedule(
        self.m_hClient, self.m_hHw)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - Schedule Table run canot be Suspended")
    else:
        print("Schedule Table run Suspended")
    # the result code is reported in both cases
    self.display_error(linResult)
def resume_schedule(self):
    """
    Resumes a suspended schedule table.

    Parameters:
        None
    Returns:
        None
    """
    linResult = self.m_objPLinApi.ResumeSchedule(
        self.m_hClient, self.m_hHw)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("ResumeSchedule Error")
    else:
        print("ResumeSchedule Done")
    # the result code is reported in both cases
    self.display_error(linResult)
def delete_schedule(self, scheduleTableId):
    """
    Deletes the schedule table with the given number.

    Parameters:
        scheduleTableId : int
            number of the schedule table
    Returns:
        None
    """
    table_id = c_ubyte(scheduleTableId)
    linResult = self.m_objPLinApi.DeleteSchedule(
        self.m_hClient, self.m_hHw, table_id)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - Schedule Table cannot be Deleted")
    else:
        print("Schedule Table Deleted")
    # the result code is reported in both cases
    self.display_error(linResult)
def reset_hw(self):
    """
    Calls ResetHardware for the connected hardware (per the original
    note: flushes the hardware queues and resets its counters).

    Parameters:
        None
    Returns:
        None
    """
    linResult = self.m_objPLinApi.ResetHardware(
        self.m_hClient, self.m_hHw)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - Hardware cannot be reset")
    else:
        print("Reset Hardware Done")
    # the result code is reported in both cases
    self.display_error(linResult)
def reset_hw_config(self):
    """
    Calls ResetHardwareConfig for the connected hardware (per the
    original note: deletes the current configuration and restores the
    defaults; the client must be registered and connected).

    Parameters:
        None
    Returns:
        None
    """
    linResult = self.m_objPLinApi.ResetHardwareConfig(
        self.m_hClient, self.m_hHw)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - Hardware Config cannot be reset")
    else:
        print("Reset Hardware Config Done")
    # the result code is reported in both cases
    self.display_error(linResult)
def wake_up(self):
    """
    Calls XmtWakeUp to send a wake-up impulse on the connected hardware.

    Per the original note (not verifiable from this code): the impulse
    is a single data byte 0xF0, it is only available in Slave-mode, and
    a 150 ms timeout is used after sending.

    Parameters:
        None
    Returns:
        None
    """
    linResult = self.m_objPLinApi.XmtWakeUp(self.m_hClient, self.m_hHw)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - WakeUp cannot be send")
    else:
        print("WakeUp Done")
    # the result code is reported in both cases
    self.display_error(linResult)
def wake_up_dynamic(self):
    """
    Calls XmtDynamicWakeUp on the connected hardware with a fixed third
    argument of 3.

    NOTE(review): the original docstring was a copy of wake_up's; the
    meaning and unit of the value 3 should be confirmed against the
    PLIN-API documentation.

    Parameters:
        None
    Returns:
        None
    """
    linResult = self.m_objPLinApi.XmtDynamicWakeUp(
        self.m_hClient, self.m_hHw, 3)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - Dynamic WakeUp cannot be send")
    else:
        print("Dynamic WakeUp Done")
    # the result code is reported in both cases
    self.display_error(linResult)
def reset_client(self):
    """
    Calls ResetClient for this client (per the original note: flushes
    the client's receive queue and resets its counters).

    Parameters:
        None
    Returns:
        None
    """
    linResult = self.m_objPLinApi.ResetClient(self.m_hClient)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - Client cannot be reseted")
    else:
        print("Reset Client Done")
    # the result code is reported in both cases
    self.display_error(linResult)
def remove_client(self):
    """
    Calls RemoveClient for this client (per the original note: removes
    it from the LIN Manager's client list and frees its resources).
    The stored client handle is left unchanged.

    Parameters:
        None
    Returns:
        None
    """
    linResult = self.m_objPLinApi.RemoveClient(self.m_hClient)
    if linResult != PLinApi.TLIN_ERROR_OK:
        print("Error - Client cannot be removed")
    else:
        print("Remove Client Done")
    # the result code is reported in both cases
    self.display_error(linResult)
def read_msg(self, no_of_message_to_read, conn_mode):
    """
    Reads messages from the client queue in a loop and prints them.

    The loop only ends after a read error other than "receive queue
    empty", or after any exception raised while reading/formatting.

    Parameters:
        no_of_message_to_read : int
            0 or 1 reads one message per pass with Read; any other
            number is the batch size used with ReadMulti
        conn_mode : str
            MASTER or SLAVE, forwarded to get_formatted_rcv_msg
    Returns:
        None
    """
    choice = no_of_message_to_read
    pending = []
    while True:
        # print what the previous pass collected, then forget it
        if pending:
            print('\n * Received messages:')
            print('\n ID\tLength\tData\tTimestamp\tDirection\tChecksum\tErrors')
            print(
                ' -----------------------------------------------------------')
            for text in pending:
                print('\n - ' + text)
            pending = []
        if choice == 'Q':
            break
        try:
            if choice in (0, 1):
                # single message read
                rcv_msg = PLinApi.TLINRcvMsg()
                linResult = self.m_objPLinApi.Read(self.m_hClient, rcv_msg)
                if linResult == PLinApi.TLIN_ERROR_OK:
                    pending.append(
                        self.get_formatted_rcv_msg(rcv_msg, conn_mode))
                elif linResult == PLinApi.TLIN_ERROR_RCVQUEUE_EMPTY:
                    self.display_error(linResult, False, 0.5)
                else:
                    self.display_error(linResult)
                    choice = 'Q'
            else:
                # batch read of up to `batch_size` messages
                batch_size = int(choice)
                rcv_msgs = (PLinApi.TLINRcvMsg * batch_size)()
                rcv_count = c_int(0)
                linResult = self.m_objPLinApi.ReadMulti(
                    self.m_hClient, rcv_msgs, batch_size, rcv_count)
                if linResult in (PLinApi.TLIN_ERROR_OK,
                                 PLinApi.TLIN_ERROR_RCVQUEUE_EMPTY):
                    # "queue empty" is also returned when the array
                    # could only be filled partially
                    if rcv_count.value == 0:
                        self.display_error(linResult, False, 0.5)
                    else:
                        for i in range(rcv_count.value):
                            pending.append(self.get_formatted_rcv_msg(
                                rcv_msgs[i], conn_mode))
                else:
                    self.display_error(linResult)
                    choice = 'Q'
        except BaseException:
            # deliberately as broad as the original bare except: any
            # failure (including an interrupt) ends the read loop
            self.display_notification(text="Error on read message")
            choice = 'Q'
    print(' -----------------------------------------------------------\n')
def update_frame_data(self, frame_id, data=[]):
    """
    Replace the data bytes of a frame through UpdateByteArray()

    Parameters:
        frame_id : int
            ID of the frame whose data is updated
        data : list
            The new data bytes; every element is converted to c_ubyte
    Returns:
        None
    """
    byte_count = len(data)
    # copy the python values into a ctypes byte buffer
    payload = (c_ubyte * byte_count)()
    for position, value in enumerate(data):
        payload[position] = c_ubyte(value)
    # write the buffer starting at data index 0 of the frame
    linResult = self.m_objPLinApi.UpdateByteArray(
        self.m_hClient, self.m_hHw, c_ubyte(frame_id), 0,
        c_ubyte(byte_count), payload)
    if (linResult == PLinApi.TLIN_ERROR_OK):
        hex_bytes = [hex(value) for value in data]
        print("Frame ID {} successfully updated with value {}\n".format(
            hex(frame_id), hex_bytes))
        return
    self.display_error(linResult)
    print("Failed to update frame ID {} value/s.\n".format(hex(frame_id)))
def write_msg(self, frame_id, data=()):
    """
    Write a message for a frame ID from the Global Table

    The frame entry is read back with GetFrameEntry() to obtain the
    direction, checksum type and length. When connected as master the
    message is sent with Write(); otherwise the frame data is updated
    with UpdateByteArray().

    Parameters:
        frame_id : int
            ID of the frame which the data will be written
        data : sequence
            The bytes of new data (int values). Only used when the frame
            direction is publisher; it must then provide at least as
            many bytes as the configured frame length.
    Returns:
        None
    """
    try:
        # set frame entry and get direction/checksum/length from hardware
        lFrameEntry = PLinApi.TLINFrameEntry()
        lFrameEntry.FrameId = c_ubyte(frame_id)
        linResult = self.m_objPLinApi.GetFrameEntry(
            self.m_hHw, lFrameEntry)
        if (linResult == PLinApi.TLIN_ERROR_OK):
            # initialize LIN message to send
            pMsg = PLinApi.TLINMsg()
            pMsg.Direction = PLinApi.TLINDirection(
                lFrameEntry.Direction)
            pMsg.ChecksumType = PLinApi.TLINChecksumType(
                lFrameEntry.ChecksumType)
            pMsg.Length = c_ubyte(lFrameEntry.Length)
            # fill data, only if direction is publisher
            if (pMsg.Direction == PLinApi.TLIN_DIRECTION_PUBLISHER.value):
                if len(data) < lFrameEntry.Length:
                    # report the real cause instead of an IndexError
                    self.display_notification(
                        "Failed to write message for frame ID {}: "
                        "{} data byte(s) required, {} given".format(
                            hex(frame_id), lFrameEntry.Length, len(data)))
                    return
                for i in range(lFrameEntry.Length):
                    pMsg.Data[i] = c_ubyte(int(data[i]))
            # Check if the hardware is initialized as master
            if (self.m_HwMode.value == PLinApi.TLIN_HARDWAREMODE_MASTER.value):
                # set frame id to Protected ID
                nPID = c_ubyte(frame_id)
                linResult = self.m_objPLinApi.GetPID(nPID)
                if (linResult == PLinApi.TLIN_ERROR_OK):
                    pMsg.FrameId = c_ubyte(nPID.value)
                    # set checksum
                    linResult = self.m_objPLinApi.CalculateChecksum(pMsg)
                if (linResult == PLinApi.TLIN_ERROR_OK):
                    # write LIN message only when every preparation
                    # step succeeded, so the first failure is reported
                    linResult = self.m_objPLinApi.Write(
                        self.m_hClient, self.m_hHw, pMsg)
            else:
                # connected as slave : writing corresponds to updating
                # the data from LIN frame
                linResult = self.m_objPLinApi.UpdateByteArray(
                    self.m_hClient, self.m_hHw, c_ubyte(frame_id),
                    c_ubyte(0), c_ubyte(pMsg.Length), pMsg.Data)
        if (linResult == PLinApi.TLIN_ERROR_OK):
            self.display_notification(
                "Message successfully written for frame ID {}".format(
                    hex(frame_id)))
        else:
            self.display_error(linResult)
            self.display_notification(
                "Failed to write message for frame ID {}".format(
                    hex(frame_id)))
    except Exception as exc:
        # catch LIN / bad input errors and tell the user what went wrong
        self.display_notification("** Error ** - {}".format(exc))
def display_notification(self, text="** Error **", waitSeconds=0.5):
    """
    Print a tab-indented notification, then pause

    Parameters:
        text : str
            text to display to the user
        waitSeconds : float
            number of seconds to sleep after the text was printed
    Returns:
        None
    """
    message = "\t{0}".format(text)
    print(message)
    # give the user time to read the notification
    time.sleep(waitSeconds)
def bus_status(self):
    """
    Query the hardware status with GetStatus() and print its state

    Nothing is printed when the reported state is not one of the
    states listed below.

    Parameters:
        None
    Returns:
        None
    """
    hw_status = PLinApi.TLINHardwareStatus()
    linResult = self.m_objPLinApi.GetStatus(self.m_hHw, hw_status)
    if not (linResult == PLinApi.TLIN_ERROR_OK):
        self.display_error(linResult)
        return
    # checked in order; the first matching state is reported
    state_labels = (
        (PLinApi.TLIN_HARDWARESTATE_ACTIVE, 'Bus: Active'),
        (PLinApi.TLIN_HARDWARESTATE_AUTOBAUDRATE,
         'Hardware: Baudrate Detection'),
        (PLinApi.TLIN_HARDWARESTATE_NOT_INITIALIZED,
         'Hardware: Not Initialized'),
        (PLinApi.TLIN_HARDWARESTATE_SHORT_GROUND,
         'Bus-Line: Shorted Ground'),
        (PLinApi.TLIN_HARDWARESTATE_SLEEP, 'Bus: Sleep'),
    )
    for state, label in state_labels:
        if (hw_status.Status == state.value):
            self.display_notification(label)
            break
def display_available_connection(self):
    """
    Displays a list of available LIN hardwares.

    The entry whose handle (index 3) equals the handle this client is
    connected to is marked with the connection mode and baudrate.

    Parameters:
        None
    Returns:
        list
            The hardware entries returned by get_available_hw(). This
            method reads index 0 (type), 1 (device), 2 (channel) and
            3 (hardware handle) of every entry.
    """
    # retrieve a list of available hardware
    hWInfoList = self.get_available_hw()
    # display result
    print(" List of available LIN hardware:")
    if not hWInfoList:
        print("\t<No hardware found>\n")
    else:
        # for each hardware display it's type, device and channel
        for hWInfo in hWInfoList:
            isConnected = ""
            if (self.m_hHw.value == hWInfo[3]):
                if (self.m_HwMode.value == PLinApi.TLIN_HARDWAREMODE_MASTER.value):
                    hwType = "master"
                elif (self.m_HwMode.value == PLinApi.TLIN_HARDWAREMODE_SLAVE.value):
                    hwType = "slave"
                else:
                    # any other mode used to leave hwType unbound and
                    # crash the listing with an UnboundLocalError
                    hwType = "unknown mode"
                # add information if the application is connected to
                # the hardware
                isConnected = str.format("(connected as {0}, {1})",
                                         hwType, self.m_HwBaudrate.value)
            print(str.format('\t{3}) {0} - dev. {1}, chan. {2} {4}',
                             hWInfo[0], hWInfo[1],
                             hWInfo[2], hWInfo[3], isConnected))
        print("\n")
    return hWInfoList
def display_error(self, linError, waitTime=0):
    """
    Print the text of a LIN error obtained from the LIN API

    Falls back to printing the raw error code when GetErrorText() fails
    or returns an empty text.

    Parameters:
        linError : obj
            a LIN Error (TLinError)
        waitTime : float
            seconds to wait after printing, forwarded to
            display_notification()
    Returns:
        None
    """
    buffer_size = 255
    text_buffer = create_string_buffer(buffer_size)
    # 0x09 is the language id handed to GetErrorText
    # (presumably English - verify against the PLIN-API documentation)
    linResult = self.m_objPLinApi.GetErrorText(
        linError, 0x09, text_buffer, buffer_size)
    if (linResult == PLinApi.TLIN_ERROR_OK and len(text_buffer.value) != 0):
        message = "* Error * - {0} \n".format(text_buffer.value.decode())
    else:
        message = "* Error * - code={0}\n".format(linError)
    self.display_notification(message, waitTime)
def connect_client(self, linHwDeviceId, connectionName,
                   hwConnectionMode, hwBaudrate=19200):
    '''
    Initiate a connection, Master or Slave, on the given hw ID

    Parameters:
        linHwDeviceId : str
            The LIN hw ID set in the PEAK Settings. It is matched as a
            substring of index 4 of every available hardware entry; the
            last matching entry is used.
        connectionName : str
            A name given to the connection.
        hwConnectionMode : str
            The hw connection mode. Can be only 'MASTER' or 'SLAVE'
        hwBaudrate : int
            Hw baudrate. Default is 19200
    Returns:
        bool
            True if the connection succeed, False otherwise (including
            when no available hardware matches linHwDeviceId)
    '''
    hwIndex = 0
    # Get and display LIN hardware available connections - device
    # and channel
    lin_hw_available = self.display_available_connection()
    # Identify LIN hw for the connection based on the given device ID
    wanted_id = str(linHwDeviceId)
    for hw in lin_hw_available:
        if wanted_id in str(hw[4]):
            hwIndex = hw[3]
    if hwIndex == 0:
        # no hardware matched: do not attempt a connection on index 0
        print("The HW ID provided is not pressent in the available "
              "hw list.")
        return False
    # Initiate the connection
    try:
        print("--> Init {} node conection on LIN hardware index {}".format(
            hwConnectionMode, hwIndex))
        if (self.do_lin_connect(connectionName, hwIndex, hwConnectionMode,
                                hwBaudrate)):
            print("\tConnection Succesfull.\n")
            self.display_available_connection()
            result = True
        else:
            print("\tConnection Failed.")
            result = False
    except Exception:
        print("Error - Connection cannot be initiated.")
        result = False
    return result
def parse_ldf(self, ldf_file_path, hwConnectionMode):
    '''
    Parse the ldf file and return the requested info from it

    Parameters:
        ldf_file_path : file path
            Path to the ldf file.
        hwConnectionMode : str
            The hw connection mode. Can be only 'MASTER' or 'SLAVE'
            (case insensitive). For 'SLAVE' the last slave node listed
            in the LDF is used.
    Returns:
        frame_info : list of dictionaries
            One dictionary per schedule table entry that refers to an
            unconditional frame, with the keys 'id', 'baudrate',
            'frDirection' ('PUBLISHER' or 'SUBSCRIBER' from the point of
            view of the selected node), 'frLengthBytes', 'delay' and
            'initData' (initial values of the frame signals).
    Raises:
        ValueError
            If hwConnectionMode is neither 'MASTER' nor 'SLAVE', or if
            'SLAVE' is requested and the LDF defines no slave node.
    '''
    mode = hwConnectionMode.upper()
    # validate before parsing so a typo is reported clearly instead of
    # surfacing later as an unbound node name
    if mode not in ('MASTER', 'SLAVE'):
        raise ValueError(
            "hwConnectionMode must be 'MASTER' or 'SLAVE', got {!r}".format(
                hwConnectionMode))
    # Load the LDF file
    ldf_file = ldfparser.parse_ldf(ldf_file_path)
    # Get the node name from the hw connection mode
    if mode == 'MASTER':
        node_name = ldf_file.get_master().name
    else:
        node_name = None
        for slave in ldf_file.get_slaves():
            node_name = slave.name
        if node_name is None:
            raise ValueError("The LDF file does not define any slave node")
    # the baudrate is a property of the file, not of a schedule entry
    conn_baudrate = ldf_file.get_baudrate()
    frame_info = []
    # Get each schedule table from Schedule Tables
    for table in ldf_file.get_schedule_tables():
        # For each schedule table, get the containing frames and delay
        for entry in ldf_file.get_schedule_table(table.name).schedule:
            scheduled_frame = getattr(entry, 'frame', None)
            if scheduled_frame is None:
                # entry does not reference a frame, nothing to report
                continue
            frame_name = scheduled_frame.name
            # delay is multiplied by 1000 (presumably seconds -> ms,
            # verify against ldfparser); round() avoids losing 1 to
            # float error, e.g. 0.57 * 1000 == 569.9999999999999
            frame_delay = int(round(entry.delay * 1000))
            # Look up the scheduled frame among the unconditional frames
            for uncond_frame in ldf_file.get_unconditional_frames():
                if uncond_frame.name != frame_name:
                    continue
                # collect the initial value(s) of every signal
                init_val_list = []
                for signal in uncond_frame.signal_map:
                    signal_value = signal[1].init_value
                    if isinstance(signal_value, int):
                        init_val_list.append(signal_value)
                    else:
                        # non-int initial values are extended element-wise
                        init_val_list += signal_value
                # get the direction of the frame for the selected node
                if node_name == uncond_frame.publisher.name:
                    direction = 'PUBLISHER'
                else:
                    direction = 'SUBSCRIBER'
                frame_info.append({
                    'id': uncond_frame.frame_id,
                    'baudrate': conn_baudrate,
                    'frDirection': direction,
                    'frLengthBytes': uncond_frame.length,
                    'delay': frame_delay,
                    'initData': init_val_list,
                })
    return frame_info
def get_time(self):
    """
    Read the target time of the connected hardware with GetTargetTime()

    The value is printed as well as returned.

    Parameters:
        None
    Returns:
        int or None
            The value reported by the API, or None when the call failed
            (the error is displayed in that case)
    """
    target_time = c_uint64(0)
    linResult = self.m_objPLinApi.GetTargetTime(self.m_hHw, target_time)
    if not (linResult == PLinApi.TLIN_ERROR_OK):
        print("Error on GetTime")
        self.display_error(linResult)
        return None
    timestamp = target_time.value
    print(timestamp)
    return timestamp