160 lines
5.8 KiB
Python
160 lines
5.8 KiB
Python
|
#!/usr/bin/env python3
|
||
|
# -*- coding: UTF-8 -*-
|
||
|
import _thread
|
||
|
import socket
|
||
|
import pygame
|
||
|
import math
|
||
|
import random
|
||
|
import numpy as np
|
||
|
from pygame.locals import *
|
||
|
import time
|
||
|
|
||
|
WIDTH=160
|
||
|
HEIGHT=24*6
|
||
|
PIXELSIZE=4 #For Simulator Display
|
||
|
UDPPORT=2323
|
||
|
DEBUG=True
|
||
|
BITSPERPIXEL=2
|
||
|
|
||
|
|
||
|
class FlipdotSim():
|
||
|
def __init__(self,
|
||
|
imageSize = (160,48), #80,16
|
||
|
pixelSize = 10,
|
||
|
udpPort = 2323,
|
||
|
bitsperpixel=1):
|
||
|
self.imageSize=imageSize
|
||
|
self.udpPort = udpPort
|
||
|
self.bitsperpixel=bitsperpixel
|
||
|
self.flipdotMatrixSimulatorWidget = FlipdotMatrixSimulatorWidget(imageSize, pixelSize, bitsperpixel) #comment out if no simulator needed
|
||
|
self.udpHostSocket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
||
|
self.udpHostSocket.bind(("", self.udpPort))
|
||
|
|
||
|
self.timesincelastpacket=time.time()
|
||
|
|
||
|
def run(self):
|
||
|
self.RunServer()
|
||
|
|
||
|
def bitlistToInt(self,bitlist): #lsb last, [1,0]=2, [0,1]=1
|
||
|
number=0
|
||
|
for bitindex,bitvalue in enumerate(reversed(bitlist)):
|
||
|
number+=bitvalue*pow(2,bitindex)
|
||
|
return number
|
||
|
|
||
|
def RunServer(self):
|
||
|
try:
|
||
|
while True:
|
||
|
#rawData = self.udpHostSocket.recv(4096)
|
||
|
rawData = self.udpHostSocket.recv(4096*2)
|
||
|
|
||
|
imageArray = ImageArrayAdapter().convertPacketToImageArray(rawData)
|
||
|
if DEBUG:
|
||
|
print("Received Data. Time since last message: "+str( round( time.time()-self.timesincelastpacket,2))+"s ("+str( round( (1/ (time.time()-self.timesincelastpacket) ) ,2) ) +" FPS)")
|
||
|
self.timesincelastpacket=time.time()
|
||
|
_bitsneeded=self.imageSize[0]*self.imageSize[1]*self.bitsperpixel
|
||
|
if len(imageArray) < _bitsneeded:
|
||
|
if DEBUG:
|
||
|
print("WARNING: Not enough data received. Got "+str(len(imageArray))+" bits, needs "+str(_bitsneeded)+" bits")
|
||
|
emptyArray=np.zeros(_bitsneeded,dtype=int)
|
||
|
emptyArray[0:len(imageArray)]=imageArray
|
||
|
imageArray=emptyArray
|
||
|
elif len(imageArray) > _bitsneeded:
|
||
|
if DEBUG:
|
||
|
print("WARNING: Too much data received. Got "+str(len(imageArray))+" bits, needs "+str(_bitsneeded)+" bits")
|
||
|
imageArray=imageArray[0:_bitsneeded]
|
||
|
|
||
|
imageArray=[self.bitlistToInt(x) for x in np.array(imageArray).reshape((int(len(imageArray)/self.bitsperpixel),self.bitsperpixel))]
|
||
|
|
||
|
self.flipdotMatrixSimulatorWidget.show(imageArray) #send to simulator display
|
||
|
|
||
|
finally:
|
||
|
self.udpHostSocket.close()
|
||
|
|
||
|
class ImageArrayAdapter():
|
||
|
def __init__(self):
|
||
|
self.arrayOfBinaryInts = []
|
||
|
|
||
|
def convertPacketToImageArray(self, udpPacketStr):
|
||
|
self.arrayOfBinaryInts = []
|
||
|
byteArray = bytearray(udpPacketStr)
|
||
|
|
||
|
#Fix for other format. Not Used
|
||
|
#byteArray = udpPacketStr.translate(None, b'\r\n').decode().replace('[','').replace(']','').replace(' ','').split(',')
|
||
|
#byteArray = [int(x) for x in byteArray]
|
||
|
#print("rawtype="+str(type(byteArray)))
|
||
|
|
||
|
#print(byteArray)
|
||
|
for byte in byteArray:
|
||
|
#print("byte:"+str(byte))
|
||
|
self.__appendByteToArrayOfBinaryInts(byte)
|
||
|
return self.arrayOfBinaryInts
|
||
|
|
||
|
def __appendByteToArrayOfBinaryInts(self, byte):
|
||
|
byteValue = int(byte)
|
||
|
for i in range(8):
|
||
|
if math.floor(byteValue/(2**(7-i))) > 0:
|
||
|
self.arrayOfBinaryInts.append(1)
|
||
|
#print("append 1")
|
||
|
else:
|
||
|
self.arrayOfBinaryInts.append(0)
|
||
|
#print("append 0")
|
||
|
byteValue = byteValue%(2**(7-i))
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
class FlipdotMatrixSimulatorWidget():
|
||
|
BLACKCOLOR = 0
|
||
|
WHITECOLOR = 1
|
||
|
|
||
|
def __init__(self,
|
||
|
imageSize = (160,48),
|
||
|
pixelSize = 4,
|
||
|
bitsperpixel = 1):
|
||
|
self.imageSize = imageSize
|
||
|
self.pixelSize = pixelSize
|
||
|
|
||
|
pygame.init()
|
||
|
self.screen = pygame.display.set_mode((imageSize[0]*pixelSize, imageSize[1]*pixelSize))
|
||
|
self.screen.fill((255,255,255))
|
||
|
_thread.start_new_thread(self.watchCloseThread, ())
|
||
|
|
||
|
self.bitsperpixel=bitsperpixel
|
||
|
|
||
|
|
||
|
|
||
|
def watchCloseThread(self):
|
||
|
while True:
|
||
|
for event in pygame.event.get():
|
||
|
if event.type in (QUIT, QUIT):
|
||
|
import os
|
||
|
os.kill(os.getpid(), 9)
|
||
|
pygame.time.delay(500)
|
||
|
|
||
|
def show(self, imageArray):
|
||
|
for yValue in range(self.imageSize[1]):
|
||
|
for xValue in range(self.imageSize[0]):
|
||
|
i = self.imageSize[0]*yValue + xValue
|
||
|
color = imageArray[i] / (pow(2,self.bitsperpixel)-1) #gives a number between 0 and 1 inclusive. Example for bitsperpixel=2: [1,0]-> 2/ (2^2-1)=2/3
|
||
|
self.updatePixel(xValue, yValue, color)
|
||
|
pygame.display.update()
|
||
|
|
||
|
def clearPixels(self):
|
||
|
for xValue in range(self.imageSize[0]):
|
||
|
for yValue in range(self.imageSize[1]):
|
||
|
self.updatePixel(xValue, yValue, self.BLACKCOLOR)
|
||
|
|
||
|
def updatePixel(self, xValue, yValue, brightness):
|
||
|
surface = pygame.Surface((self.pixelSize, self.pixelSize))
|
||
|
minimumbrightness=32
|
||
|
rectcolor = ( (255-minimumbrightness)*brightness+minimumbrightness,127*brightness+ minimumbrightness/2, minimumbrightness/4)
|
||
|
|
||
|
#surface.fill(rectcolor)
|
||
|
pygame.draw.circle(surface, rectcolor, (int(self.pixelSize/2), int(self.pixelSize/2)), int(self.pixelSize/2)) #Draw LED as filled circles
|
||
|
self.screen.blit(surface, (xValue*self.pixelSize, yValue*self.pixelSize))
|
||
|
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
FlipdotSim(imageSize=(WIDTH,HEIGHT), pixelSize = PIXELSIZE, udpPort=UDPPORT, bitsperpixel=BITSPERPIXEL).run()
|