Files
my-kern/kernel/util/stream.c

153 lines
4.2 KiB
C

//
// Created by rick on 02-03-21.
//
#include <stdlib.h>
#include <string.h>
#include <myke/tasks/locking.h>
#include <myke/util/stream.h>
struct stream {
uint8_t *data;
uint32_t size;
uint32_t head;
uint32_t tail;
mutex_t *read_lock;
mutex_t *write_lock;
semaphore_t *read_wait;
};
stream_t *stream_create(uint32_t size) {
stream_t *stream = malloc(sizeof(stream_t));
memset((uint8_t *) stream, 0, sizeof(stream_t));
if (stream == NULL) {
goto return_fail;
}
stream->data = malloc(size);
if (stream->data == NULL) {
goto return_fail_stream;
}
stream->size = size;
stream->write_lock = mutex_create();
if (stream->write_lock == NULL) {
goto return_fail_data;
}
stream->read_lock = mutex_create();
if (stream->read_lock == NULL) {
goto return_fail_write_lock;
}
stream->read_wait = semaphore_create(0);
if (stream->read_wait == NULL) {
goto return_fail_read_lock;
}
return stream;
return_fail_read_lock:
mutex_free(stream->read_lock);
return_fail_write_lock:
mutex_free(stream->write_lock);
return_fail_data:
free(stream->data);
return_fail_stream:
free(stream);
return_fail:
return NULL;
}
void stream_free(stream_t *stream) {
semaphore_free(stream->read_wait);
mutex_free(stream->read_lock);
mutex_free(stream->write_lock);
free(stream->data);
free(stream);
}
void stream_wait(stream_t *stream) {
semaphore_wait(stream->read_wait);
}
uint32_t stream_read(stream_t *stream, uint8_t *target, uint32_t count) {
uint32_t available = stream_get_read_available(stream);
if (available == 0) {
// out of data
return 0;
}
mutex_acquire(stream->read_lock);
available = stream_get_read_available(stream);
if (available > count) {
available = count;
}
if (stream->tail + available > stream->size) {
// wrap around
uint32_t overflow_addr = stream->tail + available;
uint32_t amount_after_overflow = overflow_addr - stream->size;
uint32_t amount_before_overflow = available - amount_after_overflow;
memcpy(target, &stream->data[stream->tail], amount_before_overflow);
memcpy(&target[amount_before_overflow], stream->data, amount_after_overflow);
stream->tail = amount_after_overflow;
} else {
memcpy(target, &stream->data[stream->tail], available);
stream->tail += available;
if (stream->tail == stream->size) {
stream->tail = 0;
}
}
mutex_release(stream->read_lock);
return available;
}
uint32_t stream_write(stream_t *stream, const uint8_t *source, uint32_t count) {
uint32_t available = stream_get_write_available(stream);
if (available == 0) {
return 0;
}
mutex_acquire(stream->write_lock);
available = stream_get_write_available(stream);
if (available > count) {
available = count;
}
if (stream->head + available > stream->size) {
// wrap around
uint32_t overflow_addr = stream->head + available;
uint32_t amount_after_overflow = overflow_addr - stream->size;
uint32_t amount_before_overflow = available - amount_after_overflow;
memcpy(&stream->data[stream->head], source, amount_before_overflow);
memcpy(stream->data, &source[amount_before_overflow], amount_after_overflow);
stream->head = amount_after_overflow;
} else {
memcpy(&stream->data[stream->head], source, available);
stream->head += available;
if (stream->head == stream->size) {
stream->head = 0;
}
}
mutex_release(stream->write_lock);
semaphore_signal(stream->read_wait);
return available;
}
uint32_t stream_get_read_available(stream_t *stream) {
if (stream->head == stream->tail) {
return 0;
}
if (stream->head > stream->tail) {
return stream->head - stream->tail;
}
return stream->size - stream->tail + stream->head;
}
uint32_t stream_get_write_available(stream_t *stream) {
if (stream->head == stream->tail) {
return stream->size;
}
if (stream->head > stream->tail) {
return stream->size - stream->head + stream->tail;
}
return stream->tail - stream->head;
}