from typing import Optional, NamedTuple
from queue import Queue, Empty
import wave
import uuid
import sched
import logging
import json
import os.path
# NOTE: audioop was deprecated in Python 3.11 and removed in 3.13 (PEP 594).
import audioop

from pyaudio import PyAudio, Stream

logger = logging.getLogger(__name__)

# Number of source frames read from a WAV file per fill step; also the
# minimum free space (in frames) a stream must report before it is fed.
CHUNK_SIZE = 1024
# Delay (seconds) between two polls of the queue of requested sounds.
INTERVAL_QUEUE = 0.1
# Delay (seconds) between two attempts to feed the active streams.
INTERVAL_STREAM = 0.0001


class PlayEntry:
    """A WAV file being played, paired with the output stream it feeds.

    The audio read from the file is resampled from the file's frame rate to
    ``device_framerate`` before it is handed to the caller.
    """

    wave_file: wave.Wave_read
    stream: Stream
    cvstate: Optional['audioop.RatecvState']
    actual_size: Optional[int] = None

    def __init__(self, wave_file: wave.Wave_read, stream: Stream, device_framerate):
        """Remember the file/stream pair and cache the file's audio format.

        :param wave_file: opened WAV reader supplying the audio data
        :param stream: output stream the converted data is written to
        :param device_framerate: frame rate the data is converted to
        """
        self.wave_file = wave_file
        self.stream = stream
        # Resampler state carried between successive ratecv() calls;
        # None on the first call, as audioop.ratecv requires.
        self.cvstate = None
        self.device_framerate = device_framerate
        self.sample_width = wave_file.getsampwidth()
        self.channels = wave_file.getnchannels()
        self.frame_rate = wave_file.getframerate()

    def get_actual_data(self, nframes) -> bytes:
        """Read up to ``nframes`` frames and convert them to the device rate.

        :param nframes: number of frames to read from the WAV file
        :return: the rate-converted audio; empty once the file is exhausted
        """
        data = self.wave_file.readframes(nframes)
        # NOTE(review): when the device rate is higher than the file rate the
        # result holds more than ``nframes`` frames - confirm callers allow it.
        new_data, self.cvstate = audioop.ratecv(
            data,
            self.sample_width,
            self.channels,
            self.frame_rate,
            self.device_framerate,
            self.cvstate,
        )
        return new_data

    def close(self) -> None:
        """Stop and close the stream, then close the WAV file.

        The WAV file is closed even when stopping or closing the stream
        raises; that exception is still propagated to the caller.
        """
        try:
            self.stream.stop_stream()
            self.stream.close()
        finally:
            self.wave_file.close()


class AudioSystem:
    """Plays named sounds through the default output device.

    Work is driven by two callbacks that re-register themselves on the
    supplied ``sched.scheduler``: :meth:`start_streams` opens a stream for
    every queued sound, :meth:`fill_streams` feeds the open streams.
    """

    audio: Optional[PyAudio]
    scheduler: sched.scheduler

    def __init__(self, scheduler: sched.scheduler, sound_file: str):
        """Load the sound index and register the periodic callbacks.

        :param scheduler: scheduler the two periodic callbacks are entered on
        :param sound_file: path of a JSON file with a ``sound_dir`` entry and
            a ``sounds`` list whose items have ``name`` and ``path`` keys
        :raises OSError: if ``sound_file`` cannot be opened
        :raises ValueError: if ``sound_file`` is not valid JSON
        :raises KeyError: if the ``sound_dir`` entry is missing
        """
        # Load the index first: if it is missing or malformed, nothing has
        # been scheduled yet and no PyAudio instance has to be released.
        with open(sound_file, encoding="utf-8") as f:
            self.sound_data = json.load(f)
        self.sound_dir = os.path.realpath(self.sound_data['sound_dir'])

        self.to_play = Queue()
        self.playing = {}

        self.audio = PyAudio()
        self.default_output_device = self.audio.get_default_output_device_info()
        self.default_sample_rate = int(self.default_output_device['defaultSampleRate'])

        self.scheduler = scheduler
        # Both are due immediately; the lower priority number runs first,
        # so fill_streams (1) is executed before start_streams (2).
        self.scheduler.enter(0, 2, self.start_streams)
        self.scheduler.enter(0, 1, self.fill_streams)

    def quit(self):
        """Close every sound still playing and terminate PyAudio.

        Errors while closing an individual entry are logged and do not
        prevent the remaining entries or PyAudio itself from being released.
        """
        try:
            for entry in self.playing.values():
                try:
                    entry.close()
                except Exception as e:
                    logger.warning('Error while closing stream during shutdown', exc_info=e)
            self.playing.clear()
        finally:
            self.audio.terminate()

    def queue_sound(self, sound):
        """Queue the sound named ``sound`` for playback.

        :param sound: value matched against the ``name`` of the known sounds
        :raises ValueError: if no known sound has that name
        """
        for sound_entry in self.sound_data['sounds']:
            if sound_entry['name'] == sound:
                break
        else:
            raise ValueError(f"Sound {sound!r} is not known")
        self.to_play.put(os.path.join(self.sound_dir, sound_entry['path']))

    def start_streams(self):
        """Open an output stream for every sound waiting in the queue.

        Re-registers itself on the scheduler before doing any work. A sound
        that cannot be opened is logged and skipped; the remaining queued
        sounds are still processed. Any other exception is logged and
        re-raised.
        """
        self.scheduler.enter(INTERVAL_QUEUE, 2, self.start_streams)
        while True:
            try:
                item = self.to_play.get(False)
            except Empty:
                return  # queue drained
            try:
                self._open_stream(item)
            except (IOError, wave.Error, EOFError) as e:
                # Missing/unreadable file or a device error: skip this sound.
                logger.warning('Error in opening stream', exc_info=e)
            except Exception as e:
                logger.error('Fatal error in opening stream', exc_info=e)
                raise
            finally:
                # Every get() needs a matching task_done(), otherwise
                # Queue.join() would block forever after a failure.
                self.to_play.task_done()

    def _open_stream(self, path):
        """Open ``path`` as WAV, open a matching stream and register both."""
        logger.info("Queueing stream for %r", path)
        wave_file = wave.open(path)
        try:
            stream = self.audio.open(
                format=self.audio.get_format_from_width(wave_file.getsampwidth()),
                channels=wave_file.getnchannels(),
                # The stream runs at the device rate; PlayEntry resamples.
                rate=self.default_sample_rate,
                output=True,
            )
        except Exception:
            # Do not leak the file handle when the stream cannot be opened.
            wave_file.close()
            raise
        entry_id = uuid.uuid4()
        while entry_id in self.playing:
            entry_id = uuid.uuid4()
        self.playing[entry_id] = PlayEntry(wave_file, stream, self.default_sample_rate)

    def fill_streams(self):
        """Feed one chunk of audio to every stream that has room for it.

        Re-registers itself on the scheduler before doing any work. Streams
        whose file is exhausted (or can no longer be read) are closed and
        dropped from ``self.playing``. Any other exception is logged and
        re-raised; entries already finished are still dropped in that case.
        """
        self.scheduler.enter(INTERVAL_STREAM, 1, self.fill_streams)
        finished_streams = []
        try:
            for stream_id, entry in self.playing.items():
                try:
                    if entry.stream.get_write_available() < CHUNK_SIZE:
                        continue  # not enough space in buffer, wait for next iteration
                    try:
                        data = entry.get_actual_data(CHUNK_SIZE)
                    except IOError as e:
                        logger.warning('An IO Error occurred while reading WAV file', exc_info=e)
                        data = b''  # treat an unreadable file as finished
                    if data:
                        entry.stream.write(data)
                        continue
                    logger.debug("Done playing sound, shutting down stream")
                    # Mark it first so the entry is dropped even if closing fails.
                    finished_streams.append(stream_id)
                    entry.close()
                except Exception as e:
                    logger.warning('A fatal error occurred while rendering sound', exc_info=e)
                    raise
        finally:
            # Removal is deferred: the dict must not change while iterated.
            for finished in finished_streams:
                del self.playing[finished]