From: Lennart Poettering Date: Wed, 23 Mar 2005 21:20:57 +0000 (+0000) Subject: add packet scheduler X-Git-Url: https://git.meshlink.io/?a=commitdiff_plain;h=c18626ad35cdf94edbff196070ccbb6ae825abd0;p=catta add packet scheduler git-svn-id: file:///home/lennart/svn/public/avahi/trunk@14 941a03a8-eaeb-0310-b9a0-b1bbd8fe43fe --- diff --git a/Makefile b/Makefile index 6dac8af..306bbfe 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ LIBS=$(shell pkg-config --libs glib-2.0) all: flexmdns prioq-test -flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o +flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o psched.o $(CC) -o $@ $^ $(LIBS) #test-llist: test-llist.o diff --git a/dns.c b/dns.c index a787723..6824fd3 100644 --- a/dns.c +++ b/dns.c @@ -4,11 +4,35 @@ #include "dns.h" -flxDnsPacket* flx_dns_packet_new(void) { +flxDnsPacket* flx_dns_packet_new(guint max_size) { flxDnsPacket *p; - p = g_new(flxDnsPacket, 1); - p->size = p->rindex = 2*6; - memset(p->data, 0, p->size); + + if (max_size <= 0) + max_size = FLX_DNS_PACKET_MAX_SIZE; + else if (max_size < FLX_DNS_PACKET_HEADER_SIZE) + max_size = FLX_DNS_PACKET_HEADER_SIZE; + + p = g_malloc(sizeof(flxDnsPacket) + max_size); + p->size = p->rindex = FLX_DNS_PACKET_HEADER_SIZE; + p->max_size = max_size; + + memset(FLX_DNS_PACKET_DATA(p), 0, p->size); + return p; +} + +flxDnsPacket* flx_dns_packet_new_query(guint max_size) { + flxDnsPacket *p; + + p = flx_dns_packet_new(max_size); + flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0)); + return p; +} + +flxDnsPacket* flx_dns_packet_new_response(guint max_size) { + flxDnsPacket *p; + + p = flx_dns_packet_new(max_size); + flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(1, 0, 0, 0, 0, 0, 0, 0, 0, 0)); return p; } @@ -19,30 +43,35 @@ void flx_dns_packet_free(flxDnsPacket *p) { void flx_dns_packet_set_field(flxDnsPacket *p, guint index, guint16 v) { g_assert(p); - g_assert(index < 2*6); + g_assert(index < FLX_DNS_PACKET_HEADER_SIZE); - ((guint16*) p->data)[index] = g_htons(v); + ((guint16*) FLX_DNS_PACKET_DATA(p))[index] = g_htons(v); } guint16 flx_dns_packet_get_field(flxDnsPacket *p, guint index) { g_assert(p); - g_assert(index < 2*6); + g_assert(index < FLX_DNS_PACKET_HEADER_SIZE); - return g_ntohs(((guint16*) p->data)[index]); + return g_ntohs(((guint16*) FLX_DNS_PACKET_DATA(p))[index]); } guint8* flx_dns_packet_append_name(flxDnsPacket *p, const gchar *name) { guint8 *d, *f = NULL; + guint saved_size; g_assert(p); g_assert(name); + saved_size = p->size; + for (;;) { guint n = strcspn(name, "."); if (!n || n > 63) - return NULL; + goto fail; - d = flx_dns_packet_extend(p, n+1); + if (!(d = flx_dns_packet_extend(p, n+1))) + goto fail; + if (!f) f = d; d[0] = n; @@ -61,28 +90,36 @@ guint8* flx_dns_packet_append_name(flxDnsPacket *p, const gchar *name) { break; } - d = flx_dns_packet_extend(p, 1); + if (!(d = flx_dns_packet_extend(p, 1))) + goto fail; + d[0] = 0; return f; + +fail: + p->size = saved_size; + return NULL; } guint8* flx_dns_packet_append_uint16(flxDnsPacket *p, guint16 v) { guint8 *d; - g_assert(p); - d = flx_dns_packet_extend(p, sizeof(guint16)); - *((guint16*) d) = g_htons(v); + if (!(d = flx_dns_packet_extend(p, sizeof(guint16)))) + return NULL; + *((guint16*) d) = g_htons(v); return d; } guint8 *flx_dns_packet_append_uint32(flxDnsPacket *p, guint32 v) { guint8 *d; - g_assert(p); - d = flx_dns_packet_extend(p, sizeof(guint32)); + + if (!(d = flx_dns_packet_extend(p, sizeof(guint32)))) + return NULL; + *((guint32*) d) = g_htonl(v); return d; @@ -95,21 +132,22 @@ guint8 *flx_dns_packet_append_bytes(flxDnsPacket *p, gconstpointer b, guint l) g_assert(b); g_assert(l); - d = flx_dns_packet_extend(p, l); - g_assert(d); - memcpy(d, b, l); + if (!(d = flx_dns_packet_extend(p, l))) + return NULL; + memcpy(d, b, l); return d; } - guint8 *flx_dns_packet_extend(flxDnsPacket *p, guint l) { guint8 *d; g_assert(p); - g_assert(p->size+l <= sizeof(p->data)); - d = p->data + p->size; + if (p->size+l > p->max_size) + return NULL; + + d = FLX_DNS_PACKET_DATA(p) + p->size; p->size += l; return d; @@ -123,13 +161,14 @@ guint8 *flx_dns_packet_append_name_compressed(flxDnsPacket *p, const gchar *name if (!prev) return flx_dns_packet_append_name(p, name); - k = prev - p->data; + k = prev - FLX_DNS_PACKET_DATA(p); if (k < 0 || k >= 0x4000 || (guint) k >= p->size) return flx_dns_packet_append_name(p, name); - d = (guint16*) flx_dns_packet_extend(p, sizeof(guint16)); - *d = g_htons((0xC000 | k)); + if (!(d = (guint16*) flx_dns_packet_extend(p, sizeof(guint16)))) + return NULL; + *d = g_htons((0xC000 | k)); return prev; } @@ -166,7 +205,7 @@ static gint consume_labels(flxDnsPacket *p, guint index, gchar *ret_name, guint if (index+1 > p->size) return -1; - n = p->data[index]; + n = FLX_DNS_PACKET_DATA(p)[index]; if (!n) { index++; @@ -197,7 +236,7 @@ static gint consume_labels(flxDnsPacket *p, guint index, gchar *ret_name, guint } else first_label = 0; - memcpy(ret_name, p->data + index, n); + memcpy(ret_name, FLX_DNS_PACKET_DATA(p) + index, n); index += n; ret_name += n; l -= n; @@ -210,7 +249,7 @@ static gint consume_labels(flxDnsPacket *p, guint index, gchar *ret_name, guint if (index+2 > p->size) return -1; - index = ((guint) (p->data[index] & ~0xC0)) << 8 | p->data[index+1]; + index = ((guint) (FLX_DNS_PACKET_DATA(p)[index] & ~0xC0)) << 8 | FLX_DNS_PACKET_DATA(p)[index+1]; if (!compressed) ret += 2; @@ -238,7 +277,7 @@ gint flx_dns_packet_consume_uint16(flxDnsPacket *p, guint16 *ret_v) { if (p->rindex + sizeof(guint16) > p->size) return -1; - *ret_v = g_ntohs(*((guint16*) (p->data + p->rindex))); + *ret_v = g_ntohs(*((guint16*) (FLX_DNS_PACKET_DATA(p) + p->rindex))); p->rindex += sizeof(guint16); return 0; @@ -251,7 +290,7 @@ gint flx_dns_packet_consume_uint32(flxDnsPacket *p, guint32 *ret_v) { if (p->rindex + sizeof(guint32) > p->size) return -1; - *ret_v = g_ntohl(*((guint32*) (p->data + p->rindex))); + *ret_v = g_ntohl(*((guint32*) (FLX_DNS_PACKET_DATA(p) + p->rindex))); p->rindex += sizeof(guint32); return 0; @@ -265,7 +304,7 @@ gint flx_dns_packet_consume_bytes(flxDnsPacket *p, gpointer ret_data, guint l) { if (p->rindex + l > p->size) return -1; - memcpy(ret_data, p->data + p->rindex, l); + memcpy(ret_data, FLX_DNS_PACKET_DATA(p) + p->rindex, l); p->rindex += l; return 0; @@ -277,7 +316,7 @@ gconstpointer flx_dns_packet_get_rptr(flxDnsPacket *p) { if (p->rindex >= p->size) return NULL; - return p->data + p->rindex; + return FLX_DNS_PACKET_DATA(p) + p->rindex; } gint flx_dns_packet_skip(flxDnsPacket *p, guint length) { diff --git a/dns.h b/dns.h index bdb0f07..0b6e750 100644 --- a/dns.h +++ b/dns.h @@ -5,14 +5,20 @@ #include "rr.h" -#define FLX_DNS_MAX_PACKET_SIZE 9000 +#define FLX_DNS_PACKET_MAX_SIZE 9000 +#define FLX_DNS_PACKET_HEADER_SIZE 12 typedef struct _flxDnsPacket { - guint size, rindex; - guint8 data[FLX_DNS_MAX_PACKET_SIZE]; + guint size, rindex, max_size; } flxDnsPacket; -flxDnsPacket* flx_dns_packet_new(void); + +#define FLX_DNS_PACKET_DATA(p) (((guint8*) p) + sizeof(flxDnsPacket)) + +flxDnsPacket* flx_dns_packet_new(guint size); +flxDnsPacket* flx_dns_packet_new_query(guint size); +flxDnsPacket* flx_dns_packet_new_response(guint size); + void flx_dns_packet_free(flxDnsPacket *p); void flx_dns_packet_set_field(flxDnsPacket *p, guint index, guint16 v); guint16 flx_dns_packet_get_field(flxDnsPacket *p, guint index); diff --git a/iface.c b/iface.c index 9acef47..c4bd0ff 100644 --- a/iface.c +++ b/iface.c @@ -58,6 +58,11 @@ static void free_interface(flxInterfaceMonitor *m, flxInterface *i) { g_assert(m); g_assert(i); + if (i->ipv4_scheduler) + flx_packet_scheduler_free(i->ipv4_scheduler); + if (i->ipv6_scheduler) + flx_packet_scheduler_free(i->ipv6_scheduler); + while (i->addresses) free_address(m, i->addresses); @@ -140,8 +145,11 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) { i->n_ipv4_addrs = i->n_ipv6_addrs = 0; FLX_LLIST_PREPEND(flxInterface, interface, m->interfaces, i); g_hash_table_insert(m->hash_table, &i->index, i); + i->mtu = 1500; i->ipv4_cache = flx_cache_new(m->server, i); i->ipv6_cache = flx_cache_new(m->server, i); + i->ipv4_scheduler = flx_packet_scheduler_new(m->server, i, AF_INET); + i->ipv6_scheduler = flx_packet_scheduler_new(m->server, i, AF_INET6); changed = 0; } @@ -157,6 +165,11 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) { g_free(i->name); i->name = g_strndup(RTA_DATA(a), RTA_PAYLOAD(a)); break; + + case IFLA_MTU: + g_assert(RTA_PAYLOAD(a) == sizeof(unsigned int)); + i->mtu = *((unsigned int*) RTA_DATA(a)); + break; default: ; @@ -347,44 +360,47 @@ int flx_address_is_relevant(flxInterfaceAddress *a) { void flx_interface_send_packet(flxInterface *i, guchar protocol, flxDnsPacket *p) { g_assert(i); g_assert(p); + + if (!flx_interface_is_relevant(i)) + return; - if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0 && flx_interface_is_relevant(i)) { + if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0) { g_message("sending on '%s':IPv4", i->name); flx_send_dns_packet_ipv4(i->monitor->server->fd_ipv4, i->index, p); } - if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0 && flx_interface_is_relevant(i)) { + if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0) { g_message("sending on '%s':IPv6", i->name); flx_send_dns_packet_ipv6(i->monitor->server->fd_ipv6, i->index, p); } } -void flx_interface_send_query(flxInterface *i, guchar protocol, flxKey *k) { - flxDnsPacket *p; - +void flx_interface_post_query(flxInterface *i, guchar protocol, flxKey *k) { g_assert(i); g_assert(k); - p = flx_dns_packet_new(); - flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0)); - flx_dns_packet_append_key(p, k); - flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, 1); - flx_interface_send_packet(i, protocol, p); - flx_dns_packet_free(p); + if (!flx_interface_is_relevant(i)) + return; + + if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0) + flx_packet_scheduler_post_query(i->ipv4_scheduler, k); + + if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0) + flx_packet_scheduler_post_query(i->ipv6_scheduler, k); } -void flx_interface_send_response(flxInterface *i, guchar protocol, flxRecord *rr) { - flxDnsPacket *p; - +void flx_interface_post_response(flxInterface *i, guchar protocol, flxRecord *rr) { g_assert(i); g_assert(rr); - p = flx_dns_packet_new(); - flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(1, 0, 0, 0, 0, 0, 0, 0, 0, 0)); - flx_dns_packet_append_record(p, rr, FALSE); - flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, 1); - flx_interface_send_packet(i, protocol, p); - flx_dns_packet_free(p); + if (!flx_interface_is_relevant(i)) + return; + + if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0) + flx_packet_scheduler_post_response(i->ipv4_scheduler, rr); + + if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0) + flx_packet_scheduler_post_response(i->ipv6_scheduler, rr); } void flx_dump_caches(flxServer *s, FILE *f) { diff --git a/iface.h b/iface.h index 2adab72..50cc98f 100644 --- a/iface.h +++ b/iface.h @@ -17,6 +17,8 @@ typedef struct _flxInterface flxInterface; #include "netlink.h" #include "cache.h" #include "llist.h" +#include "psched.h" +#include "dns.h" struct _flxInterfaceMonitor { flxServer *server; @@ -41,6 +43,10 @@ struct _flxInterface { guint n_ipv6_addrs, n_ipv4_addrs; flxCache *ipv4_cache, *ipv6_cache; + + guint mtu; + + flxPacketScheduler *ipv4_scheduler, *ipv6_scheduler; }; struct _flxInterfaceAddress { @@ -64,8 +70,10 @@ flxInterface* flx_interface_monitor_get_first(flxInterfaceMonitor *m); int flx_interface_is_relevant(flxInterface *i); int flx_address_is_relevant(flxInterfaceAddress *a); -void flx_interface_send_query(flxInterface *i, guchar protocol, flxKey *k); -void flx_interface_send_response(flxInterface *i, guchar protocol, flxRecord *rr); +void flx_interface_send_packet(flxInterface *i, guchar protocol, flxDnsPacket *p); + +void flx_interface_post_query(flxInterface *i, guchar protocol, flxKey *k); +void flx_interface_post_response(flxInterface *i, guchar protocol, flxRecord *rr); void flx_dump_caches(flxServer *s, FILE *f); diff --git a/psched.c b/psched.c new file mode 100644 index 0000000..7b74053 --- /dev/null +++ b/psched.c @@ -0,0 +1,287 @@ +#include "util.h" +#include "psched.h" + +flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i, guchar protocol) { + flxPacketScheduler *s; + + g_assert(server); + g_assert(i); + + s = g_new(flxPacketScheduler, 1); + s->server = server; + s->interface = i; + s->protocol = protocol; + + FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs); + FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs); + + return s; +} + +static void query_job_free(flxPacketScheduler *s, flxQueryJob *qj) { + g_assert(qj); + + if (qj->time_event) + flx_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event); + + FLX_LLIST_REMOVE(flxQueryJob, jobs, s->query_jobs, qj); + + flx_key_unref(qj->key); + g_free(qj); +} + +static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) { + g_assert(rj); + + if (rj->time_event) + flx_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event); + + FLX_LLIST_REMOVE(flxResponseJob, jobs, s->response_jobs, rj); + + flx_record_unref(rj->record); + g_free(rj); +} + +void flx_packet_scheduler_free(flxPacketScheduler *s) { + flxQueryJob *qj; + flxResponseJob *rj; + flxTimeEvent *e; + + g_assert(s); + + while ((qj = s->query_jobs)) + query_job_free(s, qj); + while ((rj = s->response_jobs)) + response_job_free(s, rj); + + g_free(s); +} + +static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) { + guint8 *d; + + g_assert(s); + g_assert(p); + g_assert(qj); + + if ((d = flx_dns_packet_append_key(p, qj->key))) { + GTimeVal tv; + + qj->done = 1; + + /* Drop query after 100ms from history */ + flx_elapse_time(&tv, 100, 0); + flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv); + } + + return d; +} + +static void query_elapse(flxTimeEvent *e, gpointer data) { + flxQueryJob *qj = data; + flxPacketScheduler *s; + flxDnsPacket *p; + guint n; + guint8 *d; + + g_assert(qj); + s = qj->scheduler; + + if (qj->done) { + /* Lets remove it from the history */ + query_job_free(s, qj); + return; + } + + p = flx_dns_packet_new_query(s->interface->mtu - 200); + d = packet_add_query_job(s, p, qj); + g_assert(d); + n = 1; + + /* Try to fill up packet with more queries, if available */ + for (qj = s->query_jobs; qj; qj = qj->jobs_next) { + + if (qj->done) + continue; + + if (!packet_add_query_job(s, p, qj)) + break; + + n++; + } + + flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n); + flx_interface_send_packet(s->interface, s->protocol, p); + flx_dns_packet_free(p); +} + +static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) { + flxQueryJob *qj; + + g_assert(s); + g_assert(key); + + for (qj = s->query_jobs; qj; qj = qj->jobs_next) + if (flx_key_equal(qj->key, key)) + return qj; + + return NULL; +} + +void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key) { + flxQueryJob *qj; + GTimeVal tv; + + g_assert(s); + g_assert(key); + + if (look_for_query(s, key)) + return; + + qj = g_new(flxQueryJob, 1); + qj->key = flx_key_ref(key); + qj->done = FALSE; + + flx_elapse_time(&tv, 100, 0); + qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj); + qj->scheduler = s; + + FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj); +} + +static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) { + guint8 *d; + + g_assert(s); + g_assert(p); + g_assert(rj); + + if ((d = flx_dns_packet_append_record(p, rj->record, FALSE))) { + GTimeVal tv; + + rj->done = 1; + + /* Drop response after 1s from history */ + flx_elapse_time(&tv, 1000, 0); + flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv); + } + + return d; +} + + +static void response_elapse(flxTimeEvent *e, gpointer data) { + flxResponseJob *rj = data; + flxPacketScheduler *s; + flxDnsPacket *p; + guint n; + guint8 *d; + + g_assert(rj); + s = rj->scheduler; + + if (rj->done) { + /* Lets remove it from the history */ + response_job_free(s, rj); + return; + } + + p = flx_dns_packet_new_response(s->interface->mtu - 200); + d = packet_add_response_job(s, p, rj); + g_assert(d); + n = 1; + + /* Try to fill up packet with more responses, if available */ + for (rj = s->response_jobs; rj; rj = rj->jobs_next) { + + if (rj->done) + continue; + + if (!packet_add_response_job(s, p, rj)) + break; + + n++; + } + + flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n); + flx_interface_send_packet(s->interface, s->protocol, p); + flx_dns_packet_free(p); +} + +static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) { + flxResponseJob *rj; + + g_assert(s); + g_assert(record); + + for (rj = s->response_jobs; rj; rj = rj->jobs_next) + if (flx_record_equal(rj->record, record)) + return rj; + + return NULL; +} + +void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record) { + flxResponseJob *rj; + GTimeVal tv; + + g_assert(s); + g_assert(record); + + if (look_for_response(s, record)) + return; + + rj = g_new(flxResponseJob, 1); + rj->record = flx_record_ref(record); + rj->done = FALSE; + + flx_elapse_time(&tv, 20, 100); + rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj); + rj->scheduler = s; + + FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj); +} + +void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key) { + flxQueryJob *qj; + + g_assert(s); + g_assert(key); + + for (qj = s->query_jobs; qj; qj = qj->jobs_next) + if (flx_key_equal(qj->key, key)) { + + if (!qj->done) { + GTimeVal tv; + qj->done = TRUE; + + /* Drop query after 100ms from history */ + flx_elapse_time(&tv, 100, 0); + flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv); + } + + break; + } +} + +void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record) { + flxResponseJob *rj; + + g_assert(s); + g_assert(record); + + for (rj = s->response_jobs; rj; rj = rj->jobs_next) + if (flx_record_equal(rj->record, record)) { + + if (!rj->done) { + GTimeVal tv; + rj->done = TRUE; + + /* Drop response after 100ms from history */ + flx_elapse_time(&tv, 100, 0); + flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv); + } + + break; + } +} diff --git a/psched.h b/psched.h new file mode 100644 index 0000000..90b67db --- /dev/null +++ b/psched.h @@ -0,0 +1,48 @@ +#ifndef foopschedhfoo +#define foopschedhfoo + +typedef struct _flxQueryJob flxQueryJob; +typedef struct _flxResponseJob flxResponseJob; +typedef struct _flxPacketScheduler flxPacketScheduler; + +#include "timeeventq.h" +#include "rr.h" +#include "llist.h" +#include "iface.h" + +struct _flxQueryJob { + flxPacketScheduler *scheduler; + flxTimeEvent *time_event; + flxKey *key; + gboolean done; + FLX_LLIST_FIELDS(flxQueryJob, jobs); +}; + +struct _flxResponseJob { + flxPacketScheduler *scheduler; + flxTimeEvent *time_event; + flxRecord *record; + gboolean done; + FLX_LLIST_FIELDS(flxResponseJob, jobs); +}; + +struct _flxPacketScheduler { + flxServer *server; + + flxInterface *interface; + guchar protocol; + + FLX_LLIST_HEAD(flxQueryJob, query_jobs); + FLX_LLIST_HEAD(flxResponseJob, response_jobs); +}; + +flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i, guchar protocol); +void flx_packet_scheduler_free(flxPacketScheduler *s); + +void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key); +void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record); + +void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key); +void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record); + +#endif diff --git a/rr.c b/rr.c index 0153b28..5acc09b 100644 --- a/rr.c +++ b/rr.c @@ -164,3 +164,13 @@ guint flx_key_hash(const flxKey *k) { return g_str_hash(k->name) + k->type + k->class; } + +gboolean flx_record_equal(const flxRecord *a, const flxRecord *b) { + g_assert(a); + g_assert(b); + + return flx_key_equal(a->key, b->key) && + a->ttl == b->ttl && + a->size == b->size && + memcmp(a->data, b->data, a->size) == 0; +} diff --git a/rr.h b/rr.h index 0e61a73..e3d3654 100644 --- a/rr.h +++ b/rr.h @@ -55,4 +55,6 @@ const gchar *flx_dns_type_to_string(guint16 type); gchar *flx_key_to_string(flxKey *k); /* g_free() the result! */ gchar *flx_record_to_string(flxRecord *r); /* g_free() the result! */ +gboolean flx_record_equal(const flxRecord *a, const flxRecord *b); + #endif diff --git a/server.c b/server.c index 6027f5f..0f292a3 100644 --- a/server.c +++ b/server.c @@ -9,45 +9,33 @@ #include "iface.h" #include "socket.h" -static void post_response(flxServer *s, flxRecord *r, gint iface, const flxAddress *a) { - flxInterface *i; - - g_assert(s); - g_assert(r); - g_assert(iface > 0); - g_assert(a); - - if ((i = flx_interface_monitor_get_interface(s->monitor, iface))) - flx_interface_send_response(i, a->family, r); - -} - -static void handle_query_key(flxServer *s, flxKey *k, gint iface, const flxAddress *a) { +static void handle_query_key(flxServer *s, flxKey *k, flxInterface *i, const flxAddress *a) { flxEntry *e; g_assert(s); g_assert(k); + g_assert(i); g_assert(a); for (e = g_hash_table_lookup(s->rrset_by_name, k); e; e = e->by_name_next) { - if ((e->interface <= 0 || e->interface == iface) && + if ((e->interface <= 0 || e->interface == i->index) && (e->protocol == AF_UNSPEC || e->protocol == a->family)) { - post_response(s, e->record, iface, a); + flx_interface_post_response(i, a->family, e->record); } } } -static void handle_query(flxServer *s, flxDnsPacket *p, gint iface, const flxAddress *a) { +static void handle_query(flxServer *s, flxDnsPacket *p, flxInterface *i, const flxAddress *a) { guint n; g_assert(s); g_assert(p); + g_assert(i); g_assert(a); for (n = flx_dns_packet_get_field(p, DNS_FIELD_QDCOUNT); n > 0; n --) { - flxKey *key; if (!(key = flx_dns_packet_consume_key(p))) { @@ -55,29 +43,31 @@ static void handle_query(flxServer *s, flxDnsPacket *p, gint iface, const flxAdd return; } - handle_query_key(s, key, iface, a); + handle_query_key(s, key, i, a); flx_key_unref(key); } } -static void add_response_to_cache(flxCache *c, flxDnsPacket *p, const flxAddress *a) { +static void handle_response(flxServer *s, flxDnsPacket *p, flxInterface *i, const flxAddress *a) { guint n; - g_assert(c); + g_assert(s); g_assert(p); + g_assert(i); g_assert(a); + for (n = flx_dns_packet_get_field(p, DNS_FIELD_ANCOUNT); n > 0; n--) { - - flxRecord *rr; + flxRecord *record; gboolean cache_flush = FALSE; - if (!(rr = flx_dns_packet_consume_record(p, &cache_flush))) { + if (!(record = flx_dns_packet_consume_record(p, &cache_flush))) { g_warning("Packet too short"); return; } - flx_cache_update(c, rr, cache_flush, a); - flx_record_unref(rr); + flx_cache_update(a->family == AF_INET ? i->ipv4_cache : i->ipv6_cache, record, cache_flush, a); + flx_packet_scheduler_drop_response(a->family == AF_INET ? i->ipv4_scheduler : i->ipv6_scheduler, record); + flx_record_unref(record); } } @@ -120,8 +110,8 @@ static void dispatch_packet(flxServer *s, flxDnsPacket *p, struct sockaddr *sa, return; } - flx_address_from_sockaddr(sa, &a); + if (flx_dns_packet_is_query(p)) { @@ -132,11 +122,9 @@ static void dispatch_packet(flxServer *s, flxDnsPacket *p, struct sockaddr *sa, return; } - handle_query(s, p, iface, &a); + handle_query(s, p, i, &a); g_message("Handled query"); } else { - flxCache *c; - if (flx_dns_packet_get_field(p, DNS_FIELD_QDCOUNT) != 0 || flx_dns_packet_get_field(p, DNS_FIELD_ANCOUNT) == 0 || flx_dns_packet_get_field(p, DNS_FIELD_NSCOUNT) != 0 || @@ -145,10 +133,8 @@ static void dispatch_packet(flxServer *s, flxDnsPacket *p, struct sockaddr *sa, return; } - c = a.family == AF_INET ? i->ipv4_cache : i->ipv6_cache; - add_response_to_cache(c, p, &a); - - g_message("Handled responnse"); + handle_response(s, p, i, &a); + g_message("Handled response"); } } @@ -528,7 +514,7 @@ void flx_server_send_query(flxServer *s, gint interface, guchar protocol, flxKey flxInterface *i; for (i = flx_interface_monitor_get_first(s->monitor); i; i = i->interface_next) - flx_interface_send_query(i, protocol, k); + flx_interface_post_query(i, protocol, k); } else { flxInterface *i; @@ -536,6 +522,6 @@ void flx_server_send_query(flxServer *s, gint interface, guchar protocol, flxKey if (!(i = flx_interface_monitor_get_interface(s->monitor, interface))) return; - flx_interface_send_query(i, protocol, k); + flx_interface_post_query(i, protocol, k); } } diff --git a/server.h b/server.h index 63f2bf6..fbd5715 100644 --- a/server.h +++ b/server.h @@ -2,7 +2,6 @@ #define fooflxserverhfoo typedef struct _flxEntry flxEntry; -typedef struct _flxResponseJob flxResponseJob; #include "flx.h" #include "iface.h" @@ -23,12 +22,6 @@ struct _flxEntry { FLX_LLIST_FIELDS(flxEntry, by_id); }; -struct _flxResponseJob { - flxTimeEvent *time_event; - flxRecord *record; - FLX_LLIST_FIELDS(flxResponseJob, response); -}; - struct _flxServer { GMainContext *context; flxInterfaceMonitor *monitor; diff --git a/socket.c b/socket.c index 1887e1d..cc43f16 100644 --- a/socket.c +++ b/socket.c @@ -252,7 +252,7 @@ gint flx_send_dns_packet_ipv4(gint fd, gint interface, flxDnsPacket *p) { mdns_mcast_group_ipv4(&sa); memset(&io, 0, sizeof(io)); - io.iov_base = p->data; + io.iov_base = FLX_DNS_PACKET_DATA(p); io.iov_len = p->size; memset(cmsg_data, 0, sizeof(cmsg_data)); @@ -292,7 +292,7 @@ gint flx_send_dns_packet_ipv6(gint fd, gint interface, flxDnsPacket *p) { mdns_mcast_group_ipv6(&sa); memset(&io, 0, sizeof(io)); - io.iov_base = p->data; + io.iov_base = FLX_DNS_PACKET_DATA(p); io.iov_len = p->size; memset(cmsg_data, 0, sizeof(cmsg_data)); @@ -330,10 +330,10 @@ flxDnsPacket* flx_recv_dns_packet_ipv4(gint fd, struct sockaddr_in *ret_sa, gint g_assert(ret_iface); g_assert(ret_ttl); - p = flx_dns_packet_new(); + p = flx_dns_packet_new(0); - io.iov_base = p->data; - io.iov_len = sizeof(p->data); + io.iov_base = FLX_DNS_PACKET_DATA(p); + io.iov_len = p->max_size; memset(&msg, 0, sizeof(msg)); msg.msg_name = ret_sa; @@ -376,7 +376,7 @@ fail: } flxDnsPacket* flx_recv_dns_packet_ipv6(gint fd, struct sockaddr_in6 *ret_sa, gint *ret_iface, guint8* ret_ttl) { - flxDnsPacket *p= NULL; + flxDnsPacket *p = NULL; struct msghdr msg; struct iovec io; uint8_t aux[64]; @@ -389,10 +389,10 @@ flxDnsPacket* flx_recv_dns_packet_ipv6(gint fd, struct sockaddr_in6 *ret_sa, gin g_assert(ret_iface); g_assert(ret_ttl); - p = flx_dns_packet_new(); + p = flx_dns_packet_new(0); - io.iov_base = p->data; - io.iov_len = sizeof(p->data); + io.iov_base = FLX_DNS_PACKET_DATA(p); + io.iov_len = p->max_size; memset(&msg, 0, sizeof(msg)); msg.msg_name = ret_sa; diff --git a/timeeventq.c b/timeeventq.c index b3dd897..0d4af97 100644 --- a/timeeventq.c +++ b/timeeventq.c @@ -133,7 +133,7 @@ void flx_time_event_queue_remove(flxTimeEventQueue *q, flxTimeEvent *e) { g_free(e); } -void flx_time_event_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval) { +void flx_time_event_queue_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval) { g_assert(q); g_assert(e); g_assert(e->queue == q); @@ -142,3 +142,15 @@ void flx_time_event_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal flx_prio_queue_shuffle(q->prioq, e->node); } + +flxTimeEvent* flx_time_event_queue_root(flxTimeEventQueue *q) { + g_assert(q); + + return q->prioq->root ? q->prioq->root->data : NULL; +} + +flxTimeEvent* flx_time_event_next(flxTimeEvent *e) { + g_assert(e); + + return e->node->next->data; +} diff --git a/timeeventq.h b/timeeventq.h index 8fd5c02..f663f1f 100644 --- a/timeeventq.h +++ b/timeeventq.h @@ -25,6 +25,9 @@ void flx_time_event_queue_free(flxTimeEventQueue *q); flxTimeEvent* flx_time_event_queue_add(flxTimeEventQueue *q, const GTimeVal *timeval, void (*callback)(flxTimeEvent *e, void *userdata), void *userdata); void flx_time_event_queue_remove(flxTimeEventQueue *q, flxTimeEvent *e); -void flx_time_event_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval); +void flx_time_event_queue_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval); + +flxTimeEvent* flx_time_event_queue_root(flxTimeEventQueue *q); +flxTimeEvent* flx_time_event_next(flxTimeEvent *e); #endif diff --git a/util.c b/util.c index faf1ac0..b8128c4 100644 --- a/util.c +++ b/util.c @@ -99,3 +99,17 @@ gint flx_wait_for_write(gint fd) { return 0; } + +GTimeVal *flx_elapse_time(GTimeVal *tv, guint msec, guint jitter) { + g_assert(tv); + + g_get_current_time(tv); + + if (msec) + g_time_val_add(tv, msec*1000); + + if (jitter) + g_time_val_add(tv, g_random_int_range(0, jitter) * 1000); + + return tv; +} diff --git a/util.h b/util.h index 78f86d2..517c2f1 100644 --- a/util.h +++ b/util.h @@ -13,4 +13,6 @@ gint flx_set_cloexec(gint fd); gint flx_set_nonblock(gint fd); gint flx_wait_for_write(gint fd); +GTimeVal *flx_elapse_time(GTimeVal *tv, guint msec, guint jitter); + #endif