153 lines
4.2 KiB
C
153 lines
4.2 KiB
C
//
// Created by rick on 02-03-21.
//
|
|
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#include <myke/tasks/locking.h>
|
|
#include <myke/util/stream.h>
|
|
|
|
struct stream {
|
|
uint8_t *data;
|
|
uint32_t size;
|
|
uint32_t head;
|
|
uint32_t tail;
|
|
mutex_t *read_lock;
|
|
mutex_t *write_lock;
|
|
semaphore_t *read_wait;
|
|
};
|
|
|
|
stream_t *stream_create(uint32_t size) {
|
|
stream_t *stream = malloc(sizeof(stream_t));
|
|
memset((uint8_t *) stream, 0, sizeof(stream_t));
|
|
if (stream == NULL) {
|
|
goto return_fail;
|
|
}
|
|
stream->data = malloc(size);
|
|
if (stream->data == NULL) {
|
|
goto return_fail_stream;
|
|
}
|
|
stream->size = size;
|
|
|
|
stream->write_lock = mutex_create();
|
|
if (stream->write_lock == NULL) {
|
|
goto return_fail_data;
|
|
}
|
|
stream->read_lock = mutex_create();
|
|
if (stream->read_lock == NULL) {
|
|
goto return_fail_write_lock;
|
|
}
|
|
|
|
stream->read_wait = semaphore_create(0);
|
|
if (stream->read_wait == NULL) {
|
|
goto return_fail_read_lock;
|
|
}
|
|
|
|
return stream;
|
|
return_fail_read_lock:
|
|
mutex_free(stream->read_lock);
|
|
return_fail_write_lock:
|
|
mutex_free(stream->write_lock);
|
|
return_fail_data:
|
|
free(stream->data);
|
|
return_fail_stream:
|
|
free(stream);
|
|
return_fail:
|
|
return NULL;
|
|
}
|
|
|
|
/**
 * Release a stream created by stream_create(), including its semaphore,
 * both mutexes and the data buffer.
 *
 * @param stream stream to destroy; NULL is accepted and ignored, mirroring
 *               free(NULL)
 */
void stream_free(stream_t *stream) {
    if (stream == NULL) {
        return;
    }

    semaphore_free(stream->read_wait);
    mutex_free(stream->read_lock);
    mutex_free(stream->write_lock);
    free(stream->data);
    free(stream);
}
|
|
|
|
/**
 * Wait on the stream's read semaphore, which stream_write() signals after
 * every call.
 *
 * @param stream stream whose semaphore is waited on
 */
void stream_wait(stream_t *stream) {
    semaphore_t *data_ready = stream->read_wait;

    semaphore_wait(data_ready);
}
|
|
|
|
/**
 * Copy up to `count` buffered bytes out of the stream into `target`.
 *
 * @param stream stream to read from
 * @param target destination buffer, at least `count` bytes long
 * @param count  maximum number of bytes to copy
 * @return number of bytes actually copied; 0 when the stream is empty
 */
uint32_t stream_read(stream_t *stream, uint8_t *target, uint32_t count) {
    // Unlocked early-out: an empty stream never takes the read lock.
    if (stream_get_read_available(stream) == 0) {
        return 0;
    }

    mutex_acquire(stream->read_lock);

    // Re-query under the lock and clamp to what the caller asked for.
    uint32_t to_copy = stream_get_read_available(stream);
    if (count < to_copy) {
        to_copy = count;
    }

    const uint32_t start = stream->tail;
    const uint32_t end = start + to_copy;

    if (end <= stream->size) {
        // The requested range is contiguous in the buffer.
        memcpy(target, stream->data + start, to_copy);
        // Landing exactly on the end of the buffer wraps the index to 0.
        stream->tail = (end == stream->size) ? 0 : end;
    } else {
        // The range crosses the end of the buffer: copy the part up to the
        // end first, then the remainder from the start of the buffer.
        const uint32_t wrapped = end - stream->size;
        const uint32_t contiguous = to_copy - wrapped;

        memcpy(target, stream->data + start, contiguous);
        memcpy(target + contiguous, stream->data, wrapped);
        stream->tail = wrapped;
    }

    mutex_release(stream->read_lock);
    return to_copy;
}
|
|
|
|
/**
 * Copy up to `count` bytes from `source` into the stream and signal the
 * read semaphore.
 *
 * @param stream stream to write to
 * @param source bytes to store, at least `count` bytes long
 * @param count  maximum number of bytes to store
 * @return number of bytes actually stored; 0 when no space was available
 *         (in that early-out case the semaphore is not signalled)
 */
uint32_t stream_write(stream_t *stream, const uint8_t *source, uint32_t count) {
    // Unlocked early-out: a full stream never takes the write lock.
    if (stream_get_write_available(stream) == 0) {
        return 0;
    }

    mutex_acquire(stream->write_lock);

    // Re-query under the lock and clamp to what the caller supplied.
    uint32_t to_copy = stream_get_write_available(stream);
    if (count < to_copy) {
        to_copy = count;
    }

    const uint32_t start = stream->head;
    const uint32_t end = start + to_copy;

    if (end <= stream->size) {
        // The destination range is contiguous in the buffer.
        memcpy(stream->data + start, source, to_copy);
        // Landing exactly on the end of the buffer wraps the index to 0.
        stream->head = (end == stream->size) ? 0 : end;
    } else {
        // The range crosses the end of the buffer: fill up to the end first,
        // then continue from the start of the buffer.
        const uint32_t wrapped = end - stream->size;
        const uint32_t contiguous = to_copy - wrapped;

        memcpy(stream->data + start, source, contiguous);
        memcpy(stream->data, source + contiguous, wrapped);
        stream->head = wrapped;
    }

    mutex_release(stream->write_lock);

    // Signalled once per call that got past the early-out, after the lock
    // has been dropped.
    semaphore_signal(stream->read_wait);
    return to_copy;
}
|
|
|
|
/**
 * Number of bytes currently stored in the stream and available to read.
 *
 * @param stream stream to inspect
 * @return distance from `tail` to `head`, accounting for wrap-around;
 *         0 when the two indices are equal
 */
uint32_t stream_get_read_available(stream_t *stream) {
    const uint32_t write_pos = stream->head;
    const uint32_t read_pos = stream->tail;

    if (write_pos >= read_pos) {
        // Data is contiguous (or absent when the indices coincide).
        return write_pos - read_pos;
    }

    // head has wrapped behind tail: data runs to the end of the buffer and
    // continues from index 0 up to head.
    return stream->size - read_pos + write_pos;
}
|
|
|
|
/**
 * Number of bytes that can be written without overwriting unread data.
 *
 * The stream treats `head == tail` as "empty", so a completely filled buffer
 * would be indistinguishable from an empty one and its contents would be
 * lost. One slot is therefore always kept free: the usable capacity is
 * `size - 1` bytes.
 *
 * @param stream stream to inspect
 * @return free space in bytes, between 0 and `size - 1`; always 0 for a
 *         zero-sized stream
 */
uint32_t stream_get_write_available(stream_t *stream) {
    const uint32_t capacity = stream->size;
    const uint32_t write_pos = stream->head;
    const uint32_t read_pos = stream->tail;

    // Guard the `capacity - 1` below against unsigned underflow.
    if (capacity == 0) {
        return 0;
    }

    // Bytes currently stored, accounting for wrap-around.
    uint32_t used;
    if (write_pos >= read_pos) {
        used = write_pos - read_pos;
    } else {
        used = capacity - read_pos + write_pos;
    }

    // Reserve one slot so head never catches up with tail on a write.
    return capacity - 1 - used;
}
|