1190 lines
28 KiB
C
1190 lines
28 KiB
C
/* $OpenBSD: scheduler_ramqueue.c,v 1.47 2021/06/14 17:58:16 eric Exp $ */
|
|
|
|
/*
|
|
* Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
|
|
* Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
|
|
*
|
|
* Permission to use, copy, modify, and distribute this software for any
|
|
* purpose with or without fee is hereby granted, provided that the above
|
|
* copyright notice and this permission notice appear in all copies.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
|
|
#include <inttypes.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#include "smtpd.h"
|
|
#include "log.h"
|
|
|
|
TAILQ_HEAD(evplist, rq_envelope);
|
|
|
|
struct rq_message {
|
|
uint32_t msgid;
|
|
struct tree envelopes;
|
|
};
|
|
|
|
struct rq_envelope {
|
|
TAILQ_ENTRY(rq_envelope) entry;
|
|
SPLAY_ENTRY(rq_envelope) t_entry;
|
|
|
|
uint64_t evpid;
|
|
uint64_t holdq;
|
|
enum delivery_type type;
|
|
|
|
#define RQ_EVPSTATE_PENDING 0
|
|
#define RQ_EVPSTATE_SCHEDULED 1
|
|
#define RQ_EVPSTATE_INFLIGHT 2
|
|
#define RQ_EVPSTATE_HELD 3
|
|
uint8_t state;
|
|
|
|
#define RQ_ENVELOPE_EXPIRED 0x01
|
|
#define RQ_ENVELOPE_REMOVED 0x02
|
|
#define RQ_ENVELOPE_SUSPEND 0x04
|
|
#define RQ_ENVELOPE_UPDATE 0x08
|
|
#define RQ_ENVELOPE_OVERFLOW 0x10
|
|
uint8_t flags;
|
|
|
|
time_t ctime;
|
|
time_t sched;
|
|
time_t expire;
|
|
|
|
struct rq_message *message;
|
|
|
|
time_t t_inflight;
|
|
time_t t_scheduled;
|
|
};
|
|
|
|
struct rq_holdq {
|
|
struct evplist q;
|
|
size_t count;
|
|
};
|
|
|
|
struct rq_queue {
|
|
size_t evpcount;
|
|
struct tree messages;
|
|
SPLAY_HEAD(prioqtree, rq_envelope) q_priotree;
|
|
|
|
struct evplist q_pending;
|
|
struct evplist q_inflight;
|
|
|
|
struct evplist q_mta;
|
|
struct evplist q_mda;
|
|
struct evplist q_bounce;
|
|
struct evplist q_update;
|
|
struct evplist q_expired;
|
|
struct evplist q_removed;
|
|
};
|
|
|
|
static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);
|
|
|
|
SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
|
|
static int scheduler_ram_init(const char *);
|
|
static int scheduler_ram_insert(struct scheduler_info *);
|
|
static size_t scheduler_ram_commit(uint32_t);
|
|
static size_t scheduler_ram_rollback(uint32_t);
|
|
static int scheduler_ram_update(struct scheduler_info *);
|
|
static int scheduler_ram_delete(uint64_t);
|
|
static int scheduler_ram_hold(uint64_t, uint64_t);
|
|
static int scheduler_ram_release(int, uint64_t, int);
|
|
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
|
|
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
|
|
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
|
|
static int scheduler_ram_schedule(uint64_t);
|
|
static int scheduler_ram_remove(uint64_t);
|
|
static int scheduler_ram_suspend(uint64_t);
|
|
static int scheduler_ram_resume(uint64_t);
|
|
static int scheduler_ram_query(uint64_t);
|
|
|
|
static void sorted_insert(struct rq_queue *, struct rq_envelope *);
|
|
|
|
static void rq_queue_init(struct rq_queue *);
|
|
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
|
|
static void rq_queue_dump(struct rq_queue *, const char *);
|
|
static void rq_queue_schedule(struct rq_queue *rq);
|
|
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
|
|
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
|
|
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
|
|
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
|
|
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
|
|
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
|
|
static const char *rq_envelope_to_text(struct rq_envelope *);
|
|
|
|
struct scheduler_backend scheduler_backend_ramqueue = {
|
|
scheduler_ram_init,
|
|
|
|
scheduler_ram_insert,
|
|
scheduler_ram_commit,
|
|
scheduler_ram_rollback,
|
|
|
|
scheduler_ram_update,
|
|
scheduler_ram_delete,
|
|
scheduler_ram_hold,
|
|
scheduler_ram_release,
|
|
|
|
scheduler_ram_batch,
|
|
|
|
scheduler_ram_messages,
|
|
scheduler_ram_envelopes,
|
|
scheduler_ram_schedule,
|
|
scheduler_ram_remove,
|
|
scheduler_ram_suspend,
|
|
scheduler_ram_resume,
|
|
scheduler_ram_query,
|
|
};
|
|
|
|
static struct rq_queue ramqueue;
|
|
static struct tree updates;
|
|
static struct tree holdqs[3]; /* delivery type */
|
|
|
|
static time_t currtime;
|
|
|
|
#define BACKOFF_TRANSFER 400
|
|
#define BACKOFF_DELIVERY 10
|
|
#define BACKOFF_OVERFLOW 3
|
|
|
|
static time_t
|
|
scheduler_backoff(time_t t0, time_t base, uint32_t step)
|
|
{
|
|
return (t0 + base * step * step);
|
|
}
|
|
|
|
static time_t
|
|
scheduler_next(time_t t0, time_t base, uint32_t step)
|
|
{
|
|
time_t t;
|
|
|
|
/* XXX be more efficient */
|
|
while ((t = scheduler_backoff(t0, base, step)) <= currtime)
|
|
step++;
|
|
|
|
return (t);
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_init(const char *arg)
|
|
{
|
|
rq_queue_init(&ramqueue);
|
|
tree_init(&updates);
|
|
tree_init(&holdqs[D_MDA]);
|
|
tree_init(&holdqs[D_MTA]);
|
|
tree_init(&holdqs[D_BOUNCE]);
|
|
|
|
return (1);
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_insert(struct scheduler_info *si)
|
|
{
|
|
struct rq_queue *update;
|
|
struct rq_message *message;
|
|
struct rq_envelope *envelope;
|
|
uint32_t msgid;
|
|
|
|
currtime = time(NULL);
|
|
|
|
msgid = evpid_to_msgid(si->evpid);
|
|
|
|
/* find/prepare a ramqueue update */
|
|
if ((update = tree_get(&updates, msgid)) == NULL) {
|
|
update = xcalloc(1, sizeof *update);
|
|
stat_increment("scheduler.ramqueue.update", 1);
|
|
rq_queue_init(update);
|
|
tree_xset(&updates, msgid, update);
|
|
}
|
|
|
|
/* find/prepare the msgtree message in ramqueue update */
|
|
if ((message = tree_get(&update->messages, msgid)) == NULL) {
|
|
message = xcalloc(1, sizeof *message);
|
|
message->msgid = msgid;
|
|
tree_init(&message->envelopes);
|
|
tree_xset(&update->messages, msgid, message);
|
|
stat_increment("scheduler.ramqueue.message", 1);
|
|
}
|
|
|
|
/* create envelope in ramqueue message */
|
|
envelope = xcalloc(1, sizeof *envelope);
|
|
envelope->evpid = si->evpid;
|
|
envelope->type = si->type;
|
|
envelope->message = message;
|
|
envelope->ctime = si->creation;
|
|
envelope->expire = si->creation + si->ttl;
|
|
envelope->sched = scheduler_backoff(si->creation,
|
|
(si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
|
|
tree_xset(&message->envelopes, envelope->evpid, envelope);
|
|
|
|
update->evpcount++;
|
|
stat_increment("scheduler.ramqueue.envelope", 1);
|
|
|
|
envelope->state = RQ_EVPSTATE_PENDING;
|
|
TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);
|
|
|
|
si->nexttry = envelope->sched;
|
|
|
|
return (1);
|
|
}
|
|
|
|
static size_t
|
|
scheduler_ram_commit(uint32_t msgid)
|
|
{
|
|
struct rq_queue *update;
|
|
size_t r;
|
|
|
|
currtime = time(NULL);
|
|
|
|
update = tree_xpop(&updates, msgid);
|
|
r = update->evpcount;
|
|
|
|
if (tracing & TRACE_SCHEDULER)
|
|
rq_queue_dump(update, "update to commit");
|
|
|
|
rq_queue_merge(&ramqueue, update);
|
|
|
|
if (tracing & TRACE_SCHEDULER)
|
|
rq_queue_dump(&ramqueue, "resulting queue");
|
|
|
|
rq_queue_schedule(&ramqueue);
|
|
|
|
free(update);
|
|
stat_decrement("scheduler.ramqueue.update", 1);
|
|
|
|
return (r);
|
|
}
|
|
|
|
static size_t
|
|
scheduler_ram_rollback(uint32_t msgid)
|
|
{
|
|
struct rq_queue *update;
|
|
struct rq_envelope *evp;
|
|
size_t r;
|
|
|
|
currtime = time(NULL);
|
|
|
|
if ((update = tree_pop(&updates, msgid)) == NULL)
|
|
return (0);
|
|
r = update->evpcount;
|
|
|
|
while ((evp = TAILQ_FIRST(&update->q_pending))) {
|
|
TAILQ_REMOVE(&update->q_pending, evp, entry);
|
|
rq_envelope_delete(update, evp);
|
|
}
|
|
|
|
free(update);
|
|
stat_decrement("scheduler.ramqueue.update", 1);
|
|
|
|
return (r);
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_update(struct scheduler_info *si)
|
|
{
|
|
struct rq_message *msg;
|
|
struct rq_envelope *evp;
|
|
uint32_t msgid;
|
|
|
|
currtime = time(NULL);
|
|
|
|
msgid = evpid_to_msgid(si->evpid);
|
|
msg = tree_xget(&ramqueue.messages, msgid);
|
|
evp = tree_xget(&msg->envelopes, si->evpid);
|
|
|
|
/* it *must* be in-flight */
|
|
if (evp->state != RQ_EVPSTATE_INFLIGHT)
|
|
fatalx("evp:%016" PRIx64 " not in-flight", si->evpid);
|
|
|
|
TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
|
|
|
|
/*
|
|
* If the envelope was removed while inflight, schedule it for
|
|
* removal immediately.
|
|
*/
|
|
if (evp->flags & RQ_ENVELOPE_REMOVED) {
|
|
TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
|
|
evp->state = RQ_EVPSTATE_SCHEDULED;
|
|
evp->t_scheduled = currtime;
|
|
return (1);
|
|
}
|
|
|
|
evp->sched = scheduler_next(evp->ctime,
|
|
(si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
|
|
|
|
evp->state = RQ_EVPSTATE_PENDING;
|
|
if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
|
|
sorted_insert(&ramqueue, evp);
|
|
|
|
si->nexttry = evp->sched;
|
|
|
|
return (1);
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_delete(uint64_t evpid)
|
|
{
|
|
struct rq_message *msg;
|
|
struct rq_envelope *evp;
|
|
uint32_t msgid;
|
|
|
|
currtime = time(NULL);
|
|
|
|
msgid = evpid_to_msgid(evpid);
|
|
msg = tree_xget(&ramqueue.messages, msgid);
|
|
evp = tree_xget(&msg->envelopes, evpid);
|
|
|
|
/* it *must* be in-flight */
|
|
if (evp->state != RQ_EVPSTATE_INFLIGHT)
|
|
fatalx("evp:%016" PRIx64 " not in-flight", evpid);
|
|
|
|
TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
|
|
|
|
rq_envelope_delete(&ramqueue, evp);
|
|
|
|
return (1);
|
|
}
|
|
|
|
#define HOLDQ_MAXSIZE 1000
|
|
|
|
static int
|
|
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
|
|
{
|
|
struct rq_holdq *hq;
|
|
struct rq_message *msg;
|
|
struct rq_envelope *evp;
|
|
uint32_t msgid;
|
|
|
|
currtime = time(NULL);
|
|
|
|
msgid = evpid_to_msgid(evpid);
|
|
msg = tree_xget(&ramqueue.messages, msgid);
|
|
evp = tree_xget(&msg->envelopes, evpid);
|
|
|
|
/* it *must* be in-flight */
|
|
if (evp->state != RQ_EVPSTATE_INFLIGHT)
|
|
fatalx("evp:%016" PRIx64 " not in-flight", evpid);
|
|
|
|
TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
|
|
|
|
/* If the envelope is suspended, just mark it as pending */
|
|
if (evp->flags & RQ_ENVELOPE_SUSPEND) {
|
|
evp->state = RQ_EVPSTATE_PENDING;
|
|
return (0);
|
|
}
|
|
|
|
hq = tree_get(&holdqs[evp->type], holdq);
|
|
if (hq == NULL) {
|
|
hq = xcalloc(1, sizeof(*hq));
|
|
TAILQ_INIT(&hq->q);
|
|
tree_xset(&holdqs[evp->type], holdq, hq);
|
|
stat_increment("scheduler.ramqueue.holdq", 1);
|
|
}
|
|
|
|
/* If the holdq is full, just "tempfail" the envelope */
|
|
if (hq->count >= HOLDQ_MAXSIZE) {
|
|
evp->state = RQ_EVPSTATE_PENDING;
|
|
evp->flags |= RQ_ENVELOPE_UPDATE;
|
|
evp->flags |= RQ_ENVELOPE_OVERFLOW;
|
|
sorted_insert(&ramqueue, evp);
|
|
stat_increment("scheduler.ramqueue.hold-overflow", 1);
|
|
return (0);
|
|
}
|
|
|
|
evp->state = RQ_EVPSTATE_HELD;
|
|
evp->holdq = holdq;
|
|
/* This is an optimization: upon release, the envelopes will be
|
|
* inserted in the pending queue from the first element to the last.
|
|
* Since elements already in the queue were received first, they
|
|
* were scheduled first, so they will be reinserted before the
|
|
* current element.
|
|
*/
|
|
TAILQ_INSERT_HEAD(&hq->q, evp, entry);
|
|
hq->count += 1;
|
|
stat_increment("scheduler.ramqueue.hold", 1);
|
|
|
|
return (1);
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_release(int type, uint64_t holdq, int n)
|
|
{
|
|
struct rq_holdq *hq;
|
|
struct rq_envelope *evp;
|
|
int i, update;
|
|
|
|
currtime = time(NULL);
|
|
|
|
hq = tree_get(&holdqs[type], holdq);
|
|
if (hq == NULL)
|
|
return (0);
|
|
|
|
if (n == -1) {
|
|
n = 0;
|
|
update = 1;
|
|
}
|
|
else
|
|
update = 0;
|
|
|
|
for (i = 0; n == 0 || i < n; i++) {
|
|
evp = TAILQ_FIRST(&hq->q);
|
|
if (evp == NULL)
|
|
break;
|
|
|
|
TAILQ_REMOVE(&hq->q, evp, entry);
|
|
hq->count -= 1;
|
|
evp->holdq = 0;
|
|
|
|
/* When released, all envelopes are put in the pending queue
|
|
* and will be rescheduled immediately. As an optimization,
|
|
* we could just schedule them directly.
|
|
*/
|
|
evp->state = RQ_EVPSTATE_PENDING;
|
|
if (update)
|
|
evp->flags |= RQ_ENVELOPE_UPDATE;
|
|
sorted_insert(&ramqueue, evp);
|
|
}
|
|
|
|
if (TAILQ_EMPTY(&hq->q)) {
|
|
tree_xpop(&holdqs[type], holdq);
|
|
free(hq);
|
|
stat_decrement("scheduler.ramqueue.holdq", 1);
|
|
}
|
|
stat_decrement("scheduler.ramqueue.hold", i);
|
|
|
|
return (i);
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types)
|
|
{
|
|
struct rq_envelope *evp;
|
|
size_t i, n;
|
|
time_t t;
|
|
|
|
currtime = time(NULL);
|
|
|
|
rq_queue_schedule(&ramqueue);
|
|
if (tracing & TRACE_SCHEDULER)
|
|
rq_queue_dump(&ramqueue, "scheduler_ram_batch()");
|
|
|
|
i = 0;
|
|
n = 0;
|
|
|
|
for (;;) {
|
|
|
|
if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
|
|
TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
|
|
types[i] = SCHED_REMOVE;
|
|
evpids[i] = evp->evpid;
|
|
rq_envelope_delete(&ramqueue, evp);
|
|
|
|
if (++i == *count)
|
|
break;
|
|
}
|
|
|
|
if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
|
|
TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
|
|
types[i] = SCHED_EXPIRE;
|
|
evpids[i] = evp->evpid;
|
|
rq_envelope_delete(&ramqueue, evp);
|
|
|
|
if (++i == *count)
|
|
break;
|
|
}
|
|
|
|
if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
|
|
TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
|
|
types[i] = SCHED_UPDATE;
|
|
evpids[i] = evp->evpid;
|
|
|
|
if (evp->flags & RQ_ENVELOPE_OVERFLOW)
|
|
t = BACKOFF_OVERFLOW;
|
|
else if (evp->type == D_MTA)
|
|
t = BACKOFF_TRANSFER;
|
|
else
|
|
t = BACKOFF_DELIVERY;
|
|
|
|
evp->sched = scheduler_next(evp->ctime, t, 0);
|
|
evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
|
|
evp->state = RQ_EVPSTATE_PENDING;
|
|
if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
|
|
sorted_insert(&ramqueue, evp);
|
|
|
|
if (++i == *count)
|
|
break;
|
|
}
|
|
|
|
if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
|
|
TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
|
|
types[i] = SCHED_BOUNCE;
|
|
evpids[i] = evp->evpid;
|
|
|
|
TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
|
|
evp->state = RQ_EVPSTATE_INFLIGHT;
|
|
evp->t_inflight = currtime;
|
|
|
|
if (++i == *count)
|
|
break;
|
|
}
|
|
|
|
if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
|
|
TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
|
|
types[i] = SCHED_MDA;
|
|
evpids[i] = evp->evpid;
|
|
|
|
TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
|
|
evp->state = RQ_EVPSTATE_INFLIGHT;
|
|
evp->t_inflight = currtime;
|
|
|
|
if (++i == *count)
|
|
break;
|
|
}
|
|
|
|
if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
|
|
TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
|
|
types[i] = SCHED_MTA;
|
|
evpids[i] = evp->evpid;
|
|
|
|
TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
|
|
evp->state = RQ_EVPSTATE_INFLIGHT;
|
|
evp->t_inflight = currtime;
|
|
|
|
if (++i == *count)
|
|
break;
|
|
}
|
|
|
|
/* nothing seen this round */
|
|
if (i == n)
|
|
break;
|
|
|
|
n = i;
|
|
}
|
|
|
|
if (i) {
|
|
*count = i;
|
|
return (1);
|
|
}
|
|
|
|
if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
|
|
if (evp->sched < evp->expire)
|
|
t = evp->sched;
|
|
else
|
|
t = evp->expire;
|
|
*delay = (t < currtime) ? 0 : (t - currtime);
|
|
}
|
|
else
|
|
*delay = -1;
|
|
|
|
return (0);
|
|
}
|
|
|
|
static size_t
|
|
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
|
|
{
|
|
uint64_t id;
|
|
size_t n;
|
|
void *i;
|
|
|
|
for (n = 0, i = NULL; n < size; n++) {
|
|
if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
|
|
break;
|
|
dst[n] = id;
|
|
}
|
|
|
|
return (n);
|
|
}
|
|
|
|
static size_t
|
|
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
|
|
{
|
|
struct rq_message *msg;
|
|
struct rq_envelope *evp;
|
|
void *i;
|
|
size_t n;
|
|
|
|
if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
|
|
return (0);
|
|
|
|
for (n = 0, i = NULL; n < size; ) {
|
|
|
|
if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
|
|
(void**)&evp) == 0)
|
|
break;
|
|
|
|
if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
|
|
continue;
|
|
|
|
dst[n].evpid = evp->evpid;
|
|
dst[n].flags = 0;
|
|
dst[n].retry = 0;
|
|
dst[n].time = 0;
|
|
|
|
if (evp->state == RQ_EVPSTATE_PENDING) {
|
|
dst[n].time = evp->sched;
|
|
dst[n].flags = EF_PENDING;
|
|
}
|
|
else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
|
|
dst[n].time = evp->t_scheduled;
|
|
dst[n].flags = EF_PENDING;
|
|
}
|
|
else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
|
|
dst[n].time = evp->t_inflight;
|
|
dst[n].flags = EF_INFLIGHT;
|
|
}
|
|
else if (evp->state == RQ_EVPSTATE_HELD) {
|
|
/* same as scheduled */
|
|
dst[n].time = evp->t_scheduled;
|
|
dst[n].flags = EF_PENDING;
|
|
dst[n].flags |= EF_HOLD;
|
|
}
|
|
if (evp->flags & RQ_ENVELOPE_SUSPEND)
|
|
dst[n].flags |= EF_SUSPEND;
|
|
|
|
n++;
|
|
}
|
|
|
|
return (n);
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_schedule(uint64_t evpid)
|
|
{
|
|
struct rq_message *msg;
|
|
struct rq_envelope *evp;
|
|
uint32_t msgid;
|
|
void *i;
|
|
int r;
|
|
|
|
currtime = time(NULL);
|
|
|
|
if (evpid > 0xffffffff) {
|
|
msgid = evpid_to_msgid(evpid);
|
|
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
|
|
return (0);
|
|
if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
|
|
return (0);
|
|
if (evp->state == RQ_EVPSTATE_INFLIGHT)
|
|
return (0);
|
|
rq_envelope_schedule(&ramqueue, evp);
|
|
return (1);
|
|
}
|
|
else {
|
|
msgid = evpid;
|
|
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
|
|
return (0);
|
|
i = NULL;
|
|
r = 0;
|
|
while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
|
|
if (evp->state == RQ_EVPSTATE_INFLIGHT)
|
|
continue;
|
|
rq_envelope_schedule(&ramqueue, evp);
|
|
r++;
|
|
}
|
|
return (r);
|
|
}
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_remove(uint64_t evpid)
|
|
{
|
|
struct rq_message *msg;
|
|
struct rq_envelope *evp;
|
|
uint32_t msgid;
|
|
void *i;
|
|
int r;
|
|
|
|
currtime = time(NULL);
|
|
|
|
if (evpid > 0xffffffff) {
|
|
msgid = evpid_to_msgid(evpid);
|
|
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
|
|
return (0);
|
|
if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
|
|
return (0);
|
|
if (rq_envelope_remove(&ramqueue, evp))
|
|
return (1);
|
|
return (0);
|
|
}
|
|
else {
|
|
msgid = evpid;
|
|
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
|
|
return (0);
|
|
i = NULL;
|
|
r = 0;
|
|
while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
|
|
if (rq_envelope_remove(&ramqueue, evp))
|
|
r++;
|
|
return (r);
|
|
}
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_suspend(uint64_t evpid)
|
|
{
|
|
struct rq_message *msg;
|
|
struct rq_envelope *evp;
|
|
uint32_t msgid;
|
|
void *i;
|
|
int r;
|
|
|
|
currtime = time(NULL);
|
|
|
|
if (evpid > 0xffffffff) {
|
|
msgid = evpid_to_msgid(evpid);
|
|
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
|
|
return (0);
|
|
if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
|
|
return (0);
|
|
if (rq_envelope_suspend(&ramqueue, evp))
|
|
return (1);
|
|
return (0);
|
|
}
|
|
else {
|
|
msgid = evpid;
|
|
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
|
|
return (0);
|
|
i = NULL;
|
|
r = 0;
|
|
while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
|
|
if (rq_envelope_suspend(&ramqueue, evp))
|
|
r++;
|
|
return (r);
|
|
}
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_resume(uint64_t evpid)
|
|
{
|
|
struct rq_message *msg;
|
|
struct rq_envelope *evp;
|
|
uint32_t msgid;
|
|
void *i;
|
|
int r;
|
|
|
|
currtime = time(NULL);
|
|
|
|
if (evpid > 0xffffffff) {
|
|
msgid = evpid_to_msgid(evpid);
|
|
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
|
|
return (0);
|
|
if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
|
|
return (0);
|
|
if (rq_envelope_resume(&ramqueue, evp))
|
|
return (1);
|
|
return (0);
|
|
}
|
|
else {
|
|
msgid = evpid;
|
|
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
|
|
return (0);
|
|
i = NULL;
|
|
r = 0;
|
|
while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
|
|
if (rq_envelope_resume(&ramqueue, evp))
|
|
r++;
|
|
return (r);
|
|
}
|
|
}
|
|
|
|
static int
|
|
scheduler_ram_query(uint64_t evpid)
|
|
{
|
|
uint32_t msgid;
|
|
|
|
if (evpid > 0xffffffff)
|
|
msgid = evpid_to_msgid(evpid);
|
|
else
|
|
msgid = evpid;
|
|
|
|
if (tree_get(&ramqueue.messages, msgid) == NULL)
|
|
return (0);
|
|
|
|
return (1);
|
|
}
|
|
|
|
static void
|
|
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
|
|
{
|
|
struct rq_envelope *evp2;
|
|
|
|
SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
|
|
evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
|
|
if (evp2)
|
|
TAILQ_INSERT_BEFORE(evp2, evp, entry);
|
|
else
|
|
TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
|
|
}
|
|
|
|
static void
|
|
rq_queue_init(struct rq_queue *rq)
|
|
{
|
|
memset(rq, 0, sizeof *rq);
|
|
tree_init(&rq->messages);
|
|
TAILQ_INIT(&rq->q_pending);
|
|
TAILQ_INIT(&rq->q_inflight);
|
|
TAILQ_INIT(&rq->q_mta);
|
|
TAILQ_INIT(&rq->q_mda);
|
|
TAILQ_INIT(&rq->q_bounce);
|
|
TAILQ_INIT(&rq->q_update);
|
|
TAILQ_INIT(&rq->q_expired);
|
|
TAILQ_INIT(&rq->q_removed);
|
|
SPLAY_INIT(&rq->q_priotree);
|
|
}
|
|
|
|
static void
|
|
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
|
|
{
|
|
struct rq_message *message, *tomessage;
|
|
struct rq_envelope *envelope;
|
|
uint64_t id;
|
|
void *i;
|
|
|
|
while (tree_poproot(&update->messages, &id, (void*)&message)) {
|
|
if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
|
|
/* message does not exist. re-use structure */
|
|
tree_xset(&rq->messages, id, message);
|
|
continue;
|
|
}
|
|
/* need to re-link all envelopes before merging them */
|
|
i = NULL;
|
|
while ((tree_iter(&message->envelopes, &i, &id,
|
|
(void*)&envelope)))
|
|
envelope->message = tomessage;
|
|
tree_merge(&tomessage->envelopes, &message->envelopes);
|
|
free(message);
|
|
stat_decrement("scheduler.ramqueue.message", 1);
|
|
}
|
|
|
|
/* Sorted insert in the pending queue */
|
|
while ((envelope = TAILQ_FIRST(&update->q_pending))) {
|
|
TAILQ_REMOVE(&update->q_pending, envelope, entry);
|
|
sorted_insert(rq, envelope);
|
|
}
|
|
|
|
rq->evpcount += update->evpcount;
|
|
}
|
|
|
|
#define SCHEDULEMAX 1024
|
|
|
|
static void
|
|
rq_queue_schedule(struct rq_queue *rq)
|
|
{
|
|
struct rq_envelope *evp;
|
|
size_t n;
|
|
|
|
n = 0;
|
|
while ((evp = TAILQ_FIRST(&rq->q_pending))) {
|
|
if (evp->sched > currtime && evp->expire > currtime)
|
|
break;
|
|
|
|
if (n == SCHEDULEMAX)
|
|
break;
|
|
|
|
if (evp->state != RQ_EVPSTATE_PENDING)
|
|
fatalx("evp:%016" PRIx64 " flags=0x%x", evp->evpid,
|
|
evp->flags);
|
|
|
|
if (evp->expire <= currtime) {
|
|
TAILQ_REMOVE(&rq->q_pending, evp, entry);
|
|
SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
|
|
TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
|
|
evp->state = RQ_EVPSTATE_SCHEDULED;
|
|
evp->flags |= RQ_ENVELOPE_EXPIRED;
|
|
evp->t_scheduled = currtime;
|
|
continue;
|
|
}
|
|
rq_envelope_schedule(rq, evp);
|
|
n += 1;
|
|
}
|
|
}
|
|
|
|
static struct evplist *
|
|
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
|
|
{
|
|
switch (evp->state) {
|
|
case RQ_EVPSTATE_PENDING:
|
|
return &rq->q_pending;
|
|
|
|
case RQ_EVPSTATE_SCHEDULED:
|
|
if (evp->flags & RQ_ENVELOPE_EXPIRED)
|
|
return &rq->q_expired;
|
|
if (evp->flags & RQ_ENVELOPE_REMOVED)
|
|
return &rq->q_removed;
|
|
if (evp->flags & RQ_ENVELOPE_UPDATE)
|
|
return &rq->q_update;
|
|
if (evp->type == D_MTA)
|
|
return &rq->q_mta;
|
|
if (evp->type == D_MDA)
|
|
return &rq->q_mda;
|
|
if (evp->type == D_BOUNCE)
|
|
return &rq->q_bounce;
|
|
fatalx("%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);
|
|
|
|
case RQ_EVPSTATE_INFLIGHT:
|
|
return &rq->q_inflight;
|
|
|
|
case RQ_EVPSTATE_HELD:
|
|
return (NULL);
|
|
}
|
|
|
|
fatalx("%016" PRIx64 " bad state %d", evp->evpid, evp->state);
|
|
return (NULL);
|
|
}
|
|
|
|
static void
|
|
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
|
|
{
|
|
struct rq_holdq *hq;
|
|
struct evplist *q = NULL;
|
|
|
|
switch (evp->type) {
|
|
case D_MTA:
|
|
q = &rq->q_mta;
|
|
break;
|
|
case D_MDA:
|
|
q = &rq->q_mda;
|
|
break;
|
|
case D_BOUNCE:
|
|
q = &rq->q_bounce;
|
|
break;
|
|
}
|
|
|
|
if (evp->flags & RQ_ENVELOPE_UPDATE)
|
|
q = &rq->q_update;
|
|
|
|
if (evp->state == RQ_EVPSTATE_HELD) {
|
|
hq = tree_xget(&holdqs[evp->type], evp->holdq);
|
|
TAILQ_REMOVE(&hq->q, evp, entry);
|
|
hq->count -= 1;
|
|
if (TAILQ_EMPTY(&hq->q)) {
|
|
tree_xpop(&holdqs[evp->type], evp->holdq);
|
|
free(hq);
|
|
}
|
|
evp->holdq = 0;
|
|
stat_decrement("scheduler.ramqueue.hold", 1);
|
|
}
|
|
else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
|
|
TAILQ_REMOVE(&rq->q_pending, evp, entry);
|
|
SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
|
|
}
|
|
|
|
TAILQ_INSERT_TAIL(q, evp, entry);
|
|
evp->state = RQ_EVPSTATE_SCHEDULED;
|
|
evp->t_scheduled = currtime;
|
|
}
|
|
|
|
static int
|
|
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
|
|
{
|
|
struct rq_holdq *hq;
|
|
struct evplist *evl;
|
|
|
|
if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
|
|
return (0);
|
|
/*
|
|
* If envelope is inflight, mark it envelope for removal.
|
|
*/
|
|
if (evp->state == RQ_EVPSTATE_INFLIGHT) {
|
|
evp->flags |= RQ_ENVELOPE_REMOVED;
|
|
return (1);
|
|
}
|
|
|
|
if (evp->state == RQ_EVPSTATE_HELD) {
|
|
hq = tree_xget(&holdqs[evp->type], evp->holdq);
|
|
TAILQ_REMOVE(&hq->q, evp, entry);
|
|
hq->count -= 1;
|
|
if (TAILQ_EMPTY(&hq->q)) {
|
|
tree_xpop(&holdqs[evp->type], evp->holdq);
|
|
free(hq);
|
|
}
|
|
evp->holdq = 0;
|
|
stat_decrement("scheduler.ramqueue.hold", 1);
|
|
}
|
|
else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
|
|
evl = rq_envelope_list(rq, evp);
|
|
TAILQ_REMOVE(evl, evp, entry);
|
|
if (evl == &rq->q_pending)
|
|
SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
|
|
}
|
|
|
|
TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
|
|
evp->state = RQ_EVPSTATE_SCHEDULED;
|
|
evp->flags |= RQ_ENVELOPE_REMOVED;
|
|
evp->t_scheduled = currtime;
|
|
|
|
return (1);
|
|
}
|
|
|
|
static int
|
|
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
|
|
{
|
|
struct rq_holdq *hq;
|
|
struct evplist *evl;
|
|
|
|
if (evp->flags & RQ_ENVELOPE_SUSPEND)
|
|
return (0);
|
|
|
|
if (evp->state == RQ_EVPSTATE_HELD) {
|
|
hq = tree_xget(&holdqs[evp->type], evp->holdq);
|
|
TAILQ_REMOVE(&hq->q, evp, entry);
|
|
hq->count -= 1;
|
|
if (TAILQ_EMPTY(&hq->q)) {
|
|
tree_xpop(&holdqs[evp->type], evp->holdq);
|
|
free(hq);
|
|
}
|
|
evp->holdq = 0;
|
|
evp->state = RQ_EVPSTATE_PENDING;
|
|
stat_decrement("scheduler.ramqueue.hold", 1);
|
|
}
|
|
else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
|
|
evl = rq_envelope_list(rq, evp);
|
|
TAILQ_REMOVE(evl, evp, entry);
|
|
if (evl == &rq->q_pending)
|
|
SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
|
|
}
|
|
|
|
evp->flags |= RQ_ENVELOPE_SUSPEND;
|
|
|
|
return (1);
|
|
}
|
|
|
|
static int
|
|
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
|
|
{
|
|
struct evplist *evl;
|
|
|
|
if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
|
|
return (0);
|
|
|
|
if (evp->state != RQ_EVPSTATE_INFLIGHT) {
|
|
evl = rq_envelope_list(rq, evp);
|
|
if (evl == &rq->q_pending)
|
|
sorted_insert(rq, evp);
|
|
else
|
|
TAILQ_INSERT_TAIL(evl, evp, entry);
|
|
}
|
|
|
|
evp->flags &= ~RQ_ENVELOPE_SUSPEND;
|
|
|
|
return (1);
|
|
}
|
|
|
|
static void
|
|
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
|
|
{
|
|
tree_xpop(&evp->message->envelopes, evp->evpid);
|
|
if (tree_empty(&evp->message->envelopes)) {
|
|
tree_xpop(&rq->messages, evp->message->msgid);
|
|
free(evp->message);
|
|
stat_decrement("scheduler.ramqueue.message", 1);
|
|
}
|
|
|
|
free(evp);
|
|
rq->evpcount--;
|
|
stat_decrement("scheduler.ramqueue.envelope", 1);
|
|
}
|
|
|
|
static const char *
|
|
rq_envelope_to_text(struct rq_envelope *e)
|
|
{
|
|
static char buf[256];
|
|
char t[64];
|
|
|
|
(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);
|
|
|
|
if (e->type == D_BOUNCE)
|
|
(void)strlcat(buf, "bounce", sizeof buf);
|
|
else if (e->type == D_MDA)
|
|
(void)strlcat(buf, "mda", sizeof buf);
|
|
else if (e->type == D_MTA)
|
|
(void)strlcat(buf, "mta", sizeof buf);
|
|
|
|
(void)snprintf(t, sizeof t, ",expire=%s",
|
|
duration_to_text(e->expire - currtime));
|
|
(void)strlcat(buf, t, sizeof buf);
|
|
|
|
|
|
switch (e->state) {
|
|
case RQ_EVPSTATE_PENDING:
|
|
(void)snprintf(t, sizeof t, ",pending=%s",
|
|
duration_to_text(e->sched - currtime));
|
|
(void)strlcat(buf, t, sizeof buf);
|
|
break;
|
|
|
|
case RQ_EVPSTATE_SCHEDULED:
|
|
(void)snprintf(t, sizeof t, ",scheduled=%s",
|
|
duration_to_text(currtime - e->t_scheduled));
|
|
(void)strlcat(buf, t, sizeof buf);
|
|
break;
|
|
|
|
case RQ_EVPSTATE_INFLIGHT:
|
|
(void)snprintf(t, sizeof t, ",inflight=%s",
|
|
duration_to_text(currtime - e->t_inflight));
|
|
(void)strlcat(buf, t, sizeof buf);
|
|
break;
|
|
|
|
case RQ_EVPSTATE_HELD:
|
|
(void)snprintf(t, sizeof t, ",held=%s",
|
|
duration_to_text(currtime - e->t_inflight));
|
|
(void)strlcat(buf, t, sizeof buf);
|
|
break;
|
|
default:
|
|
fatalx("%016" PRIx64 " bad state %d", e->evpid, e->state);
|
|
}
|
|
|
|
if (e->flags & RQ_ENVELOPE_REMOVED)
|
|
(void)strlcat(buf, ",removed", sizeof buf);
|
|
if (e->flags & RQ_ENVELOPE_EXPIRED)
|
|
(void)strlcat(buf, ",expired", sizeof buf);
|
|
if (e->flags & RQ_ENVELOPE_SUSPEND)
|
|
(void)strlcat(buf, ",suspended", sizeof buf);
|
|
|
|
(void)strlcat(buf, "]", sizeof buf);
|
|
|
|
return (buf);
|
|
}
|
|
|
|
static void
|
|
rq_queue_dump(struct rq_queue *rq, const char * name)
|
|
{
|
|
struct rq_message *message;
|
|
struct rq_envelope *envelope;
|
|
void *i, *j;
|
|
uint64_t id;
|
|
|
|
log_debug("debug: /--- ramqueue: %s", name);
|
|
|
|
i = NULL;
|
|
while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
|
|
log_debug("debug: | msg:%08" PRIx32, message->msgid);
|
|
j = NULL;
|
|
while ((tree_iter(&message->envelopes, &j, &id,
|
|
(void*)&envelope)))
|
|
log_debug("debug: | %s",
|
|
rq_envelope_to_text(envelope));
|
|
}
|
|
log_debug("debug: \\---");
|
|
}
|
|
|
|
static int
|
|
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
|
|
{
|
|
time_t ref1, ref2;
|
|
|
|
ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
|
|
ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
|
|
if (ref1 != ref2)
|
|
return (ref1 < ref2) ? -1 : 1;
|
|
|
|
if (e1->evpid != e2->evpid)
|
|
return (e1->evpid < e2->evpid) ? -1 : 1;
|
|
|
|
return 0;
|
|
}
|
|
|
|
SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
|