95 lines
2.6 KiB
Python
95 lines
2.6 KiB
Python
import logging
|
|
import base64
|
|
import json
|
|
|
|
from flask import Flask, send_file, jsonify
|
|
from flask_socketio import SocketIO
|
|
from redis import Redis
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
LOG = logging.getLogger(__name__)
|
|
DATA_URL_PREFIX = 'data:image/jpeg;base64,'
|
|
# DEBUG = True
|
|
|
|
app = Flask(__name__, static_url_path='', static_folder='client/dist')
|
|
app.config.from_object(__name__)
|
|
sio = SocketIO(app)
|
|
|
|
redisdb = Redis()
|
|
|
|
|
|
class CameraListener:
|
|
def __init__(self):
|
|
self.redisdbcon = Redis()
|
|
self.pubsub = self.redisdbcon.pubsub()
|
|
self.pubsub.subscribe(['image'])
|
|
|
|
def work(self, item):
|
|
with app.app_context():
|
|
data: bytes = item['data']
|
|
if not isinstance(data, bytes):
|
|
return
|
|
image_url = DATA_URL_PREFIX + base64.b64encode(data).decode('ascii')
|
|
sio.emit('cameraImage', {
|
|
'image': image_url,
|
|
}, broadcast=True)
|
|
|
|
def run(self) -> None:
|
|
while True:
|
|
message = self.pubsub.get_message()
|
|
if message:
|
|
self.work(message)
|
|
sio.sleep(0.05)
|
|
|
|
|
|
@app.route('/', defaults={'path': ''})
|
|
@app.route('/<string:path>')
|
|
@app.route('/<path:path>') # does not seem to work
|
|
def index(path):
|
|
if path == 'reset.css':
|
|
return send_file('client/dist/reset.css')
|
|
elif path == 'favicon.ico':
|
|
return send_file('client/dist/favicon.ico')
|
|
else:
|
|
return send_file('client/dist/index.html')
|
|
|
|
|
|
@app.route('/api/sounds')
|
|
def get_available_sounds():
|
|
try:
|
|
sound_data = json.loads(redisdb.get('sound_data'))
|
|
except ValueError:
|
|
sound_data = []
|
|
return jsonify(sound_data)
|
|
|
|
|
|
@sio.on('playSound')
|
|
def play_sound(name):
|
|
redisdb.publish('sound', json.dumps({'action': 'play', 'name': name}))
|
|
|
|
|
|
@sio.on("camera")
|
|
def camera_message(directions):
|
|
# walle.set_eye_velocity(directions['angle'], directions['force'])
|
|
LOG.debug(f"Moving camera in direction {directions['angle']:f} with velocity {directions['force']}")
|
|
redisdb.publish('look', json.dumps({
|
|
'force': directions['force'],
|
|
'angle': directions['angle'],
|
|
}))
|
|
|
|
|
|
@sio.on("move")
|
|
def move_message(directions):
|
|
# walle.set_movement(directions['angle'], directions['force'])
|
|
LOG.debug(f"Moving in direction {directions['angle']:f} with velocity {directions['force']}")
|
|
redisdb.publish('move', json.dumps({
|
|
'force': directions['force'],
|
|
'angle': directions['angle'],
|
|
}))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
listener = CameraListener()
|
|
sio.start_background_task(listener.run)
|
|
sio.run(app, host='0.0.0.0')
|