Added methods to receive CAN messages

* Added a receiveMessage method to the PCan class
* The PCan class can hold messages now, needed for updating message data and signal data after reception
* Added a GetData method to the CanMessage class
* Updated and extended the example program
This commit is contained in:
Christian Sltrop 2012-08-07 16:42:08 +02:00
parent 8363ddd5a8
commit 3a693ba35d
7 changed files with 165 additions and 56 deletions

View file

@ -116,6 +116,47 @@ class CanMessage(object):
if tgtBitNo >= 8:
tgtBitNo = 0
tgtByteNo += 1
'''
Takes the CAN message data and decomposes it into its signals.
This method is basically the reversion of the method composeData.
'''
def decomposeData(self):
for sigKey in self.Signals:
sig = self.Signals[sigKey]
tgtBegin = 0 # writing the target data (to the signal) always starts at the first byte
srcBegin = sig.Begin # where the target data is in the message is defined in the Begin field of the CanSignal object. Unit: bits
tgtByteNo = tgtBegin/8 # will be 0 if tgtBegin == 0
tgtBitNo = tgtBegin%8 # will be 0 if tgtBegin == 0
srcByteNo = srcBegin/8 # get the byte and bit numbers
srcBitNo = srcBegin%8 # for the source (the CanMessage object) array
for i in range(0, sig.Length):
# copy the source data bits into the target
if srcByteNo >= self.Length:
sys.exit('Signal data not in message (message too short)')
tmp = (self.Data[srcByteNo] & (1<<tgtBitNo))
if tmp != 0: # the bit shall be set
sig.Data[tgtByteNo] |= (1<<tgtBitNo)
else: # the bit shall be cleared
sig.Data[tgtByteNo] &= ~(1<<tgtBitNo)
# increment the counters
srcBitNo += 1
if srcBitNo >= 8: # on bit counter overflow, increment the byte counter and reset the bit counter
srcBitNo = 0
srcByteNo += 1
tgtBitNo += 1
if tgtBitNo >= 8:
tgtBitNo = 0
tgtByteNo += 1
self.Signals[sigKey] = sig
'''
Calculate the CRC checksum over the whole message.
@ -124,10 +165,9 @@ class CanMessage(object):
If this is true for all checksums is not certain! Has been tested for mMotor_5 and mMotor_6 only!
'''
def calculateCRC(self):
crc = Crc(width=8, poly=0x01, reflect_in = False, reflect_out = False, xor_in = 0xff, xor_out = 0xff)
crc = Crc(width=8, poly=0x01, reflect_in = False, reflect_out = False, xor_in = 0xff, xor_out = 0xff) # configure the crc calculator according to the VW parameters
self.composeData()
data = self.Data.tostring()
my_crc = crc.bit_by_bit_fast(data)
#print ("%x" % my_crc)
return my_crc

Binary file not shown.

View file

@ -1,11 +1,13 @@
# -*- coding: UTF-8 -*-
'''
Created on 06.07.2012
@author: sueltrop
@author: Christian ltrop
'''
import sys
import array
#import sys
#import array
class CanSignal(object):
@ -35,7 +37,7 @@ class CanSignal(object):
def SetData(self, SignalData):
'''
Updated the data of the Signal.
Update the data of the Signal.
The data is given to the Signal as full bytes, but only the bits up to Length may be used!
@param Data: An array of unsigned characters containing the data.
'''
@ -44,4 +46,18 @@ class CanSignal(object):
tmpData = int((SignalData-self.Offset) / self.Scaling)
self.Data = []
for i in range(0, int(self.Length/8)+1):
self.Data.append ( (tmpData >> (8*i) ) & 0xff )
self.Data.append ( (tmpData >> (8*i) ) & 0xff )
def GetData(self):
'''
'''
tmpData = 0
for i in range(0, int(self.Length/8)):
#print 'Data[i]', self.Data[i]
tmpData += ( self.Data[i] << (8*i) )
tmpData = int( (tmpData * self.Scaling) + self.Offset )
#tmpData = int( (tmpData - self.Offset ) / self.Scaling )
#print tmpData
return tmpData

Binary file not shown.

View file

@ -3,7 +3,7 @@
'''
Created on 06.07.2012
@author: Christan Sültrop
@author: Christian Sültrop
'''
import PCANBasic # PCANBasic wrapper library provided by PEAK
@ -24,21 +24,41 @@ class PcanAdapter (object):
def __init__(self, Baudrate):
'''
Constructor.
@param Baudrate: Baudrate from the PcanAdapter.Baudrate dictionary
@param Baudrate: Baud rate from the PcanAdapter. Baud rate dictionary
'''
# Object variables:
# Instance variables:
self.Messages = {}
self.Channel = PCANBasic.PCAN_USBBUS1
self.Pcan = PCANBasic.PCANBasic()
self.Baudrate = Baudrate # Baudrate from PCANBasic
self.Baudrate = Baudrate # Baud rate from PCANBasic
self.isInitialised = False
def __del__(self):
'''
Destructor.
Uninitializes the PCAN adapter.
Uninitialises the PCAN adapter.
'''
print '\nDestructor:'
self.uninitialize()
def addMessage(self, CanMessage):
'''
Add a Message of type CanMessage to the list of messages.
@param CanMessage: The message to add to the list of messages.
'''
self.Messages.update({CanMessage.Label: CanMessage})
def removeMessage(self, CanMessageLabel):
try:
self.Messages.pop(CanMessageLabel)
except:
pass
def clearMessages(self):
self.Messages = {}
def initialize(self):
'''
@ -48,9 +68,9 @@ class PcanAdapter (object):
status = self.Pcan.Initialize(self.Channel, self.Baudrate)
if status != PCANBasic.PCAN_ERROR_OK:
print 'Error: ', self.Pcan.GetErrorText(status, 0)[1]
sys.exit("PCAN initialization error")
sys.exit("PCAN initialisation error")
print("PCAN initialized")
print("PCAN initialised")
self.isInitialised = True
channel, hwId = self.Pcan.GetValue( self.Channel, PCANBasic.PCAN_DEVICE_NUMBER )
@ -64,9 +84,9 @@ class PcanAdapter (object):
status = self.Pcan.Uninitialize(self.Channel)
if status != PCANBasic.PCAN_ERROR_OK:
print 'Error: ', self.Pcan.GetErrorText(status, 0)[1]
sys.exit("PCAN deinitialization error")
sys.exit("PCAN deinitialisation error")
print("PCAN deinitialized")
print("PCAN deinitialised")
def sendMessage(self, Message):
'''
@ -80,7 +100,26 @@ class PcanAdapter (object):
msg.ID = Message.Id # copy the ID
msg.MSGTYPE = PCANBasic.PCAN_MESSAGE_STANDARD # Message type is standard (not extended)
msg.LEN = Message.Length # copy the length
msg.DATA[0:Message.Length] = Message.Data # copy the message data into the PCAN messge object
msg.DATA[0:Message.Length] = Message.Data # copy the message data into the PCAN message object
self.Pcan.Write(self.Channel, msg) # write it onto the Bus
#print ('Message ' + Message.Label + ' written.')
#print ('Message ' + Message.Label + ' written.')
def receiveMessage(self):
'''
Tries to receive a CAN message and puts its data into the according CanMessage object's Data field.
This method should be called frequently.
'''
result, msg, timestamp = self.Pcan.Read(self.Channel)
if result == PCANBasic.PCAN_ERROR_OK:
# loop through the messages and look for one that matches the received message:
for msgKey in self.Messages:
if self.Messages[msgKey].Id == msg.ID:
if msg.LEN != self.Messages[msgKey].Length:
# an error message could be posted at this point
pass
else:
for i in range(0, self.Messages[msgKey].Length):
self.Messages[msgKey].Data[i] = msg.DATA[i]
self.Messages[msgKey].decomposeData()

Binary file not shown.

View file

@ -9,53 +9,67 @@ Example for creating and sending CAN messages via PCAN using the CanMessage, Can
from CanMessage import CanMessage
from CanSignal import CanSignal
from PCan import PcanAdapter
print '\ncreate some messages'
mMotor_1 = CanMessage(0x280, 8, 10, 'mMotor_1')
print 'mMotor_1.Data', mMotor_1.Data
mMotor_2 = CanMessage(0x288, 8, 20, 'mMotor_2')
print 'mMotor_2.Data', mMotor_2.Data
print '\ncreate some Signals'
# for mMotor_1
MO1_Leergas = CanSignal(0, 1, [0x01], 'MO1_Leergas')
MO1_Sta_Pedal = CanSignal(1, 1, [0x01], 'MO1_Sta_Pedal')
MO1_Mo_m_ex = CanSignal(8, 8, [0x00], 'MO1_Mo_m_ex')
value = int(3000*0.25)
MO1_Drehzahl = CanSignal(16, 16, [value>>8, value&0x00ff], 'MO1_Drehzahl') # split into two bytes
mMotor_1.addSignal(MO1_Leergas)
mMotor_1.addSignal(MO1_Sta_Pedal)
mMotor_1.addSignal(MO1_Mo_m_ex)
mMotor_1.addSignal(MO1_Drehzahl)
print MO1_Drehzahl.Data
# for mMotor_2
MO2_Kuehlm_T = CanSignal(8, 8, [int(60*0.75)-48], 'MO2_Kuehlm_T')
mMotor_2.addSignal( MO2_Kuehlm_T )
mMotor_2.addSignal( CanSignal(24, 8, [0], 'MO2_GRA_Soll') )
mMotor_2.addSignal( CanSignal(0, 6, [44], 'MO2_CAN_Vers') )
print '\nManipulate Signals'
# does not work yet! need a way to replace a Signal -> dictionary
MO1_Leergas.SetData( [0x00] )
mMotor_1.addSignal( MO1_Leergas )
import time
print '\nCreate a PCAN adapter'
pcan = PcanAdapter(PcanAdapter.Baudrate['500k'])
pcan.initialize()
print '\ncreate some messages'
mMotor_1 = CanMessage(0x280, 8, 10, 'mMotor_1')
pcan.addMessage(mMotor_1)
print 'mMotor_1.Data', pcan.Messages['mMotor_1'].Data
mMotor_2 = CanMessage(0x288, 8, 20, 'mMotor_2')
pcan.addMessage(mMotor_2)
print 'mMotor_2.Data', pcan.Messages['mMotor_2'].Data
print '\ncreate some Signals'
# for mMotor_1
MO1_Leergas = CanSignal(0, 1, 0, 1, 1, 'MO1_Leergas')
MO1_Sta_Pedal = CanSignal(1, 1, 0, 1, 1, 'MO1_Sta_Pedal')
MO1_Mo_m_ex = CanSignal(8, 8, 0, 1, 0, 'MO1_Mo_m_ex')
value = 3000
MO1_Drehzahl = CanSignal(16, 16, 0, 0.25, value, 'MO1_Drehzahl') # split into two bytes
pcan.Messages['mMotor_1'].addSignal(MO1_Leergas)
pcan.Messages['mMotor_1'].addSignal(MO1_Sta_Pedal)
pcan.Messages['mMotor_1'].addSignal(MO1_Mo_m_ex)
pcan.Messages['mMotor_1'].addSignal(MO1_Drehzahl)
print pcan.Messages['mMotor_1'].Signals['MO1_Drehzahl'].Data
# for mMotor_2
MO2_Kuehlm_T = CanSignal(8, 8, -48, 0.75, 60, 'MO2_Kuehlm_T')
pcan.Messages['mMotor_2'].addSignal( MO2_Kuehlm_T )
pcan.Messages['mMotor_2'].addSignal( CanSignal(24, 8, 0, 1, 0, 'MO2_GRA_Soll') )
pcan.Messages['mMotor_2'].addSignal( CanSignal(0, 6, 0, 1, 44, 'MO2_CAN_Vers') )
print '\nManipulate Signals'
pcan.Messages['mMotor_1'].Signals['MO1_Leergas'].SetData( 0 )
print '\nSend the messages'
mMotor_1.composeData()
pcan.sendMessage(mMotor_1)
pcan.Messages['mMotor_1'].composeData()
print 'mMotor_1.Data', pcan.Messages['mMotor_1'].Data
pcan.sendMessage(pcan.Messages['mMotor_1'])
mMotor_2.composeData()
pcan.sendMessage(mMotor_2)
pcan.Messages['mMotor_2'].composeData()
print 'mMotor_2.Data', pcan.Messages['mMotor_2'].Data
pcan.sendMessage(pcan.Messages['mMotor_2'])
# receive messages, if the signal MO1_Drehzahl has changed, print the new value
while True:
dataOld = pcan.Messages['mMotor_1'].Signals['MO1_Drehzahl'].GetData()
pcan.receiveMessage()
if pcan.Messages['mMotor_1'].Signals['MO1_Drehzahl'].GetData() != dataOld:
print 'mMotor_1.Data', pcan.Messages['mMotor_1'].Data
print 'GetData()', pcan.Messages['mMotor_1'].Signals['MO1_Drehzahl'].GetData()
time.sleep(0.0005)
# wait a while to make sure messages can be sent and end
time.sleep(1)
print 'end of program'