import base64 import json from pprint import pprint from threading import Thread # import eventlet # eventlet.monkey_patch() from typing import List from flask import Flask, jsonify, send_file, Response from flask_socketio import SocketIO from redis import Redis # from control.camera import Camera # from control.walle import WallE DATA_URL_PREFIX = 'data:image/jpeg;base64,' # DEBUG = True app = Flask(__name__, static_url_path='', static_folder='client/dist') app.config.from_object(__name__) sio = SocketIO(app) redisdb = Redis() # camera = Camera() # walle = WallE() class CameraListener: def __init__(self): self.redisdbcon = Redis() self.pubsub = self.redisdbcon.pubsub() self.pubsub.subscribe(['image']) def work(self, item): with app.app_context(): data: bytes = item['data'] if not isinstance(data, bytes): return image_url = DATA_URL_PREFIX + base64.b64encode(data).decode('ascii') sio.emit('cameraImage', { 'image': image_url, }, broadcast=True) def run(self) -> None: while True: message = self.pubsub.get_message() if message: self.work(message) sio.sleep(0.05) @app.route('/') def index(): return send_file('client/dist/index.html') @app.route('/imagestream.mjpg') def image_stream(): # ps = redisdb.pubsub() # cam = Camera() # return Response(cam.mjpeg_stream(boundary.encode()), # mimetype='multipart/x-mixed-replace; boundary=lkajflkasdjlkfaj') return "" @sio.on("camera") def camera_message(directions): # walle.set_eye_velocity(directions['angle'], directions['force']) print(f"Moving camera in direction {directions['angle']:f} with velocity {directions['force']}") redisdb.publish('look', json.dumps({ 'force': directions['force'], 'angle': directions['angle'], })) @sio.on("move") def move_message(directions): # walle.set_movement(directions['angle'], directions['force']) print(f"Moving in direction {directions['angle']:f} with velocity {directions['force']}") redisdb.publish('move', json.dumps({ 'force': directions['force'], 'angle': directions['angle'], })) if __name__ == '__main__': listener = CameraListener() sio.start_background_task(listener.run) sio.run(app, host='0.0.0.0')