feat: introduced stream to separate actual io and other tasks

This commit is contained in:
2021-03-02 21:56:37 +01:00
parent 76792dd6fd
commit c9102fbc65
6 changed files with 234 additions and 11 deletions

View File

@@ -57,11 +57,13 @@ void noreturn kmain(multiboot_info_t *multiboot_info) {
serial_init();
kprint_register(serial_kprint);
cpuidx_print_info();
store_bootloader_info(multiboot_info);
init_mmap(multiboot_info);
kprint_init();
cpuidx_print_info();
// init done, enable interrupts
__asm__ __volatile__("sti");
init_timer(1000);
@@ -72,6 +74,7 @@ void noreturn kmain(multiboot_info_t *multiboot_info) {
printf("Booted successfully v%d.%d.%d\n", version_major, version_minor, version_patch);
task_init();
kprint_start_task();
block_dev_start_task();
task_spawn(main_loop, NULL);
syscall_start_scheduler();

View File

@@ -2,9 +2,19 @@
// Created by rick on 28-01-21.
//
#include <util/stream.h>
#include <libc/string.h>
#include <stdbool.h>
#include <attributes.h>
#include <tasks/task.h>
#include <libc/libc.h>
#include "kprint.h"
#define MAX_HANDLERS 8
#define STREAM_SIZE 512
#define PRINT_BUFFER_SIZE 64
stream_t *kprint_stream;
kprint_handler handlers[MAX_HANDLERS] = {
NULL,
@@ -28,10 +38,36 @@ void kprint_register(kprint_handler handler) {
}
void kprint(const char *msg) {
stream_write(kprint_stream, (const uint8_t *) msg, strlen(msg));
}
void kprint_internal(const char *msg) {
for (int i = 0; i < MAX_HANDLERS; ++i) {
if (handlers[i] == NULL) {
continue;
}
handlers[i](msg);
}
}
void kprint_init() {
kprint_stream = stream_create(STREAM_SIZE);
}
void noreturn kprint_task(void *_) {
uint32_t last_read = 0;
uint8_t data[PRINT_BUFFER_SIZE + 1] = {0};
while (true) {
stream_wait(kprint_stream);
while (true) {
last_read = stream_read(kprint_stream, data, PRINT_BUFFER_SIZE);
if (last_read == 0) break;
data[last_read] = 0;
kprint_internal((const char *) data);
};
}
}
void kprint_start_task() {
task_spawn(kprint_task, NULL);
}

View File

@@ -13,4 +13,8 @@ typedef void (*kprint_handler)(const char *);
void kprint_register(kprint_handler);
void kprint(const char *msg);
void kprint(const char *msg);
void kprint_init();
void kprint_start_task();

View File

@@ -37,7 +37,7 @@ semaphore_t *semaphore_create(int32_t start) {
}
void semaphore_wait(semaphore_t *semaphore) {
if (__sync_sub_and_fetch(&semaphore->value, 1) == 0) {
if (__sync_sub_and_fetch(&semaphore->value, 1) >= 0) {
return; // first to lock
}
task_lock_acquire();
@@ -54,14 +54,17 @@ void semaphore_wait(semaphore_t *semaphore) {
}
void semaphore_signal(semaphore_t *semaphore) {
if (__sync_add_and_fetch(&semaphore->value, 1) >= 1) {
return; // last in queue
}
semaphore->value++;
task_lock_acquire();
task_signal(semaphore->first_wait->tid);
lock_fifo_entry_t *first_entry = semaphore->first_wait;
semaphore->first_wait = first_entry->next;
free(first_entry);
if (semaphore->first_wait != NULL) {
task_signal(semaphore->first_wait->tid);
lock_fifo_entry_t *first_entry = semaphore->first_wait;
semaphore->first_wait = first_entry->next;
if (semaphore->first_wait == NULL) {
semaphore->last_wait = NULL;
}
free(first_entry);
}
task_lock_free();
}

151
kernel/util/stream.c Normal file
View File

@@ -0,0 +1,151 @@
//
// Created by rick on 02-03-21.
//
#include <mem/malloc.h>
#include <libc/libc.h>
#include <tasks/locking.h>
#include "stream.h"
struct stream {
uint8_t *data;
uint32_t size;
uint32_t head;
uint32_t tail;
mutex_t *read_lock;
mutex_t *write_lock;
semaphore_t *read_wait;
};
stream_t *stream_create(uint32_t size) {
stream_t *stream = malloc(sizeof(stream_t));
memset((uint8_t *) stream, 0, sizeof(stream_t));
if (stream == NULL) {
goto return_fail;
}
stream->data = malloc(size);
if (stream->data == NULL) {
goto return_fail_stream;
}
stream->size = size;
stream->write_lock = mutex_create();
if (stream->write_lock == NULL) {
goto return_fail_data;
}
stream->read_lock = mutex_create();
if (stream->read_lock == NULL) {
goto return_fail_write_lock;
}
stream->read_wait = semaphore_create(0);
if (stream->read_wait == NULL) {
goto return_fail_read_lock;
}
return stream;
return_fail_read_lock:
mutex_free(stream->read_lock);
return_fail_write_lock:
mutex_free(stream->write_lock);
return_fail_data:
free(stream->data);
return_fail_stream:
free(stream);
return_fail:
return NULL;
}
void stream_free(stream_t *stream) {
semaphore_free(stream->read_wait);
mutex_free(stream->read_lock);
mutex_free(stream->write_lock);
free(stream->data);
free(stream);
}
void stream_wait(stream_t *stream) {
semaphore_wait(stream->read_wait);
}
uint32_t stream_read(stream_t *stream, uint8_t *target, uint32_t count) {
uint32_t available = stream_get_read_available(stream);
if (available == 0) {
// out of data
return 0;
}
mutex_acquire(stream->read_lock);
available = stream_get_read_available(stream);
if (available > count) {
available = count;
}
if (stream->tail + available > stream->size) {
// wrap around
uint32_t overflow_addr = stream->tail + available;
uint32_t amount_after_overflow = overflow_addr - stream->size;
uint32_t amount_before_overflow = available - amount_after_overflow;
memcpy(target, &stream->data[stream->tail], amount_before_overflow);
memcpy(&target[amount_before_overflow], stream->data, amount_after_overflow);
stream->tail = amount_after_overflow;
} else {
memcpy(target, &stream->data[stream->tail], available);
stream->tail += available;
if (stream->tail == stream->size) {
stream->tail = 0;
}
}
mutex_release(stream->read_lock);
return available;
}
uint32_t stream_write(stream_t *stream, const uint8_t *source, uint32_t count) {
uint32_t available = stream_get_write_available(stream);
if (available == 0) {
return 0;
}
mutex_acquire(stream->write_lock);
available = stream_get_write_available(stream);
if (available > count) {
available = count;
}
if (stream->head + available > stream->size) {
// wrap around
uint32_t overflow_addr = stream->head + available;
uint32_t amount_after_overflow = overflow_addr - stream->size;
uint32_t amount_before_overflow = available - amount_after_overflow;
memcpy(&stream->data[stream->head], source, amount_before_overflow);
memcpy(stream->data, &source[amount_before_overflow], amount_after_overflow);
stream->head = amount_after_overflow;
} else {
memcpy(&stream->data[stream->head], source, available);
stream->head += available;
if (stream->head == stream->size) {
stream->head = 0;
}
}
mutex_release(stream->write_lock);
semaphore_signal(stream->read_wait);
return available;
}
uint32_t stream_get_read_available(stream_t *stream) {
if (stream->head == stream->tail) {
return 0;
}
if (stream->head > stream->tail) {
return stream->head - stream->tail;
}
return stream->size - stream->tail + stream->head;
}
uint32_t stream_get_write_available(stream_t *stream) {
if (stream->head == stream->tail) {
return stream->size;
}
if (stream->head > stream->tail) {
return stream->size - stream->head + stream->tail;
}
return stream->tail - stream->head;
}

26
kernel/util/stream.h Normal file
View File

@@ -0,0 +1,26 @@
//
// Created by rick on 02-03-21.
//
#ifndef NEW_KERNEL_STREAM_H
#define NEW_KERNEL_STREAM_H
#include <stdint.h>
typedef struct stream stream_t;
stream_t *stream_create(uint32_t size);
void stream_free(stream_t *strm);
void stream_wait(stream_t *stream);
uint32_t stream_read(stream_t *strm, uint8_t *target, uint32_t count);
uint32_t stream_write(stream_t *strm, const uint8_t *source, uint32_t count);
uint32_t stream_get_write_available(stream_t *stream);
uint32_t stream_get_read_available(stream_t *stream);
#endif //NEW_KERNEL_STREAM_H