From 4aa744ffac20c7b5e18cb3b23e5dbac8221c0043 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sat, 26 Mar 2005 18:37:06 +0000 Subject: [PATCH] * add todo list * beef up packet scheduler git-svn-id: file:///home/lennart/svn/public/avahi/trunk@18 941a03a8-eaeb-0310-b9a0-b1bbd8fe43fe --- Makefile | 2 +- announce.c | 75 +++++++------- announce.h | 1 + cache.c | 2 +- dns.c | 15 ++- iface.c | 57 ++++++----- iface.h | 6 +- main.c | 12 +-- psched.c | 276 ++++++++++++++++++++++++++++++++++++++++----------- psched.h | 12 ++- rr.c | 7 +- server.c | 11 +- subscribe.c | 1 - timeeventq.c | 2 + timeeventq.h | 3 + todo | 8 ++ 16 files changed, 341 insertions(+), 149 deletions(-) create mode 100644 todo diff --git a/Makefile b/Makefile index 6798fe7..8f60172 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -CC=gcc +#CC=gcc CFLAGS=-g -O0 -Wall -W -pipe $(shell pkg-config --cflags glib-2.0) -Wno-unused LIBS=$(shell pkg-config --libs glib-2.0) diff --git a/announce.c b/announce.c index db983d4..5b1e4e7 100644 --- a/announce.c +++ b/announce.c @@ -1,6 +1,8 @@ #include "announce.h" #include "util.h" +#define FLX_ANNOUNCEMENT_JITTER_MSEC 0 + static void remove_announcement(flxServer *s, flxAnnouncement *a) { g_assert(s); g_assert(a); @@ -21,21 +23,22 @@ static void elapse_announce(flxTimeEvent *e, void *userdata) { g_assert(e); g_assert(a); - if (a->n_announced >= 3) { - g_message("Enough announcements for record [%s]", t = flx_record_to_string(a->entry->record)); - g_free(t); - remove_announcement(a->server, a); - return; - } + flx_interface_post_response(a->interface, a->entry->record, FALSE); - flx_interface_post_response(a->interface, a->entry->record); - a->n_announced++; + if (a->n_announced++ <= 8) + a->sec_delay *= 2; g_message("Announcement #%i on interface %s.%i for entry [%s]", a->n_announced, a->interface->hardware->name, a->interface->protocol, t = flx_record_to_string(a->entry->record)); g_free(t); - - flx_elapse_time(&tv, 1000, 100); - flx_time_event_queue_update(a->server->time_event_queue, a->time_event, &tv); + + if (a->n_announced >= 4) { + g_message("Enough announcements for record [%s]", t = flx_record_to_string(a->entry->record)); + g_free(t); + remove_announcement(a->server, a); + } else { + flx_elapse_time(&tv, a->sec_delay*1000, FLX_ANNOUNCEMENT_JITTER_MSEC); + flx_time_event_queue_update(a->server->time_event_queue, a->time_event, &tv); + } } static void new_announcement(flxServer *s, flxInterface *i, flxServerEntry *e) { @@ -47,7 +50,10 @@ static void new_announcement(flxServer *s, flxInterface *i, flxServerEntry *e) { g_assert(i); g_assert(e); - if (!flx_interface_match(i, e->interface, e->protocol) || !flx_interface_relevant(i)) + g_message("NEW ANNOUNCEMENT: %s.%i [%s]", i->hardware->name, i->protocol, t = flx_record_to_string(e->record)); + g_free(t); + + if (!flx_interface_match(i, e->interface, e->protocol) || !i->announcing) return; /* We don't want duplicates */ @@ -58,18 +64,19 @@ static void new_announcement(flxServer *s, flxInterface *i, flxServerEntry *e) { g_message("New announcement on interface %s.%i for entry [%s]", i->hardware->name, i->protocol, t = flx_record_to_string(e->record)); g_free(t); - flx_interface_post_response(i, e->record); + flx_interface_post_response(i, e->record, FALSE); a = g_new(flxAnnouncement, 1); a->server = s; a->interface = i; a->entry = e; a->n_announced = 1; + a->sec_delay = 1; FLX_LLIST_PREPEND(flxAnnouncement, by_interface, i->announcements, a); FLX_LLIST_PREPEND(flxAnnouncement, by_entry, e->announcements, a); - flx_elapse_time(&tv, 1000, 100); + flx_elapse_time(&tv, a->sec_delay*1000, FLX_ANNOUNCEMENT_JITTER_MSEC); a->time_event = flx_time_event_queue_add(s->time_event_queue, &tv, elapse_announce, a); } @@ -79,40 +86,32 @@ void flx_announce_interface(flxServer *s, flxInterface *i) { g_assert(s); g_assert(i); - if (!flx_interface_relevant(i)) + if (!i->announcing) return; + + g_message("ANNOUNCE INTERFACE"); for (e = s->entries; e; e = e->entry_next) new_announcement(s, i, e); } -void flx_announce_entry(flxServer *s, flxServerEntry *e) { - g_assert(s); +static void announce_walk_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) { + flxServerEntry *e = userdata; + + g_assert(m); + g_assert(i); g_assert(e); - if (e->interface > 0) { - - if (e->protocol != AF_UNSPEC) { - flxInterface *i; - - if ((i = flx_interface_monitor_get_interface(s->monitor, e->interface, e->protocol))) - new_announcement(s, i, e); - } else { - flxHwInterface *hw; + new_announcement(m->server, i, e); +} - if ((hw = flx_interface_monitor_get_hw_interface(s->monitor, e->interface))) { - flxInterface *i; +void flx_announce_entry(flxServer *s, flxServerEntry *e) { + g_assert(s); + g_assert(e); - for (i = hw->interfaces; i; i = i->by_hardware_next) - new_announcement(s, i, e); - } - } - } else { - flxInterface *i; + g_message("ANNOUNCE ENTRY"); - for (i = s->monitor->interfaces; i; i = i->interface_next) - new_announcement(s, i, e); - } + flx_interface_monitor_walk(s->monitor, e->interface, e->protocol, announce_walk_callback, e); } static flxRecord *make_goodbye_record(flxRecord *r) { @@ -139,7 +138,7 @@ void flx_goodbye_interface(flxServer *s, flxInterface *i, gboolean goodbye) { for (e = s->entries; e; e = e->entry_next) if (flx_interface_match(i, e->interface, e->protocol)) { flxRecord *g = make_goodbye_record(e->record); - flx_interface_post_response(i, g); + flx_interface_post_response(i, g, TRUE); flx_record_unref(g); } } diff --git a/announce.h b/announce.h index 0e65eae..49c5f13 100644 --- a/announce.h +++ b/announce.h @@ -17,6 +17,7 @@ struct _flxAnnouncement { flxTimeEvent *time_event; guint n_announced; + guint sec_delay; FLX_LLIST_FIELDS(flxAnnouncement, by_interface); FLX_LLIST_FIELDS(flxAnnouncement, by_entry); diff --git a/cache.c b/cache.c index 9d9590b..99d79fb 100644 --- a/cache.c +++ b/cache.c @@ -122,7 +122,7 @@ static void elapse_func(flxTimeEvent *t, void *userdata) { g_message("Requesting cache entry update at %i%%.", percent); /* Request a cache update */ - flx_interface_post_query(e->cache->interface, e->record->key); + flx_interface_post_query(e->cache->interface, e->record->key, TRUE); /* Check again later */ next_expiry(e->cache, e, percent); diff --git a/dns.c b/dns.c index 726eb21..243c5a3 100644 --- a/dns.c +++ b/dns.c @@ -131,7 +131,7 @@ guint8 *flx_dns_packet_append_bytes(flxDnsPacket *p, gconstpointer b, guint l) g_assert(p); g_assert(b); g_assert(l); - + if (!(d = flx_dns_packet_extend(p, l))) return NULL; @@ -373,9 +373,14 @@ flxRecord* flx_dns_packet_consume_record(flxDnsPacket *p, gboolean *ret_cache_fl } default: - if (!(data = flx_dns_packet_get_rptr(p)) || - flx_dns_packet_skip(p, rdlength) < 0) - return NULL; + + if (rdlength > 0) { + + if (!(data = flx_dns_packet_get_rptr(p)) || + flx_dns_packet_skip(p, rdlength) < 0) + return NULL; + } else + data = NULL; break; } @@ -462,7 +467,7 @@ guint8* flx_dns_packet_append_record(flxDnsPacket *p, flxRecord *r, gboolean cac default: if (!flx_dns_packet_append_uint16(p, r->size) || - !flx_dns_packet_append_bytes(p, r->data, r->size)) + (r->size != 0 && !flx_dns_packet_append_bytes(p, r->data, r->size))) return NULL; } diff --git a/iface.c b/iface.c index a7d210f..24d5a37 100644 --- a/iface.c +++ b/iface.c @@ -59,11 +59,16 @@ static void free_address(flxInterfaceMonitor *m, flxInterfaceAddress *a) { g_free(a); } -static void free_interface(flxInterfaceMonitor *m, flxInterface *i) { +static void free_interface(flxInterfaceMonitor *m, flxInterface *i, gboolean send_goodbye) { g_assert(m); g_assert(i); - flx_goodbye_interface(m->server, i, FALSE); + g_message("removing interface %s.%i", i->hardware->name, i->protocol); + flx_goodbye_interface(m->server, i, send_goodbye); + g_message("flushing..."); + flx_packet_scheduler_flush_responses(i->scheduler); + g_message("done"); + g_assert(!i->announcements); while (i->addresses) @@ -78,12 +83,12 @@ static void free_interface(flxInterfaceMonitor *m, flxInterface *i) { g_free(i); } -static void free_hw_interface(flxInterfaceMonitor *m, flxHwInterface *hw) { +static void free_hw_interface(flxInterfaceMonitor *m, flxHwInterface *hw, gboolean send_goodbye) { g_assert(m); g_assert(hw); while (hw->interfaces) - free_interface(m, hw->interfaces); + free_interface(m, hw->interfaces, send_goodbye); FLX_LLIST_REMOVE(flxHwInterface, hardware, m->hw_interfaces, hw); g_hash_table_remove(m->hash_table, &hw->index); @@ -136,7 +141,7 @@ static void new_interface(flxInterfaceMonitor *m, flxHwInterface *hw, guchar pro i->monitor = m; i->hardware = hw; i->protocol = protocol; - i->relevant = FALSE; + i->announcing = FALSE; FLX_LLIST_HEAD_INIT(flxInterfaceAddress, i->addresses); FLX_LLIST_HEAD_INIT(flxAnnouncement, i->announcements); @@ -155,17 +160,17 @@ static void check_interface_relevant(flxInterfaceMonitor *m, flxInterface *i) { b = flx_interface_relevant(i); - if (b && !i->relevant) { + if (b && !i->announcing) { g_message("New relevant interface %s.%i", i->hardware->name, i->protocol); + i->announcing = TRUE; flx_announce_interface(m->server, i); - } else if (!b && i->relevant) { + } else if (!b && i->announcing) { g_message("Interface %s.%i no longer relevant", i->hardware->name, i->protocol); + i->announcing = FALSE; flx_goodbye_interface(m->server, i, FALSE); } - - i->relevant = b; } static void check_hw_interface_relevant(flxInterfaceMonitor *m, flxHwInterface *hw) { @@ -252,7 +257,7 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) { return; update_hw_interface_rr(m, hw, TRUE); - free_hw_interface(m, hw); + free_hw_interface(m, hw, FALSE); } else if (n->nlmsg_type == RTM_NEWADDR || n->nlmsg_type == RTM_DELADDR) { @@ -336,8 +341,10 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) { g_warning("NETLINK: Failed to list addrs: %s", strerror(errno)); else m->list = LIST_ADDR; - } else + } else { m->list = LIST_DONE; + g_message("Enumeration complete"); + } } else if (n->nlmsg_type == NLMSG_ERROR && (n->nlmsg_seq == m->query_link_seq || n->nlmsg_seq == m->query_addr_seq)) { struct nlmsgerr *e = NLMSG_DATA (n); @@ -375,13 +382,14 @@ fail: void flx_interface_monitor_free(flxInterfaceMonitor *m) { g_assert(m); - if (m->netlink) - flx_netlink_free(m->netlink); - while (m->hw_interfaces) - free_hw_interface(m, m->hw_interfaces); + free_hw_interface(m, m->hw_interfaces, TRUE); g_assert(!m->interfaces); + + + if (m->netlink) + flx_netlink_free(m->netlink); if (m->hash_table) g_hash_table_destroy(m->hash_table); @@ -420,7 +428,7 @@ void flx_interface_send_packet(flxInterface *i, flxDnsPacket *p) { g_assert(i); g_assert(p); - if (i->relevant) { + if (flx_interface_relevant(i)) { g_message("sending on '%s.%i'", i->hardware->name, i->protocol); if (i->protocol == AF_INET && i->monitor->server->fd_ipv4 >= 0) @@ -430,21 +438,21 @@ void flx_interface_send_packet(flxInterface *i, flxDnsPacket *p) { } } -void flx_interface_post_query(flxInterface *i, flxKey *key) { +void flx_interface_post_query(flxInterface *i, flxKey *key, gboolean immediately) { g_assert(i); g_assert(key); - if (i->relevant) - flx_packet_scheduler_post_query(i->scheduler, key); + if (flx_interface_relevant(i)) + flx_packet_scheduler_post_query(i->scheduler, key, immediately); } -void flx_interface_post_response(flxInterface *i, flxRecord *record) { +void flx_interface_post_response(flxInterface *i, flxRecord *record, gboolean immediately) { g_assert(i); g_assert(record); - if (i->relevant) - flx_packet_scheduler_post_response(i->scheduler, record); + if (flx_interface_relevant(i)) + flx_packet_scheduler_post_response(i->scheduler, record, immediately); } void flx_dump_caches(flxInterfaceMonitor *m, FILE *f) { @@ -452,11 +460,12 @@ void flx_dump_caches(flxInterfaceMonitor *m, FILE *f) { g_assert(m); for (i = m->interfaces; i; i = i->interface_next) { - if (i->relevant) { - fprintf(f, ";;; INTERFACE %s.%i ;;;\n", i->hardware->name, i->protocol); + if (flx_interface_relevant(i)) { + fprintf(f, "\n;;; INTERFACE %s.%i ;;;\n", i->hardware->name, i->protocol); flx_cache_dump(i->cache, f); } } + fprintf(f, "\n"); } gboolean flx_interface_relevant(flxInterface *i) { diff --git a/iface.h b/iface.h index b5c2708..16e9650 100644 --- a/iface.h +++ b/iface.h @@ -53,7 +53,7 @@ struct _flxInterface { flxHwInterface *hardware; guchar protocol; - gboolean relevant; + gboolean announcing; flxCache *cache; flxPacketScheduler *scheduler; @@ -82,8 +82,8 @@ flxHwInterface* flx_interface_monitor_get_hw_interface(flxInterfaceMonitor *m, g void flx_interface_send_packet(flxInterface *i, flxDnsPacket *p); -void flx_interface_post_query(flxInterface *i, flxKey *k); -void flx_interface_post_response(flxInterface *i, flxRecord *rr); +void flx_interface_post_query(flxInterface *i, flxKey *k, gboolean immediately); +void flx_interface_post_response(flxInterface *i, flxRecord *rr, gboolean immediately); void flx_dump_caches(flxInterfaceMonitor *m, FILE *f); diff --git a/main.c b/main.c index 367dcd3..6f82d25 100644 --- a/main.c +++ b/main.c @@ -58,21 +58,21 @@ int main(int argc, char *argv[]) { flx_server_add_text(flx, 0, 0, AF_UNSPEC, FALSE, NULL, "hallo"); - k = flx_key_new("_http._tcp.local.", FLX_DNS_CLASS_IN, FLX_DNS_TYPE_PTR); - s = flx_subscription_new(flx, k, 0, AF_UNSPEC, subscription, NULL); - flx_key_unref(k); +/* k = flx_key_new("_http._tcp.local.", FLX_DNS_CLASS_IN, FLX_DNS_TYPE_PTR); */ +/* s = flx_subscription_new(flx, k, 0, AF_UNSPEC, subscription, NULL); */ +/* flx_key_unref(k); */ loop = g_main_loop_new(NULL, FALSE); - g_timeout_add(1000*60, quit_timeout, loop); + g_timeout_add(1000*30, quit_timeout, loop); g_timeout_add(1000, send_timeout, flx); - g_timeout_add(1000*10, dump_timeout, flx); + g_timeout_add(1000*20, dump_timeout, flx); g_main_loop_run(loop); g_main_loop_unref(loop); - flx_subscription_free(s); +/* flx_subscription_free(s); */ flx_server_free(flx); return 0; diff --git a/psched.c b/psched.c index db72ec3..162c1b2 100644 --- a/psched.c +++ b/psched.c @@ -1,6 +1,14 @@ +#include + #include "util.h" #include "psched.h" +#define FLX_QUERY_HISTORY_MSEC 700 +#define FLX_QUERY_DEFER_MSEC 100 +#define FLX_RESPONSE_HISTORY_MSEC 700 +#define FLX_RESPONSE_DEFER_MSEC 20 +#define FLX_RESPONSE_JITTER_MSEC 100 + flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i) { flxPacketScheduler *s; @@ -68,9 +76,11 @@ static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQ qj->done = 1; - /* Drop query after 100ms from history */ - flx_elapse_time(&tv, 100, 0); + /* Drop query after some time from history from history */ + flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0); flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv); + + g_get_current_time(&qj->delivery); } return d; @@ -127,25 +137,47 @@ static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) { return NULL; } -void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key) { +flxQueryJob* query_job_new(flxPacketScheduler *s, flxKey *key) { flxQueryJob *qj; - GTimeVal tv; g_assert(s); g_assert(key); - if (look_for_query(s, key)) - return; - qj = g_new(flxQueryJob, 1); + qj->scheduler = s; qj->key = flx_key_ref(key); qj->done = FALSE; + qj->time_event = NULL; + + FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj); - flx_elapse_time(&tv, 100, 0); - qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj); - qj->scheduler = s; + return qj; +} - FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj); +void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key, gboolean immediately) { + GTimeVal tv; + flxQueryJob *qj; + + g_assert(s); + g_assert(key); + + flx_elapse_time(&tv, immediately ? 0 : FLX_QUERY_DEFER_MSEC, 0); + + if ((qj = look_for_query(s, key))) { + glong d = flx_timeval_diff(&tv, &qj->delivery); + + /* Duplicate questions suppression */ + if (d >= 0 && d <= FLX_QUERY_HISTORY_MSEC*1000) { + g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!"); + return; + } + + query_job_free(s, qj); + } + + qj = query_job_new(s, key); + qj->delivery = tv; + qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &qj->delivery, query_elapse, qj); } static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) { @@ -160,35 +192,32 @@ static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, f rj->done = 1; - /* Drop response after 1s from history */ - flx_elapse_time(&tv, 1000, 0); + /* Drop response after some time from history */ + flx_elapse_time(&tv, FLX_RESPONSE_HISTORY_MSEC, 0); flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv); + + g_get_current_time(&rj->delivery); } return d; } - -static void response_elapse(flxTimeEvent *e, gpointer data) { - flxResponseJob *rj = data; - flxPacketScheduler *s; +static void send_response_packet(flxPacketScheduler *s, flxResponseJob *rj) { flxDnsPacket *p; guint n; - guint8 *d; - - g_assert(rj); - s = rj->scheduler; - if (rj->done) { - /* Lets remove it from the history */ - response_job_free(s, rj); - return; - } + g_assert(s); p = flx_dns_packet_new_response(s->interface->hardware->mtu - 200); - d = packet_add_response_job(s, p, rj); - g_assert(d); - n = 1; + n = 0; + + /* If a job was specified, put it in the packet. */ + if (rj) { + guint8 *d; + d = packet_add_response_job(s, p, rj); + g_assert(d); + n++; + } /* Try to fill up packet with more responses, if available */ for (rj = s->response_jobs; rj; rj = rj->jobs_next) { @@ -207,6 +236,22 @@ static void response_elapse(flxTimeEvent *e, gpointer data) { flx_dns_packet_free(p); } +static void response_elapse(flxTimeEvent *e, gpointer data) { + flxResponseJob *rj = data; + flxPacketScheduler *s; + + g_assert(rj); + s = rj->scheduler; + + if (rj->done) { + /* Lets remove it from the history */ + response_job_free(s, rj); + return; + } + + send_response_packet(s, rj); +} + static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) { flxResponseJob *rj; @@ -220,67 +265,180 @@ static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *recor return NULL; } -void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record) { +static flxResponseJob* response_job_new(flxPacketScheduler *s, flxRecord *record) { flxResponseJob *rj; - GTimeVal tv; g_assert(s); g_assert(record); - if (look_for_response(s, record)) - return; - rj = g_new(flxResponseJob, 1); + rj->scheduler = s; rj->record = flx_record_ref(record); rj->done = FALSE; + rj->time_event = NULL; + + FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj); - flx_elapse_time(&tv, 20, 100); - rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj); - rj->scheduler = s; + return rj; +} - FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj); +void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record, gboolean immediately) { + flxResponseJob *rj; + GTimeVal tv; + gchar *t; + + g_assert(s); + g_assert(record); + + flx_elapse_time(&tv, immediately ? 0 : FLX_RESPONSE_DEFER_MSEC, immediately ? 0 : FLX_RESPONSE_JITTER_MSEC); + + /* Don't send out duplicates */ + + if ((rj = look_for_response(s, record))) { + glong d; + + d = flx_timeval_diff(&tv, &rj->delivery); + + /* If there's already a matching packet in our history or in + * the schedule, we do nothing. */ + if (!!record->ttl == !!rj->record->ttl && + d >= 0 && d <= FLX_RESPONSE_HISTORY_MSEC*1000) { + g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!"); + return; + } + + /* Either one was a goodbye packet, but the other was not, so + * let's drop the older one. */ + response_job_free(s, rj); + } + + g_message("ACCEPTED NEW RESPONSE [%s]", t = flx_record_to_string(record)); + g_free(t); + + /* Create a new job and schedule it */ + rj = response_job_new(s, record); + rj->delivery = tv; + rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj); } -void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key) { +void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key) { + GTimeVal tv; flxQueryJob *qj; g_assert(s); g_assert(key); + /* This function is called whenever an incoming query was + * receieved. We drop all scheduled queries which match here. The + * keyword is "DUPLICATE QUESTION SUPPRESION". */ + for (qj = s->query_jobs; qj; qj = qj->jobs_next) if (flx_key_equal(qj->key, key)) { - if (!qj->done) { - GTimeVal tv; - qj->done = TRUE; - - /* Drop query after 100ms from history */ - flx_elapse_time(&tv, 100, 0); - flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv); - } + if (qj->done) + return; - break; + goto mark_done; } + + + /* No matching job was found. Add the query to the history */ + qj = query_job_new(s, key); + +mark_done: + qj->done = TRUE; + + /* Drop the query after some time */ + flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0); + qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj); + + g_get_current_time(&qj->delivery); +} + +void response_job_set_elapse_time(flxPacketScheduler *s, flxResponseJob *rj, guint msec, guint jitter) { + GTimeVal tv; + + g_assert(s); + g_assert(rj); + + flx_elapse_time(&tv, msec, jitter); + + if (rj->time_event) + flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv); + else + rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj); + } -void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record) { +void flx_packet_scheduler_incoming_response(flxPacketScheduler *s, flxRecord *record) { flxResponseJob *rj; g_assert(s); g_assert(record); - for (rj = s->response_jobs; rj; rj = rj->jobs_next) + /* This function is called whenever an incoming response was + * receieved. We drop all scheduled responses which match + * here. The keyword is "DUPLICATE ANSWER SUPPRESION". */ + + for (rj = s->response_jobs; rj; rj = rj->jobs_next) if (flx_record_equal_no_ttl(rj->record, record)) { - if (!rj->done) { - GTimeVal tv; - rj->done = TRUE; - - /* Drop response after 100ms from history */ - flx_elapse_time(&tv, 100, 0); - flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv); + if (rj->done) { + + if (!!record->ttl == !!rj->record->ttl) { + /* An entry like this is already in our history, + * so let's get out of here! */ + + return; + + } else { + /* Either one was a goodbye packet but other was + * none. We remove the history entry, and add a + * new one */ + + response_job_free(s, rj); + break; + } + + } else { + + if (!!record->ttl == !!rj->record->ttl) { + + /* The incoming packet matches our scheduled + * record, so let's mark that one as done */ + + goto mark_done; + + } else { + + /* Either one was a goodbye packet but other was + * none. We ignore the incoming packet. */ + + return; + } } - - break; } + + /* No matching job was found. Add the query to the history */ + rj = response_job_new(s, record); + +mark_done: + rj->done = TRUE; + + /* Drop response after 500ms from history */ + response_job_set_elapse_time(s, rj, FLX_RESPONSE_HISTORY_MSEC, 0); + + g_get_current_time(&rj->delivery); +} + +void flx_packet_scheduler_flush_responses(flxPacketScheduler *s) { + flxResponseJob *rj; + + g_assert(s); + + /* Send all scheduled responses, ignoring the scheduled time */ + + for (rj = s->response_jobs; rj; rj = rj->jobs_next) + if (!rj->done) + send_response_packet(s, rj); } diff --git a/psched.h b/psched.h index fae5d91..c312ce8 100644 --- a/psched.h +++ b/psched.h @@ -15,6 +15,7 @@ struct _flxQueryJob { flxTimeEvent *time_event; flxKey *key; gboolean done; + GTimeVal delivery; FLX_LLIST_FIELDS(flxQueryJob, jobs); }; @@ -23,6 +24,7 @@ struct _flxResponseJob { flxTimeEvent *time_event; flxRecord *record; gboolean done; + GTimeVal delivery; FLX_LLIST_FIELDS(flxResponseJob, jobs); }; @@ -38,10 +40,12 @@ struct _flxPacketScheduler { flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i); void flx_packet_scheduler_free(flxPacketScheduler *s); -void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key); -void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record); +void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key, gboolean immediately); +void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record, gboolean immediately); -void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key); -void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record); +void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key); +void flx_packet_scheduler_incoming_response(flxPacketScheduler *s, flxRecord *record); + +void flx_packet_scheduler_flush_responses(flxPacketScheduler *s); #endif diff --git a/rr.c b/rr.c index 4108c54..294d409 100644 --- a/rr.c +++ b/rr.c @@ -49,8 +49,8 @@ flxRecord *flx_record_new(flxKey *k, gconstpointer data, guint16 size, guint32 t flxRecord *r; g_assert(k); - g_assert(data); - + g_assert(size == 0 || data); + r = g_new(flxRecord, 1); r->ref = 1; r->key = flx_key_ref(k); @@ -64,6 +64,9 @@ flxRecord *flx_record_new(flxKey *k, gconstpointer data, guint16 size, guint32 t flxRecord *flx_record_new_full(const gchar *name, guint16 class, guint16 type, gconstpointer data, guint16 size, guint32 ttl) { flxRecord *r; flxKey *k; + + g_assert(name); + g_assert(size == 0 || data); k = flx_key_new(name, class, type); r = flx_record_new(k, data, size, ttl); diff --git a/server.c b/server.c index 94c7b1d..6b3c2dd 100644 --- a/server.c +++ b/server.c @@ -22,9 +22,11 @@ static void handle_query_key(flxServer *s, flxKey *k, flxInterface *i, const flx g_message("Handling query: %s", txt = flx_key_to_string(k)); g_free(txt); + flx_packet_scheduler_incoming_query(i->scheduler, k); + for (e = g_hash_table_lookup(s->rrset_by_key, k); e; e = e->by_key_next) if (flx_interface_match(i, e->interface, e->protocol)) - flx_interface_post_response(i, e->record); + flx_interface_post_response(i, e->record, FALSE); } static void handle_query(flxServer *s, flxDnsPacket *p, flxInterface *i, const flxAddress *a) { @@ -72,8 +74,7 @@ static void handle_response(flxServer *s, flxDnsPacket *p, flxInterface *i, cons flx_cache_update(i->cache, record, cache_flush, a); - if (record->ttl != 0) - flx_packet_scheduler_drop_response(i->scheduler, record); + flx_packet_scheduler_incoming_response(i->scheduler, record); flx_record_unref(record); } } @@ -548,7 +549,7 @@ static void post_query_callback(flxInterfaceMonitor *m, flxInterface *i, gpointe g_assert(i); g_assert(k); - flx_interface_post_query(i, k); + flx_interface_post_query(i, k, FALSE); } void flx_server_post_query(flxServer *s, gint interface, guchar protocol, flxKey *key) { @@ -565,7 +566,7 @@ static void post_response_callback(flxInterfaceMonitor *m, flxInterface *i, gpoi g_assert(i); g_assert(r); - flx_interface_post_response(i, r); + flx_interface_post_response(i, r, FALSE); } void flx_server_post_response(flxServer *s, gint interface, guchar protocol, flxRecord *record) { diff --git a/subscribe.c b/subscribe.c index 3abe464..7fd264d 100644 --- a/subscribe.c +++ b/subscribe.c @@ -15,7 +15,6 @@ static void elapse(flxTimeEvent *e, void *userdata) { g_message("%i. Continuous querying for %s", s->n_query, t = flx_key_to_string(s->key)); g_free(t); - flx_elapse_time(&tv, s->sec_delay*1000, 0); flx_time_event_queue_update(s->server->time_event_queue, s->time_event, &tv); diff --git a/timeeventq.c b/timeeventq.c index 0d4af97..ec1a074 100644 --- a/timeeventq.c +++ b/timeeventq.c @@ -154,3 +154,5 @@ flxTimeEvent* flx_time_event_next(flxTimeEvent *e) { return e->node->next->data; } + + diff --git a/timeeventq.h b/timeeventq.h index f663f1f..0add498 100644 --- a/timeeventq.h +++ b/timeeventq.h @@ -30,4 +30,7 @@ void flx_time_event_queue_update(flxTimeEventQueue *q, flxTimeEvent *e, const GT flxTimeEvent* flx_time_event_queue_root(flxTimeEventQueue *q); flxTimeEvent* flx_time_event_next(flxTimeEvent *e); + + + #endif diff --git a/todo b/todo new file mode 100644 index 0000000..2aea0e1 --- /dev/null +++ b/todo @@ -0,0 +1,8 @@ +* Unicast responses/queries +* Known-Answer suppression +* Truncation +* Probing/Conflict resolution +* Legacy unicast +* really send goodbye packets +* uniqueness +* defend our entries on incoming goodbye -- 2.39.5