354 lines
11 KiB
Python
354 lines
11 KiB
Python
import paho.mqtt.client as mqtt
|
|
|
|
import time
|
|
import board
|
|
import busio
|
|
|
|
from adafruit_ht16k33 import segments
|
|
from enum import Enum
|
|
|
|
from gpiozero import Button
|
|
|
|
import socket #to get local ip
|
|
|
|
# MQTT sub-topic names. They are appended to the controller's base topic:
# commands are matched on "<name>/set" and the accepted value is published
# back on "<base topic>/<name>".

# display settings
__TOPIC_BRIGHTNESS__ = "brightness"
__TOPIC_SCROLL_INTERVAL__ = "scrollinterval"

# text commands, one per display mode
__TOPIC_TEXT_STATIC__ = "textstatic"
__TOPIC_TEXT_ONCE__ = "textonce"
__TOPIC_TEXT_SCROLL__ = "textscroll"
__TOPIC_TEXT_MARQUEE__ = "textmarquee"
__TOPIC_TEXT_PAN__ = "textpan"

# button events are published on this sub-topic
__TOPIC_BUTTON__ = "button"

# number of character positions across both display modules
NUMDIGITS = 8
|
|
|
|
class Mode(Enum):
    """How the current text is presented on the display."""

    STATIC = 1   # show the text constantly; text longer than the display is cut
    ONCE = 3     # show the text once for some time
    SCROLL = 2   # scroll through the text once
    PAN = 4      # scroll back and forth
    MARQUEE = 5  # scroll through the text, wrapping around to the start
|
|
|
|
class Controller:
|
|
# Class-level defaults; the instance assigns the real values later.

# True between on_connect and on_disconnect
connected = False

# left / right 4-character display module (None until created in __init__)
displayL = None
displayR = None

# MQTT settings, filled in from the constructor arguments
basetopic = None
mqtthost = None
mqttport = None

# time.time() of the most recent reconnect attempt (0 = none yet)
last_reconnectTry = 0
|
|
|
|
|
|
def __init__(self, base_topic, mqtt_host, mqtt_port=1883):
    """Set up animation state, buttons, both displays and the MQTT client.

    base_topic -- MQTT topic prefix used for subscribing and publishing
    mqtt_host  -- broker host handed to the MQTT client
    mqtt_port  -- broker port (default 1883)
    """
    # --- animation state -------------------------------------------------
    self.mode = Mode.STATIC
    self.text = " "
    self.text_last = ""          # text shown during the previous loop pass
    self.poscount = 0            # scroll position / tick counter
    self.scroll_interval = 0.25  # seconds between animation steps
    self.last_scrollupdate = 0

    # --- buttons ---------------------------------------------------------
    # https://gpiozero.readthedocs.io/en/stable/api_input.html
    # With the pull-up deactivated the input is active high.
    self.buttonLeft = Button(23, pull_up=False, hold_time=1)
    self.buttonRight = Button(24, pull_up=False, hold_time=1)

    # When a hold was reported, the release that follows must be ignored.
    self.buttonLeft_heldFlag = False
    self.buttonRight_heldFlag = False

    self.buttonLeft.when_released = self.buttonLeft_released
    self.buttonLeft.when_held = self.buttonLeft_held
    self.buttonRight.when_released = self.buttonRight_released
    self.buttonRight.when_held = self.buttonRight_held

    # --- displays --------------------------------------------------------
    self.i2c = busio.I2C(board.SCL, board.SDA)

    # Two 14-segment, 4-character modules. Find the addresses with
    # "sudo i2cdetect -y 1" (needs: sudo apt-get install -y python-smbus i2c-tools).
    self.displayL = segments.Seg14x4(self.i2c, address=0x70)
    self.displayR = segments.Seg14x4(self.i2c, address=0x71)
    both_displays = (self.displayL, self.displayR)

    for display in both_displays:
        display.fill(0)  # clear

    for display in both_displays:
        display.brightness = 1.0  # range 0-1.0, 1.0 is maximum brightness

    # --- MQTT ------------------------------------------------------------
    self.basetopic = base_topic
    self.mqtthost = mqtt_host
    self.mqttport = mqtt_port
    self.connectToMQTT()
|
|
|
|
def connectToMQTT(self):
    """Create a new MQTT client, attach the callbacks and try to connect.

    A failed attempt (broker down, host unreachable, name lookup failure,
    timeout) is reported and swallowed instead of terminating the program,
    so that a later retry remains possible. self.connected is only set by
    on_connect, so it stays False after a failed attempt.
    """
    print("trying to connect to mqtt")

    # Set the topic first so it exists even when the connect below fails.
    self.topic = self.basetopic

    self.mqtt = mqtt.Client()
    self.mqtt.on_connect = self.on_connect
    self.mqtt.on_message = self.on_message
    self.mqtt.on_disconnect = self.on_disconnect

    try:
        self.mqtt.connect(self.mqtthost, self.mqttport)
    except OSError as err:
        # Covers refused connections, socket timeouts and DNS errors,
        # which are all OSError subclasses.
        print("could not connect to mqtt:", err)
|
|
|
|
|
|
|
|
|
|
def on_disconnect(self, client, userdata, rc):
    """MQTT callback: mark the link as down and show a placeholder text."""
    self.connected = False
    print("MQTT disconnected")

    # Replace whatever was animated with a static "no connection" pattern.
    self.text = "- - - -"
    self.mode = Mode.STATIC
|
|
|
|
def on_message(self, client, userdata, message):
    """MQTT callback: apply a "<name>/set" command and echo the new value.

    The payload is decoded as UTF-8. Depending on the topic suffix it is
    either a number (brightness, scroll interval) or the text to show in
    one of the display modes. Malformed payloads are reported and ignored
    so that a bad message cannot raise out of the MQTT loop.
    """
    try:
        msg = message.payload.decode("utf-8")
    except UnicodeDecodeError:
        print("ignoring payload that is not valid utf-8")
        return
    print("msg = " + msg)

    topic = message.topic

    if topic.endswith(__TOPIC_BRIGHTNESS__ + "/set"):
        try:
            val = float(msg)
        except ValueError:
            print("invalid brightness ", msg)
            return
        if 0.0 <= val <= 1.0:
            self.displayL.brightness = val
            self.displayR.brightness = val
            print("changed brightness to ", self.displayL.brightness)
            self.mqtt.publish(self.topic + "/" + __TOPIC_BRIGHTNESS__, self.displayL.brightness, 1)
        else:
            print("invalid brightness ", val)

    elif topic.endswith(__TOPIC_SCROLL_INTERVAL__ + "/set"):
        try:
            val = float(msg)
        except ValueError:
            print("error not >0.0: ", msg)
            return
        # The upper bound rejects "inf", which would freeze every animation.
        if 0.0 < val < float("inf"):
            self.scroll_interval = val
            print("changed scroll_interval to ", self.scroll_interval)
            self.mqtt.publish(self.topic + "/" + __TOPIC_SCROLL_INTERVAL__, self.scroll_interval, 1)
        else:
            print("error not >0.0: ", val)

    elif topic.endswith(__TOPIC_TEXT_STATIC__ + "/set"):
        self.mode = Mode.STATIC
        self.text = msg.ljust(NUMDIGITS)
        print("changed text to ", self.text)
        self.mqtt.publish(self.topic + "/" + __TOPIC_TEXT_STATIC__, self.text, 1)

    elif topic.endswith(__TOPIC_TEXT_ONCE__ + "/set"):
        self.mode = Mode.ONCE
        self.text = msg.ljust(NUMDIGITS)
        self.poscount = 0  # used for timing in this mode
        print("changed text to ", self.text)
        self.mqtt.publish(self.topic + "/" + __TOPIC_TEXT_ONCE__, self.text, 1)

    elif topic.endswith(__TOPIC_TEXT_MARQUEE__ + "/set"):
        self.mode = Mode.MARQUEE
        self.poscount = 0  # start at the beginning of the new text
        # A full display width of blanks on both sides lets the text enter
        # and leave the frame completely.
        pad = " " * NUMDIGITS
        self.text = pad + msg + pad
        print("changed text to ", self.text)
        self.mqtt.publish(self.topic + "/" + __TOPIC_TEXT_MARQUEE__, self.text, 1)

    elif topic.endswith(__TOPIC_TEXT_PAN__ + "/set"):
        self.mode = Mode.PAN
        self.poscount = 0  # start at the beginning of the new text
        # Only a small margin (a quarter of the display) on both sides.
        pad = " " * (NUMDIGITS // 4)
        self.text = pad + msg + pad
        print("changed text to ", self.text)
        self.mqtt.publish(self.topic + "/" + __TOPIC_TEXT_PAN__, self.text, 1)

    elif topic.endswith(__TOPIC_TEXT_SCROLL__ + "/set"):
        self.mode = Mode.SCROLL
        self.poscount = 0  # start at the beginning of the new text
        # Same padding as marquee: the text scrolls in and out of the frame.
        pad = " " * NUMDIGITS
        self.text = pad + msg + pad
        print("changed text to ", self.text)
        self.mqtt.publish(self.topic + "/" + __TOPIC_TEXT_SCROLL__, self.text, 1)
|
|
|
|
|
|
def on_connect(self, client, userdata, flags, rc):
    """MQTT callback: subscribe and announce this device once connected.

    rc is the broker's connection result; anything other than 0 means the
    connection was refused, in which case nothing is subscribed or
    published and self.connected stays False.
    """
    if rc != 0:
        print("MQTT connection refused, rc =", rc)
        return

    print("Connected to MQTT Broker")
    self.mqtt.subscribe(self.topic + "/#")
    self.connected = True

    # Announce state and identity of this device.
    self.mqtt.publish(self.topic + "/" + "$state", "ready", 1)
    self.mqtt.publish(self.topic + "/" + "$hostname", socket.gethostname(), 1)
    self.mqtt.publish(self.topic + "/" + "$localip", self.get_ip(), 1)

    # clear display
    self.mode = Mode.STATIC
    self.text = ""
|
|
|
|
|
|
def get_ip(self):
    """Return this host's local IPv4 address, or '127.0.0.1' on failure.

    The address is read from a UDP socket after connecting it to a
    private-range address; that address does not have to be reachable.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect(('10.255.255.255', 1))
            return probe.getsockname()[0]
        except Exception:
            return '127.0.0.1'
|
|
|
|
def writeSegment(self, _digit, _char, clear=False):
    """Write one character to a digit of the combined 8-digit display.

    _digit -- position 0..7; 0..3 is the left module, 4..7 the right one.
              Positions outside this range are ignored.
    _char  -- character to write
    clear  -- blank the digit before writing. Callers use this when a '.'
              gets a digit of its own (presumably because the display
              library adds the dot to whatever the digit already shows;
              confirm against adafruit_ht16k33).
    """
    if 0 <= _digit < 4:
        display, index = self.displayL, _digit
    elif 4 <= _digit < 8:
        display, index = self.displayR, _digit - 4
    else:
        # Out of range, including negative positions, which previously
        # ended up as negative indices on the right module.
        return

    if clear:
        display[index] = ' '
    display[index] = _char
|
|
|
|
def displayTextOffset(self, _text, _offset=0):
    """Show _text on both display modules, starting at character _offset.

    This is needed because the library's print functions only work for a
    single display module.

    Handling of '.': a dot that follows a normal character is shown on
    that character's digit. A dot that follows another dot, or that is
    the first visible character, gets a digit of its own.

    _text   -- text to show
    _offset -- number of leading characters to skip; negative values are
               treated as 0
    """
    # Pad with a full display width of blanks so that every digit is
    # overwritten, also when scrolling past the end of the text.
    visible = (_text + " " * NUMDIGITS)[max(0, _offset):]

    read = 0   # reading position in the text
    digit = 0  # writing position on the display
    while digit < NUMDIGITS and read < len(visible):
        char = visible[read]
        if char == '.' and visible[max(0, read - 1)] == '.':
            # dot after a dot (or at the start): needs its own, cleared digit
            self.writeSegment(digit, char, True)
            read += 1
        else:
            if char != '.':  # normal character
                self.writeSegment(digit, char)
                read += 1
            if read < len(visible) and visible[read] == '.':
                # a dot right after the character shares its digit
                self.writeSegment(digit, visible[read])
                read += 1
        digit += 1
|
|
|
|
|
|
def seglen(self, _text):
    """Return how many display digits _text occupies.

    Like len(), but a '.' that directly follows a character other than
    '.' shares that character's digit and therefore is not counted. A dot
    at the very start or after another dot takes a digit of its own.
    """
    shared_dots = sum(
        1
        for before, current in zip(_text, _text[1:])
        if current == '.' and before != '.'
    )
    return len(_text) - shared_dots
|
|
|
|
|
|
def loop_forever(self):
    """Run the main loop: service MQTT and animate the display.

    Each pass retries the MQTT connection if needed (at most every 30
    seconds), processes network traffic and then updates the display
    according to the current mode. This method does not return.
    """
    while True:
        if not self.connected and time.time() > self.last_reconnectTry + 30:
            self.connectToMQTT()
            self.last_reconnectTry = time.time()

        # Process network traffic; blocks for at most 0.1 s.
        if self.mqtt.loop(0.1) != 0:
            # A non-zero result signals an error (e.g. no connection), in
            # which case the call may return at once; sleep so the loop
            # does not spin at full speed while offline.
            time.sleep(0.1)

        if self.displayL is None or self.displayR is None:
            continue  # displays not initialized

        text_changed = self.text != self.text_last

        if self.mode in (Mode.STATIC, Mode.ONCE):
            if text_changed:
                self.displayTextOffset(self.text, 0)

            if self.mode == Mode.ONCE and time.time() > self.last_scrollupdate + self.scroll_interval:
                # The text stays for as long as it would take to scroll
                # over the frame, so the duration follows scroll_interval.
                if self.poscount >= NUMDIGITS:
                    self.clearAndStop()
                self.poscount += 1  # used for timing
                self.last_scrollupdate = time.time()

        elif self.mode in (Mode.MARQUEE, Mode.SCROLL):
            if text_changed or time.time() > self.last_scrollupdate + self.scroll_interval:
                self.displayTextOffset(self.text, self.poscount)
                self.poscount += 1

                positions = max(1, self.seglen(self.text) - NUMDIGITS + 1)
                if self.mode == Mode.MARQUEE:
                    self.poscount %= positions  # wrap around
                elif self.poscount >= positions:
                    self.clearAndStop()  # reached the end of a single scroll

                self.last_scrollupdate = time.time()

        elif self.mode == Mode.PAN:
            if text_changed or time.time() > self.last_scrollupdate + self.scroll_interval:
                print("pos=", self.poscount)

                # Scrollable width; 0 when the text fits the display.
                span = max(0, self.seglen(self.text) - NUMDIGITS)

                scrolloffset = self.poscount
                if scrolloffset >= span:
                    # past the right end: mirror to scroll backwards
                    scrolloffset = 2 * span - scrolloffset
                scrolloffset = max(0, scrolloffset)  # cap at zero
                print("scrolloffset=", scrolloffset)

                self.displayTextOffset(self.text, scrolloffset)

                if span > 0:
                    # one full cycle is forth and back again
                    self.poscount = (self.poscount + 1) % (span * 2)
                else:
                    # nothing to scroll; also avoids a modulo by zero
                    self.poscount = 0
                self.last_scrollupdate = time.time()

        self.text_last = self.text
|
|
|
|
|
|
def clearAndStop(self):
    """Stop the running animation and blank the display immediately."""
    # Assignment, not comparison: the mode has to actually change,
    # otherwise the animation keeps running.
    self.mode = Mode.STATIC
    self.text = " ".ljust(NUMDIGITS)  # empty
    self.displayTextOffset(self.text, 0)  # update immediately
|
|
|
|
|
|
def buttonLeft_released(self):
    """Button callback: report a short press of the left button.

    A release that ends a hold is not reported; it only resets the flag.
    """
    if self.buttonLeft_heldFlag:
        self.buttonLeft_heldFlag = False
        return

    print("button left pressed")
    if not self.connected:
        return
    self.mqtt.publish(self.topic + "/" + __TOPIC_BUTTON__, "lpress", 1)
|
|
|
|
|
|
def buttonLeft_held(self):
    """Button callback: report that the left button is being held.

    Sets the held flag so that the release that follows is ignored.
    """
    print("button left held")
    self.buttonLeft_heldFlag = True

    if not self.connected:
        return
    self.mqtt.publish(self.topic + "/" + __TOPIC_BUTTON__, "lhold", 1)
|
|
|
|
def buttonRight_released(self):
    """Button callback: report a short press of the right button.

    A release that ends a hold is not reported; it only resets the flag.
    """
    if self.buttonRight_heldFlag:
        self.buttonRight_heldFlag = False
        return

    print("button right pressed")
    if not self.connected:
        return
    self.mqtt.publish(self.topic + "/" + __TOPIC_BUTTON__, "rpress", 1)
|
|
|
|
def buttonRight_held(self):
    """Button callback: report that the right button is being held.

    Sets the held flag so that the release that follows is ignored.
    """
    print("button right held")
    self.buttonRight_heldFlag = True

    if not self.connected:
        return
    self.mqtt.publish(self.topic + "/" + __TOPIC_BUTTON__, "rhold", 1)
|
|
|