[fluksod] join the fill and truncate operations in a single method
This commit is contained in:
parent
412cbc03e3
commit
6b8e701d26
2 changed files with 20 additions and 33 deletions
|
@ -90,29 +90,28 @@ function filter(M, span, offset)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
function truncate(M, cutoff)
|
function polish(M, cutoff)
|
||||||
for sensor, T in pairs(M) do
|
local now = os.time()
|
||||||
local H = timestamps(T)
|
|
||||||
|
|
||||||
for i = H[1], os.time() - cutoff do
|
|
||||||
T[i] = nil
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
function fill(M)
|
|
||||||
for sensor, T in pairs(M) do
|
for sensor, T in pairs(M) do
|
||||||
local H = timestamps(T)
|
local H = timestamps(T)
|
||||||
|
|
||||||
|
-- fill up the holes first
|
||||||
for i = H[#H]-1, H[1]+1, -1 do
|
for i = H[#H]-1, H[1]+1, -1 do
|
||||||
if T[i] == nil or T[i] == '"nan"' then
|
if T[i] == nil or T[i] == '"nan"' then
|
||||||
T[i] = T[i+1]
|
T[i] = T[i+1]
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
for i = H[#H]+1, os.time() do
|
-- make sure the tail stretches up to 'now'
|
||||||
|
for i = H[#H]+1, now do
|
||||||
T[i] = '"nan"'
|
T[i] = '"nan"'
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- truncate the head
|
||||||
|
for i = H[1], now - cutoff do
|
||||||
|
T[i] = nil
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -127,7 +126,7 @@ function json_encode(M)
|
||||||
SB[#SB+1] = '[' .. timestamp .. ',' .. T[timestamp] .. '],'
|
SB[#SB+1] = '[' .. timestamp .. ',' .. T[timestamp] .. '],'
|
||||||
end
|
end
|
||||||
|
|
||||||
SB[#SB] = SB[#SB]:sub(1, -2) -- remove the trialing comma from the last entry
|
SB[#SB] = SB[#SB]:sub(1, -2) -- remove the trailing comma from the last entry
|
||||||
SB[#SB+1] = ']'
|
SB[#SB+1] = ']'
|
||||||
J[sensor] = table.concat(SB)
|
J[sensor] = table.concat(SB)
|
||||||
end
|
end
|
||||||
|
|
|
@ -311,30 +311,20 @@ function lan_buffer(child)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
function polish(child, cutoff)
|
function publish(child)
|
||||||
return coroutine.create(function(measurements)
|
return coroutine.create(function(measurements)
|
||||||
while true do
|
nixio.fs.mkdirr(LAN_PUBLISH_PATH)
|
||||||
measurements:fill()
|
|
||||||
measurements:truncate(cutoff)
|
|
||||||
coroutine.resume(child, measurements)
|
|
||||||
measurements = coroutine.yield()
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
function publish(child, dir)
|
for file in nixio.fs.dir(LAN_PUBLISH_PATH) do
|
||||||
return coroutine.create(function(measurements)
|
|
||||||
nixio.fs.mkdirr(dir)
|
|
||||||
|
|
||||||
for file in nixio.fs.dir(dir) do
|
|
||||||
nixio.fs.unlink(file)
|
nixio.fs.unlink(file)
|
||||||
end
|
end
|
||||||
|
|
||||||
while true do
|
while true do
|
||||||
|
measurements:polish(LAN_POLISH_CUTOFF)
|
||||||
local measurements_json = measurements:json_encode()
|
local measurements_json = measurements:json_encode()
|
||||||
|
|
||||||
for sensor_id, json in pairs(measurements_json) do
|
for sensor_id, json in pairs(measurements_json) do
|
||||||
local file = dir .. '/' .. sensor_id
|
local file = LAN_PUBLISH_PATH .. '/' .. sensor_id
|
||||||
|
|
||||||
nixio.fs.unlink(file)
|
nixio.fs.unlink(file)
|
||||||
fd = nixio.open(file, O_RDWR_CREAT)
|
fd = nixio.open(file, O_RDWR_CREAT)
|
||||||
|
@ -381,11 +371,9 @@ local wan_chain =
|
||||||
|
|
||||||
local lan_chain =
|
local lan_chain =
|
||||||
lan_buffer(
|
lan_buffer(
|
||||||
polish(
|
|
||||||
publish(
|
publish(
|
||||||
debug(nil)
|
debug(nil)
|
||||||
, LAN_PUBLISH_PATH)
|
)
|
||||||
, LAN_POLISH_CUTOFF)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
local chain = dispatch(wan_chain, lan_chain)
|
local chain = dispatch(wan_chain, lan_chain)
|
||||||
|
|
Loading…
Reference in a new issue