// // Created by rick on 02-03-21. // #include #include #include #include struct stream { uint8_t *data; uint32_t size; uint32_t head; uint32_t tail; mutex_t *read_lock; mutex_t *write_lock; semaphore_t *read_wait; }; stream_t *stream_create(uint32_t size) { stream_t *stream = malloc(sizeof(stream_t)); memset((uint8_t *) stream, 0, sizeof(stream_t)); if (stream == NULL) { goto return_fail; } stream->data = malloc(size); if (stream->data == NULL) { goto return_fail_stream; } stream->size = size; stream->write_lock = mutex_create(); if (stream->write_lock == NULL) { goto return_fail_data; } stream->read_lock = mutex_create(); if (stream->read_lock == NULL) { goto return_fail_write_lock; } stream->read_wait = semaphore_create(0); if (stream->read_wait == NULL) { goto return_fail_read_lock; } return stream; return_fail_read_lock: mutex_free(stream->read_lock); return_fail_write_lock: mutex_free(stream->write_lock); return_fail_data: free(stream->data); return_fail_stream: free(stream); return_fail: return NULL; } void stream_free(stream_t *stream) { semaphore_free(stream->read_wait); mutex_free(stream->read_lock); mutex_free(stream->write_lock); free(stream->data); free(stream); } void stream_wait(stream_t *stream) { semaphore_wait(stream->read_wait); } uint32_t stream_read(stream_t *stream, uint8_t *target, uint32_t count) { uint32_t available = stream_get_read_available(stream); if (available == 0) { // out of data return 0; } mutex_acquire(stream->read_lock); available = stream_get_read_available(stream); if (available > count) { available = count; } if (stream->tail + available > stream->size) { // wrap around uint32_t overflow_addr = stream->tail + available; uint32_t amount_after_overflow = overflow_addr - stream->size; uint32_t amount_before_overflow = available - amount_after_overflow; memcpy(target, &stream->data[stream->tail], amount_before_overflow); memcpy(&target[amount_before_overflow], stream->data, amount_after_overflow); stream->tail = amount_after_overflow; } else { memcpy(target, &stream->data[stream->tail], available); stream->tail += available; if (stream->tail == stream->size) { stream->tail = 0; } } mutex_release(stream->read_lock); return available; } uint32_t stream_write(stream_t *stream, const uint8_t *source, uint32_t count) { uint32_t available = stream_get_write_available(stream); if (available == 0) { return 0; } mutex_acquire(stream->write_lock); available = stream_get_write_available(stream); if (available > count) { available = count; } if (stream->head + available > stream->size) { // wrap around uint32_t overflow_addr = stream->head + available; uint32_t amount_after_overflow = overflow_addr - stream->size; uint32_t amount_before_overflow = available - amount_after_overflow; memcpy(&stream->data[stream->head], source, amount_before_overflow); memcpy(stream->data, &source[amount_before_overflow], amount_after_overflow); stream->head = amount_after_overflow; } else { memcpy(&stream->data[stream->head], source, available); stream->head += available; if (stream->head == stream->size) { stream->head = 0; } } mutex_release(stream->write_lock); semaphore_signal(stream->read_wait); return available; } uint32_t stream_get_read_available(stream_t *stream) { if (stream->head == stream->tail) { return 0; } if (stream->head > stream->tail) { return stream->head - stream->tail; } return stream->size - stream->tail + stream->head; } uint32_t stream_get_write_available(stream_t *stream) { if (stream->head == stream->tail) { return stream->size; } if (stream->head > stream->tail) { return stream->size - stream->head + stream->tail; } return stream->tail - stream->head; }