"""Flask + Socket.IO front end that bridges browser clients and Redis.

The app serves the built client from ``client/dist``, relays images published
on the Redis ``image`` channel to Socket.IO clients as ``cameraImage`` events,
and turns the ``playSound``, ``camera`` and ``move`` Socket.IO events into
JSON messages published on the Redis ``sound``, ``look`` and ``move`` channels.
"""
import base64
import json
import logging

from flask import Flask, send_file, jsonify
from flask_socketio import SocketIO
from redis import Redis

logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)

DATA_URL_PREFIX = 'data:image/jpeg;base64,'
# DEBUG = True

app = Flask(__name__, static_url_path='', static_folder='client/dist')
# Loads this module's upper-case names into the Flask config.
app.config.from_object(__name__)
sio = SocketIO(app)
redisdb = Redis()


class CameraListener:
    """Relay images from the Redis ``image`` channel to Socket.IO clients."""

    def __init__(self):
        # The subscription gets its own connection, separate from the
        # module-level ``redisdb`` that is used for get/publish calls.
        self.redisdbcon = Redis()
        self.pubsub = self.redisdbcon.pubsub()
        self.pubsub.subscribe(['image'])

    def work(self, item):
        """Emit one pub/sub message as a ``cameraImage`` event.

        Args:
            item: A message dict as returned by ``pubsub.get_message()``;
                only its ``'data'`` entry is read.

        Messages whose payload is not ``bytes`` are ignored (redis-py also
        delivers control messages, such as subscribe confirmations, whose
        ``data`` is not an image payload).
        """
        with app.app_context():
            data = item['data']
            if not isinstance(data, bytes):
                return
            image_url = DATA_URL_PREFIX + base64.b64encode(data).decode('ascii')
            # No ``broadcast`` flag here: a server-level emit without a room
            # already goes to every connected client, and newer
            # Flask-SocketIO / python-socketio versions reject the keyword.
            sio.emit('cameraImage', {
                'image': image_url,
            })

    def run(self) -> None:
        """Poll the subscription forever, forwarding each message to ``work``.

        Intended to be started with ``sio.start_background_task``. At most one
        message is handled per iteration, followed by a 50 ms cooperative
        sleep so the loop never spins without yielding.
        """
        while True:
            message = self.pubsub.get_message()
            if message:
                self.work(message)
            sio.sleep(0.05)


# NOTE(review): the two bare '/' rules below are identical to each other and
# look like they have lost their URL converters (a catch-all such as
# '/<path:path>' was presumably intended) -- confirm against the original
# routing before changing them. They are kept as found.
@app.route('/', defaults={'path': ''})
@app.route('/')
@app.route('/')  # does not seem to work
def index(path=''):
    """Serve the client's ``index.html``.

    ``path`` is accepted but unused. It defaults to ``''`` because the rules
    registered without ``defaults`` invoke this view with no arguments, which
    would otherwise raise ``TypeError`` and produce a 500 response.
    """
    return send_file('client/dist/index.html')


@app.route('/api/sounds')
def get_available_sounds():
    """Return the JSON stored under the Redis key ``sound_data``.

    Responds with an empty JSON list when the key is missing or its value is
    not valid JSON.
    """
    raw = redisdb.get('sound_data')
    if raw is None:
        # Missing key: json.loads(None) would raise TypeError, not ValueError.
        return jsonify([])
    try:
        sound_data = json.loads(raw)
    except ValueError:
        sound_data = []
    return jsonify(sound_data)


@sio.on('playSound')
def play_sound(name):
    """Publish a ``play`` request for sound ``name`` on the ``sound`` channel."""
    redisdb.publish('sound', json.dumps({'action': 'play', 'name': name}))


def _publish_direction(channel, directions):
    """Publish the ``force`` and ``angle`` of ``directions`` as JSON on ``channel``."""
    redisdb.publish(channel, json.dumps({
        'force': directions['force'],
        'angle': directions['angle'],
    }))


@sio.on("camera")
def camera_message(directions):
    """Handle a ``camera`` event by publishing it on the ``look`` channel.

    Args:
        directions: Mapping with ``'angle'`` and ``'force'`` entries.

    Raises:
        KeyError: If ``'angle'`` or ``'force'`` is missing.
    """
    # walle.set_eye_velocity(directions['angle'], directions['force'])
    # Lazy %-style arguments: a value that cannot be formatted is reported by
    # the logging machinery instead of aborting the handler before publishing.
    LOG.debug("Moving camera in direction %f with velocity %s",
              directions['angle'], directions['force'])
    _publish_direction('look', directions)


@sio.on("move")
def move_message(directions):
    """Handle a ``move`` event by publishing it on the ``move`` channel.

    Args:
        directions: Mapping with ``'angle'`` and ``'force'`` entries.

    Raises:
        KeyError: If ``'angle'`` or ``'force'`` is missing.
    """
    # walle.set_movement(directions['angle'], directions['force'])
    LOG.debug("Moving in direction %f with velocity %s",
              directions['angle'], directions['force'])
    _publish_direction('move', directions)


if __name__ == '__main__':
    listener = CameraListener()
    sio.start_background_task(listener.run)
    sio.run(app, host='0.0.0.0')