diff --git a/openwrt/package/flukso/src/data.lua b/openwrt/package/flukso/src/data.lua index ba8bdb8..2662b29 100644 --- a/openwrt/package/flukso/src/data.lua +++ b/openwrt/package/flukso/src/data.lua @@ -1,6 +1,7 @@ -- -- data.lua: property and methods for manipulating incoming measurements -- Copyright (c) 2009 jokamajo.org +-- 2010 flukso.net -- -- This program is free software; you can redistribute it and/or -- modify it under the terms of the GNU General Public License @@ -45,10 +46,7 @@ end function filter(M, span, offset) for meter, T in pairs(M) do - local H = {} -- helper table, an indexed array containing all the measurement's timestamps - for timestamp in pairs(T) do H[#H+1] = timestamp end - table.sort(H) -- sort in ascending order, oldest timestamps will be treated first - + local H = timestamps(T) local i = 2 while not (H[i+1] == nil or H[i] > os.time()-offset) do if math.floor(H[i-1]/span) == math.floor(H[i]/span) and math.floor(H[i]/span) == math.floor(H[i+1]/span) then @@ -60,3 +58,41 @@ function filter(M, span, offset) end end end + +function truncate(M, cutoff) + for meter, T in pairs(M) do + local H = timestamps(T) + for i = H[1], H[#H]-60 do + T[i] = nil + end + end +end + +function fill(M) + for meter, T in pairs(M) do + local H = timestamps(T) + for i = H[1]+1, H[#H]-1 do + if T[i] == nil then T[i] = T[i-1] end + end + end +end + +function json_encode(M) + J = {} + for meter, T in pairs(M) do + J[meter] = '[' + local H = timestamps(T) + for i = H[1], H[#H] do + J[meter] = J[meter] .. '[' .. T[i] .. ']' + end + J[meter] = J[meter] .. ']' + end + return J +end + +local function timestamps(T) + local H = {} -- helper table, an indexed array containing all the measurement's timestamps + for timestamp in pairs(T) do H[#H+1] = timestamp end + table.sort(H) -- sort in ascending order, oldest timestamps will be treated first + return H +end diff --git a/openwrt/package/flukso/src/flukso.lua b/openwrt/package/flukso/src/flukso.lua index a88c8c6..9f8de77 100755 --- a/openwrt/package/flukso/src/flukso.lua +++ b/openwrt/package/flukso/src/flukso.lua @@ -35,10 +35,12 @@ local param = {xmlrpcaddress = 'http://logger.flukso.net/xmlrpc', pwraddress = '255.255.255.255', pwrport = 26488, pwrenable = false, + pwrinterval = 1, + pwrdir = '/tmp/sensor', device = '/dev/ttyS0', interval = 300} -function receive(child, device, pwraddress, pwrport, pwrenable) +function dispatch(e_child, p_child, device, pwraddress, pwrport, pwrenable) return coroutine.create(function() -- open the connection to the syslog deamon, specifying our identity posix.openlog('flukso') @@ -60,11 +62,15 @@ function receive(child, device, pwraddress, pwrport, pwrenable) os.execute('gpioctl set 4 > /dev/null') local meter, value = line:sub(5, 36), tonumber(line:sub(38)) - coroutine.resume(child, meter, os.time(), value) + coroutine.resume(e_child, meter, os.time(), value) + elseif line:sub(1, 3) == 'pwr' and line:len() == 47 and line:find(':') == 37 then -- user data + additional data integrity checks - if pwrenable then udp:send(line) end + local meter, value = line:sub(5, 36), tonumber(line:sub(38)) + if pwrenable then coroutine.resume(p_child, meter, os.time(), value) end + elseif line:sub(1, 3) == 'msg' then -- control data posix.syslog(31, 'received message from '..device..': '..line:sub(5)) + else posix.syslog(27, 'input error on '..device..': '..line) end @@ -142,24 +148,51 @@ function gc(child) end) end -function debug() +function polish(child, cutoff) return coroutine.create(function(measurements) while true do - dbg.vardump(measurements) + measurements:fill() + measurements:truncate(cutoff) + coroutine.resume(child, measurements) measurements = coroutine.yield() end end) end --- receive: listen to the serial port for incoming pulses +function publish(child, dir) + return coroutine.create(function(measurements) + os.execute('mkdir -p ' .. dir .. ' > /dev/null') + while true do + local measurements_json = measurements:json_encode() + for meter, json in measurements_json do + io.output(dir .. '/' .. meter) + io.write(json) + io.close() + end + coroutine.resume(child, measurements) + measurements = coroutine.yield() + end + end) +end + +function debug(child) + return coroutine.create(function(measurements) + while true do + dbg.vardump(measurements) + if child then coroutine.resume(child, measurements) end + measurements = coroutine.yield() + end + end) +end + +-- dispatch: listen to the serial port for incoming pulses -- buffer: buffer the pulses in a measurement object -- filter: sweep recursively to filter all redundant entries -- send: report the measurements to the server via xmlrpc -- gc: perform a full garbage collection cycle -- debug: dump measurements table to stdout -local chain = receive( - buffer( +local e_chain = buffer( filter( filter( filter( @@ -172,6 +205,15 @@ local chain = receive( , 900, 7200) , 60, 0) , param.interval) - , param.device, param.pwraddress, param.pwrport, param.pwrenable) + +local p_chain = buffer( + polish( + publish( + debug() + , param.pwrdir) + , 60) + , param.pwrinterval) + +local chain = dispatch(e_chain, p_chain, param.device, param.pwraddress, param.pwrport, param.pwrenable) coroutine.resume(chain)