pac file erstellt, switch gibt die queues nicht mehr zurück, modbus überarbeitet

This commit is contained in:
Philipp Rauch 2014-02-18 16:42:32 +01:00
parent 6edf4bb90f
commit 3c818b1247
10 changed files with 130 additions and 93 deletions

View file

@ -9,7 +9,7 @@ from Config.parser import config
from datetime import datetime
from flask import abort
### json test ###
### json pac ###
from numpy import timedelta64, float64, int64
from datetime import date
from decimal import Decimal

View file

@ -23,7 +23,7 @@ def CAN_start(conf):
class CANFilter(Thread):
### Lookup f<EFBFBD>r CAN --> DB ###
### Lookup für CAN --> DB ###
lookup = {
'dc_battery' : 'battery'
}
@ -66,7 +66,11 @@ class CANFilter(Thread):
if receiveMessageName in sym.keys():
tmp = {}
for sig in list(sym[receiveMessageName].columns):
tmp[sig] = self.pcan.Messages[receiveMessageName].Signals[sig].GetData()
data = self.pcan.Messages[receiveMessageName].Signals[sig].GetData()
if self.pcan.Messages[receiveMessageName].Signals[sig].enum is not None:
data = self.pcan.Messages[receiveMessageName].Signals[sig].Enum[data]
tmp[sig] = data
sym[receiveMessageName] = sym[receiveMessageName].append(tmp, ignore_index=True)
if len(sym[receiveMessageName].index) == 100:

View file

@ -8,6 +8,7 @@ from datetime import datetime
import time
from profile.datasheet import Datasheet
import struct
import numpy
def setup(conf):
PACS = []
@ -19,8 +20,11 @@ def setup(conf):
### INITIALIZE ###
PACS.append(messages)
for ip in conf['pac_ip']:
PACS.append(ModbusTcpClient(ip))
if isinstance(conf['pac_ip'], list):
for ip in conf['pac_ip']:
PACS.append(ModbusTcpClient(ip))
else:
PACS.append(ModbusTcpClient(conf['pac_ip']))
### CONNECT ###
for PAC in PACS[1::]:
@ -31,7 +35,7 @@ def setup(conf):
def loop(PACS, item):
row = PACS[0][PACS[0]['offset'] == item]
row = PACS[0][PACS[0]['offset'] == item[1]]
inx = row.index.values[0]
#print row.get('value - type')[inx]
@ -50,7 +54,7 @@ def loop(PACS, item):
if func is None:
raise NotImplementedError
res = PACS[2].read_holding_registers(item,reg,unit=1)
res = PACS[item[0]].read_holding_registers(item[1], reg, unit=1)
return { row.get('Value name')[inx] : func(res.registers) }
@ -62,34 +66,34 @@ def _to_float_time(reg):
def _to_long(reg):
tmp = _to_ulong(reg)
s = struct.pack('=i', tmp)
return struct.unpack('=l', s)[0]
return numpy.float(struct.unpack('=l', s)[0])
def _to_double(reg):
tmp = int(reg[0]) << 3*16 | int(reg[1]) << 2*16 | int(reg[2]) << 16 | int(reg[3])
s = struct.pack('=q', tmp)
return struct.unpack('=d', s)[0]
return numpy.float(struct.unpack('=d', s)[0])
def _to_time_abs(reg):
timestamp = _to_ulong(reg)
date = datetime.fromtimestamp(timestamp)
return str(date)
return date
def _to_ulong(reg):
return int(reg[0]) << 16 | int(reg[1])
return numpy.int(reg[0]) << 16 | int(reg[1])
def _to_ushort(reg):
return reg[0]
return numpy.int(reg[0])
def _to_float(reg):
tmp = _to_ulong(reg)
s = struct.pack('=i', tmp)
return struct.unpack('=f', s)[0]
return numpy.float(struct.unpack('=f', s)[0])
def _set_time(PAC):
stamp = time.time()
high = int(int(stamp) >> 16)
low = int(int(stamp) & int(0xFFFF))
PAC.write_registers(797, (high, low))
print PAC.write_registers(797, (high, low, 0, 0), unit=1)
def close(PACS):
### CLOSE ###

View file

@ -12,7 +12,6 @@ from Config.parser import config
#import Module
import database
import modbus
from test import query
MYSQL = 0
MODBUS = 1
@ -30,13 +29,15 @@ class Switch(Thread):
def __init__(self, source = MYSQL):
Thread.__init__(self)
self.source = source
self.query = None
self.queue = None
def initialisiere(self, query = None, queue = None):
'''
initialize the swich with the given source and creates a queue and a query
initialize the swich with the given source and creates a sql_queue and a sql_query
it calls the setup method from the submodule of the source
@return: queue and query
@return: sql_queue and sql_query
'''
if self.source == MYSQL:
self.cursor = database.setup(conf)
@ -55,18 +56,17 @@ class Switch(Thread):
if conf['config_debug']:
print '\tSWITCH-QUEUE:\t', self.queue
return self.queue, self.query
def run(self):
'''
The loop method from the submodule is called with a parameter from the query.
The loop method from the submodule is called with a parameter from the sql_query.
Its output is written to the queue.
'''
while True:
# Queue implementaion
# Queue contains the tablename for query
# item = query.get() #block = True
# query should be included in the result
# Queue contains the tablename for sql_query
# item = sql_query.get() #block = True
# sql_query should be included in the result
item = self.query.get(block = True)

View file

@ -20,6 +20,6 @@ can_baudrate = 250k # default 500k
can_symfile = ems-test.sym
#### PAC ####
pac_ip = 10.2.6.5, 10.2.6.6, 10.2.6.7, 10.2.6.8
pac_ip = 10.2.6.6 # 10.2.6.5, 10.2.6.6, 10.2.6.7, 10.2.6.8
pac_csvfile = pacs.csv # only one file allowed
pac_messages = 1, 801, 3001, 50235, 797, 213
pac_messages = 797, 1, 3, 5, 13, 15, 17, 25, 27, 29

View file

@ -5,7 +5,7 @@ Created on 15.11.2013
@version: 0.02
'''
from threading import Thread
from Connector.switch import Switch, MYSQL
from Connector.switch import Switch, MYSQL, MODBUS
from Config.parser import config
#from API.service import Request
@ -13,30 +13,31 @@ from Config.parser import config
c = config()
conf = c.readConf()
def startSwitch():
def startSwitch(modul):
'''
Create a thread of the switch with the Module MYSQL
Create a thread of the switch with the given Module
@param param modul: modul to load
@return switch: Obejct of the switch
@return queue: Queue object for results
@retunn query: Queue object for querys
@return sql_queue: Queue object for results
@retunn sql_query: Queue object for querys
'''
switch = Switch(MYSQL)
queue, query = switch.initialisiere()
switch = Switch(modul)
switch.initialisiere()
if conf['config_debug']:
print 'SWITCH-Thread:\t', switch
print '\tEMS-QUERY:\t', query
print '\tEMS-QUEUE:\t', queue
print '\tEMS-QUERY:\t', switch.query
print '\tEMS-QUEUE:\t', switch.queue
switch.start()
return switch, queue, query
return switch
def update_device(query, queue, buf):
'''
update all devices witch have tables in the DB
@param query: queue for querys
@param sql_query: queue for querys
@param queue: queue for results
@param buf: Buffer object witch is updated
'''
@ -53,12 +54,13 @@ class ems(Thread):
def __init__(self, buf):
Thread.__init__(self)
self.buffer = buf
self.switch, self.queue, self.query = startSwitch()
self.mysql = startSwitch(MYSQL)
# self.modbus = startSwitch(MODBUS)
if conf['config_debug']:
print '\tEMS-BUFFER:\t', buf
self.query.put('/device')
get = self.buffer.update_buffer(self.queue.get())
self.sql_query.put('/device')
get = self.buffer.update_buffer(self.mysql.queue.get())
#print 'GET:\t', get
def __del__(self):
@ -67,17 +69,19 @@ class ems(Thread):
def run(self):
while True:
update_device(self.query, self.queue, self.buffer)
update_device(self.mysql.query, self.mysql.queue, self.buffer)
''' YOUR CODE HERE'''
def getNewMsg(self, old):
'''
A blocking Method to get a different Message from Queue
@param old: the old message
'''
tmp = self.queue.get()
tmp = self.mysql.queue.get()
while tmp == old:
self.queue.task_done()
tmp = self.queue.get()
self.mysql.queue.task_done()
tmp = self.mysql.queue.get()
return tmp
def getRequest(self):
@ -85,8 +89,8 @@ class ems(Thread):
'''
try:
req = self.buffer.system['request'].pop()
self.query.put(req.content)
self.query.join()
return self.queue.get()
self.mysql.query.put(req.content)
self.mysql.query.join()
return self.mysql.queue.get()
except IndexError:
return False

View file

@ -12,9 +12,6 @@ import os
import numpy
number = 1
#sqlerr = "INSERT INTO %s (DateTime,Type,error) VALUES (\'%s\',%s,%s);"
#sqlnor = "INSERT INTO %s (DateTime,Type,cardnumber,customernumber,plug,meterreading) VALUES (\'%s\',%s,%s,%s,%s,%s);"
folder = 'I:\Rauch\pCharger0%s'
columns = ('DateTime', 'Type', 'cardnumber', 'customernumber', 'plug', 'meterreading', 'error')
@ -28,7 +25,6 @@ def clean(li):
'''
for i in range(0, len(li)):
li[i] = li[i].strip()
#li[i] = li[i].strip('\xef\xbb\xbf')
return li
def makeDatetime(time):
@ -86,7 +82,6 @@ def genData(folder):
continue
#print res_data
#res_data = res_data.set_index('DateTime', verify_integrity = True)
res_data = res_data.replace('', numpy.nan)
res_data = res_data.where(DataFrame.notnull(res_data), 0)
res_data[['cardnumber', 'customernumber', 'plug', 'meterreading', 'error']] = res_data[[ 'cardnumber', 'customernumber', 'plug', 'meterreading', 'error']].astype(numpy.int64)
@ -106,14 +101,10 @@ db.loadDatabase(strHost = conf['mySQL_server'],
big_data = DataFrame(columns = ['DateTime', 'pillarID', 'Type', 'cardnumber', 'customernumber', 'plug', 'meterreading', 'error'])
for i in range(1,8):
number = i
#print genData('I:\Rauch\pCharger%s' % '0{0}'.format(number))
if i == 5:
continue
print 'read pCharger%s' % '0{0}'.format(number)
tmp = genData('I:\Rauch\pCharger%s' % '0{0}'.format(number))
tmp['pillarID'] = i
big_data = big_data.append(tmp, ignore_index = True)
#db.writeDatabase('pCharger%s' % '0{0}'.format(number), big_data, bClear = False)
print 'wrote pCharger%s' % '0{0}'.format(number)
tmp = big_data.sort('DateTime')
tmp = tmp.reset_index(drop = True)
@ -122,14 +113,23 @@ cur = gr.agg('min')['meterreading']
tmp['global'] = sum(list(cur))
gl = cur
print "edit/exdend data"
for i in range(tmp.index[-1]+1):
x = tmp.ix[i]
if cur[x['pillarID']][x['plug']] < x['meterreading']:
cur[x['pillarID']][x['plug']] = x['meterreading']
tmp.loc[i, 'global'] = sum(list(cur))
tmp.loc[i, 'global'] = numpy.int64(sum(list(cur)))
for item in range(len(list(cur))):
col = '%s_%s' % (cur.index.levels[0][cur.index.labels[0][item]], cur.index.levels[1][cur.index.labels[1][item]])
tmp.loc[i, col] = list(cur)[item]
pass
tmp.loc[i, col] = numpy.int64(list(cur)[item])
db.writeDatabase('pCharger', tmp, bClear = False)
for x in range(len(list(cur))):
col = '%s_%s' % (cur.index.levels[0][cur.index.labels[0][x]], cur.index.levels[1][cur.index.labels[1][x]])
tmp = tmp.replace('', numpy.nan)
tmp = tmp.where(DataFrame.notnull(tmp), 0)
tmp[[col]] = tmp[[col]].astype(numpy.int64)
print "write to DB"
db.writeDatabase('pCharger', tmp, bClear = True)
print "done"

58
src/pac.py Normal file
View file

@ -0,0 +1,58 @@
'''
Created on 21.11.2013
@author: Philipp Rauch
'''
from Config.parser import config
from Connector.switch import Switch, MODBUS
from time import sleep
from profile.database import Database
from profile.datasheet import Datasheet
from pandas import DataFrame
import numpy
### LOAD CONFIG ###
c = config()
conf = c.readConf()
### LOAD DATABASE ###
db = Database()
db.loadDatabase(strHost = conf['mySQL_server'],
intPort = int(conf['mySQL_port']),
strUser = conf['mySQL_user'],
strPasswd = conf['mySQL_pass'],
strDatabase = 'ems_testdb', #conf['mySQL_database'],
strTable = None)
### READ CSV ###
csv = '%s/%s' % (conf['config_dictionary'], conf['pac_csvfile'])
pac_csv = Datasheet()
pac_csv.loadDatasheet(strFile = csv)
messages = pac_csv.readDatasheet()
### GENERATE LIST OF COLUMNS ###
col = []
for item in conf['pac_messages']:
row = messages[messages['offset'] == int(item)]
inx = row.index.values[0]
col.append(row.get('Value name')[inx])
### CREATE DATAFRAME ###
tab = DataFrame(columns = col)
### Modbus ###
sw = Switch(MODBUS)
queue, query = sw.initialisiere()
print '\tTEST-QUERY:\t', query
print '\tTEST-QUEUE:\t', queue
sw.start()
while True:
res = {}
for mes in conf['pac_messages']:
query.put([1, int(mes)])
res.update(queue.get())
res_data = tab.append(res, ignore_index=True)
res_data[col[1::]] = res_data[col[1::]].astype(numpy.float) #till now only float support
db.writeDatabase('PAC', res_data, bClear = False)
sleep(1)

View file

@ -11,5 +11,6 @@ from CAN.Filter import CAN_start
c = config()
conf = c.readConf()
CAN_start(conf)
REST_start()

View file

@ -1,34 +0,0 @@
'''
Created on 21.11.2013
@author: Philipp Rauch
'''
# import CANFilter
# import Sym2Lib
from Config.parser import config
from Connector.switch import Switch, MODBUS
from time import sleep
### LOAD CONFIG ###
c = config()
conf = c.readConf()
### Modbus ###
sw = Switch(MODBUS)
queue, query = sw.initialisiere()
print '\tTEST-QUERY:\t', query
print '\tTEST-QUEUE:\t', queue
sw.start()
while True:
for mes in conf['pac_messages']:
query.put(int(mes))
sleep(1)
### Sym2Lib test ###
# sym = "%s/%s" % (conf["config_dictionary"], conf["symfile"])
# print Sym2Lib.get_DataFrameDict(sym)
### CAN test ###
# print 'starte CAN mit Baud von', conf['can_baudrate']
# can = CANFilter.CANFilter()
# can.start()