/*	$OpenBSD: kern_task.c,v 1.35 2024/05/14 08:26:13 jsg Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>
#include <sys/witness.h>

#include "kcov.h"
#if NKCOV > 0
#include <sys/kcov.h>
#endif

#ifdef WITNESS

static struct lock_type taskq_lock_type = {
    .lt_name = "taskq"
};

#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
    (LO_CLASS_RWLOCK << LO_CLASSSHIFT)

#endif /* WITNESS */

struct taskq_thread {
    SLIST_ENTRY(taskq_thread)  tt_entry;
    struct proc               *tt_thread;
};
SLIST_HEAD(taskq_threads, taskq_thread);
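
/*
 * A taskq is a list of pending work and the threads that service it.
 * tq_mtx serializes access to the work list, the thread list, and the
 * counters below; the barrier bookkeeping (tq_barriers, tq_bgen,
 * tq_bthreads) is described above taskq_do_barrier().
 */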

struct taskq {
    enum {
        TQ_S_CREATED,
        TQ_S_RUNNING,
        TQ_S_DESTROYED
    }                     tq_state;
    unsigned int          tq_running;
    unsigned int          tq_nthreads;
    unsigned int          tq_flags;
    const char           *tq_name;

    struct mutex          tq_mtx;
    struct task_list      tq_worklist;

    struct taskq_threads  tq_threads;
    unsigned int          tq_barriers;
    unsigned int          tq_bgen;
    unsigned int          tq_bthreads;

#ifdef WITNESS
    struct lock_object    tq_lock_object;
#endif
};

static const char taskq_sys_name[] = "systq";

struct taskq taskq_sys = {
    .tq_state    = TQ_S_CREATED,
    .tq_running  = 0,
    .tq_nthreads = 1,
    .tq_flags    = 0,
    .tq_name     = taskq_sys_name,
    .tq_mtx      = MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
                       taskq_sys_name, 0),
    .tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),

    .tq_threads  = SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads),
    .tq_barriers = 0,
    .tq_bgen     = 0,
    .tq_bthreads = 0,

#ifdef WITNESS
    .tq_lock_object = {
        .lo_name  = taskq_sys_name,
        .lo_flags = TASKQ_LOCK_FLAGS,
    },
#endif
};

static const char taskq_sys_mp_name[] = "systqmp";

struct taskq taskq_sys_mp = {
    .tq_state    = TQ_S_CREATED,
    .tq_running  = 0,
    .tq_nthreads = 1,
    .tq_flags    = TASKQ_MPSAFE,
    .tq_name     = taskq_sys_mp_name,
    .tq_mtx      = MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
                       taskq_sys_mp_name, 0),
    .tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),

    .tq_threads  = SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads),
    .tq_barriers = 0,
    .tq_bgen     = 0,
    .tq_bthreads = 0,

#ifdef WITNESS
    .tq_lock_object = {
        .lo_name  = taskq_sys_mp_name,
        .lo_flags = TASKQ_LOCK_FLAGS,
    },
#endif
};

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void taskq_init(void); /* called in init_main.c */
void taskq_create_thread(void *);
void taskq_barrier_task(void *);
int  taskq_next_work(struct taskq *, struct task *);
void taskq_thread(void *);

void
taskq_init(void)
{
    WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
    kthread_create_deferred(taskq_create_thread, systq);
    WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
    kthread_create_deferred(taskq_create_thread, systqmp);
}

struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
    struct taskq *tq;

    tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
    if (tq == NULL)
        return (NULL);

    tq->tq_state = TQ_S_CREATED;
    tq->tq_running = 0;
    tq->tq_nthreads = nthreads;
    tq->tq_name = name;
    tq->tq_flags = flags;

    mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
    TAILQ_INIT(&tq->tq_worklist);
    SLIST_INIT(&tq->tq_threads);
    tq->tq_barriers = 0;
    tq->tq_bgen = 0;
    tq->tq_bthreads = 0;

#ifdef WITNESS
    memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
    tq->tq_lock_object.lo_name = name;
    tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
    witness_init(&tq->tq_lock_object, &taskq_lock_type);
#endif

    /* try to create a thread to guarantee that tasks will be serviced */
    kthread_create_deferred(taskq_create_thread, tq);

    return (tq);
}
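
/*
 * Example usage, as a sketch only: the queue name, thread count, IPL,
 * and the "example_work"/"sc" names below are hypothetical, not part of
 * this file.
 *
 *    struct taskq *tq;
 *    struct task t;
 *
 *    tq = taskq_create("exampletq", 1, IPL_SOFTNET, 0);
 *    if (tq == NULL)
 *        ...handle failure...
 *    task_set(&t, example_work, sc);
 *    task_add(tq, &t);
 *    ...
 *    taskq_destroy(tq);
 */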

void
taskq_destroy(struct taskq *tq)
{
    mtx_enter(&tq->tq_mtx);
    switch (tq->tq_state) {
    case TQ_S_CREATED:
        /* tq is still referenced by taskq_create_thread */
        tq->tq_state = TQ_S_DESTROYED;
        mtx_leave(&tq->tq_mtx);
        return;

    case TQ_S_RUNNING:
        tq->tq_state = TQ_S_DESTROYED;
        break;

    default:
        panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
    }

    while (tq->tq_running > 0) {
        wakeup(tq);
        msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
            INFSLP);
    }
    mtx_leave(&tq->tq_mtx);

    free(tq, M_DEVBUF, sizeof(*tq));
}

void
taskq_create_thread(void *arg)
{
    struct taskq *tq = arg;
    int rv;

    mtx_enter(&tq->tq_mtx);

    switch (tq->tq_state) {
    case TQ_S_DESTROYED:
        mtx_leave(&tq->tq_mtx);
        free(tq, M_DEVBUF, sizeof(*tq));
        return;

    case TQ_S_CREATED:
        tq->tq_state = TQ_S_RUNNING;
        break;

    default:
        panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
    }

    do {
        tq->tq_running++;
        mtx_leave(&tq->tq_mtx);

        rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

        mtx_enter(&tq->tq_mtx);
        if (rv != 0) {
            printf("unable to create thread for \"%s\" taskq\n",
                tq->tq_name);

            tq->tq_running--;
            /* could have been destroyed during kthread_create */
            if (tq->tq_state == TQ_S_DESTROYED &&
                tq->tq_running == 0)
                wakeup_one(&tq->tq_running);
            break;
        }
    } while (tq->tq_running < tq->tq_nthreads);

    mtx_leave(&tq->tq_mtx);
}

void
taskq_barrier_task(void *p)
{
    struct taskq *tq = p;
    unsigned int gen;

    mtx_enter(&tq->tq_mtx);
    tq->tq_bthreads++;
    wakeup(&tq->tq_bthreads);

    gen = tq->tq_bgen;
    do {
        msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
            PWAIT, "tqbarend", INFSLP);
    } while (gen == tq->tq_bgen);
    mtx_leave(&tq->tq_mtx);
}
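
/*
 * A barrier completes once every thread in the taskq has checked in,
 * which proves that any task running when the barrier was issued has
 * finished. Each worker that runs taskq_barrier_task() (or a caller
 * that is itself a taskq thread) bumps tq_bthreads; when the count
 * reaches tq_nthreads, the last barrier waiter advances the generation
 * counter tq_bgen, releasing everyone blocked on this round.
 */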
static void
taskq_do_barrier(struct taskq *tq)
{
    struct task t = TASK_INITIALIZER(taskq_barrier_task, tq);
    struct proc *thread = curproc;
    struct taskq_thread *tt;

    mtx_enter(&tq->tq_mtx);
    tq->tq_barriers++;

    /* is the barrier being run from a task inside the taskq? */
    SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) {
        if (tt->tt_thread == thread) {
            tq->tq_bthreads++;
            wakeup(&tq->tq_bthreads);
            break;
        }
    }

    while (tq->tq_bthreads < tq->tq_nthreads) {
        /* shove the task into the queue for a worker to pick up */
        SET(t.t_flags, TASK_ONQUEUE);
        TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry);
        wakeup_one(tq);

        msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx,
            PWAIT, "tqbar", INFSLP);

        /*
         * another thread running a barrier might have
         * done this work for us.
         */
        if (ISSET(t.t_flags, TASK_ONQUEUE))
            TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry);
    }

    if (--tq->tq_barriers == 0) {
        /* we're the last one out */
        tq->tq_bgen++;
        wakeup(&tq->tq_bgen);
        tq->tq_bthreads = 0;
    } else {
        unsigned int gen = tq->tq_bgen;
        do {
            msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
                PWAIT, "tqbarwait", INFSLP);
        } while (gen == tq->tq_bgen);
    }

    mtx_leave(&tq->tq_mtx);
}

void
taskq_barrier(struct taskq *tq)
{
    WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

    taskq_do_barrier(tq);
}

void
taskq_del_barrier(struct taskq *tq, struct task *t)
{
    WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

    if (task_del(tq, t))
        return;

    taskq_do_barrier(tq);
}
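
/*
 * Example teardown pattern, as a sketch only: "sc" and "sc_task" are
 * hypothetical driver state, not part of this file. taskq_del_barrier()
 * either removes the pending task or waits for a running instance of it
 * to finish, so the task may be freed safely afterwards.
 *
 *    taskq_del_barrier(systq, &sc->sc_task);
 *    free(sc, M_DEVBUF, sizeof(*sc));
 */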

void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
    t->t_func = fn;
    t->t_arg = arg;
    t->t_flags = 0;
}
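
/*
 * A task is usually set once and added many times, e.g. from an
 * interrupt handler. A sketch, with hypothetical names:
 *
 *    task_set(&sc->sc_task, example_intr_work, sc);    at attach time
 *    task_add(systq, &sc->sc_task);                    per event
 *
 * task_add() is cheap when the task is already queued: re-adding a
 * pending task returns 0 and the work runs only once.
 */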
int
task_add(struct taskq *tq, struct task *w)
{
    int rv = 0;

    /* unlocked fast path: already queued, nothing to do */
    if (ISSET(w->t_flags, TASK_ONQUEUE))
        return (0);

    mtx_enter(&tq->tq_mtx);
    /* recheck under the lock; the unlocked test above may be stale */
    if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
        rv = 1;
        SET(w->t_flags, TASK_ONQUEUE);
        TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
#if NKCOV > 0
        if (!kcov_cold)
            w->t_process = curproc->p_p;
#endif
    }
    mtx_leave(&tq->tq_mtx);

    if (rv)
        wakeup_one(tq);

    return (rv);
}

int
task_del(struct taskq *tq, struct task *w)
{
    int rv = 0;

    /* unlocked fast path: not queued, nothing to do */
    if (!ISSET(w->t_flags, TASK_ONQUEUE))
        return (0);

    mtx_enter(&tq->tq_mtx);
    if (ISSET(w->t_flags, TASK_ONQUEUE)) {
        rv = 1;
        CLR(w->t_flags, TASK_ONQUEUE);
        TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
    }
    mtx_leave(&tq->tq_mtx);

    return (rv);
}
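
/*
 * Note: task_del() only removes a task that is still pending; it does
 * not wait for an already-running task function to return. Use
 * taskq_del_barrier() when the caller needs that guarantee.
 */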

int
taskq_next_work(struct taskq *tq, struct task *work)
{
    struct task *next;

    mtx_enter(&tq->tq_mtx);
    while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
        if (tq->tq_state != TQ_S_RUNNING) {
            mtx_leave(&tq->tq_mtx);
            return (0);
        }

        msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
    }

    TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
    CLR(next->t_flags, TASK_ONQUEUE);

    *work = *next; /* copy to caller to avoid races */

    next = TAILQ_FIRST(&tq->tq_worklist);
    mtx_leave(&tq->tq_mtx);

    /* if more work is pending, kick another worker awake */
    if (next != NULL && tq->tq_nthreads > 1)
        wakeup_one(tq);

    return (1);
}

void
taskq_thread(void *xtq)
{
    struct taskq_thread self = { .tt_thread = curproc };
    struct taskq *tq = xtq;
    struct task work;
    int last;

    if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
        KERNEL_UNLOCK();

    mtx_enter(&tq->tq_mtx);
    SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry);
    mtx_leave(&tq->tq_mtx);

    WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

    while (taskq_next_work(tq, &work)) {
        WITNESS_LOCK(&tq->tq_lock_object, 0);
#if NKCOV > 0
        kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process);
#endif
        (*work.t_func)(work.t_arg);
#if NKCOV > 0
        kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process);
#endif
        WITNESS_UNLOCK(&tq->tq_lock_object, 0);
        sched_pause(yield);
    }

    mtx_enter(&tq->tq_mtx);
    SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry);
    last = (--tq->tq_running == 0);
    mtx_leave(&tq->tq_mtx);

    if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
        KERNEL_LOCK();

    if (last)
        wakeup_one(&tq->tq_running);

    kthread_exit(0);
}