]> git.meshlink.io Git - catta/commitdiff
add packet scheduler
authorLennart Poettering <lennart@poettering.net>
Wed, 23 Mar 2005 21:20:57 +0000 (21:20 +0000)
committerLennart Poettering <lennart@poettering.net>
Wed, 23 Mar 2005 21:20:57 +0000 (21:20 +0000)
git-svn-id: file:///home/lennart/svn/public/avahi/trunk@14 941a03a8-eaeb-0310-b9a0-b1bbd8fe43fe

16 files changed:
Makefile
dns.c
dns.h
iface.c
iface.h
psched.c [new file with mode: 0644]
psched.h [new file with mode: 0644]
rr.c
rr.h
server.c
server.h
socket.c
timeeventq.c
timeeventq.h
util.c
util.h

index 6dac8af1dd5730cee315f756a95369ac48465eb1..306bbfee1b0cf57e0d346b17a59446e27ee65e8f 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@ LIBS=$(shell pkg-config --libs glib-2.0)
 
 all: flexmdns prioq-test
 
-flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o
+flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o psched.o
        $(CC) -o $@ $^ $(LIBS)
 
 #test-llist: test-llist.o
diff --git a/dns.c b/dns.c
index a78772391699e7e3cb729f8a088f462bc521cd25..6824fd3ed5551f0f0ed8d7108e4a9ae85cd4d0b7 100644 (file)
--- a/dns.c
+++ b/dns.c
@@ -4,11 +4,35 @@
 
 #include "dns.h"
 
-flxDnsPacket* flx_dns_packet_new(void) {
+flxDnsPacket* flx_dns_packet_new(guint max_size) {
     flxDnsPacket *p;
-    p = g_new(flxDnsPacket, 1);
-    p->size = p->rindex = 2*6;
-    memset(p->data, 0, p->size);
+
+    if (max_size <= 0)
+        max_size = FLX_DNS_PACKET_MAX_SIZE;
+    else if (max_size < FLX_DNS_PACKET_HEADER_SIZE)
+        max_size = FLX_DNS_PACKET_HEADER_SIZE;
+    
+    p = g_malloc(sizeof(flxDnsPacket) + max_size);
+    p->size = p->rindex = FLX_DNS_PACKET_HEADER_SIZE;
+    p->max_size = max_size;
+
+    memset(FLX_DNS_PACKET_DATA(p), 0, p->size);
+    return p;
+}
+
+flxDnsPacket* flx_dns_packet_new_query(guint max_size) {
+    flxDnsPacket *p;
+
+    p = flx_dns_packet_new(max_size);
+    flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0));
+    return p;
+}
+
+flxDnsPacket* flx_dns_packet_new_response(guint max_size) {
+    flxDnsPacket *p;
+
+    p = flx_dns_packet_new(max_size);
+    flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(1, 0, 0, 0, 0, 0, 0, 0, 0, 0));
     return p;
 }
 
@@ -19,30 +43,35 @@ void flx_dns_packet_free(flxDnsPacket *p) {
 
 void flx_dns_packet_set_field(flxDnsPacket *p, guint index, guint16 v) {
     g_assert(p);
-    g_assert(index < 2*6);
+    g_assert(index < FLX_DNS_PACKET_HEADER_SIZE);
     
-    ((guint16*) p->data)[index] = g_htons(v);
+    ((guint16*) FLX_DNS_PACKET_DATA(p))[index] = g_htons(v);
 }
 
 guint16 flx_dns_packet_get_field(flxDnsPacket *p, guint index) {
     g_assert(p);
-    g_assert(index < 2*6);
+    g_assert(index < FLX_DNS_PACKET_HEADER_SIZE);
 
-    return g_ntohs(((guint16*) p->data)[index]);
+    return g_ntohs(((guint16*) FLX_DNS_PACKET_DATA(p))[index]);
 }
 
 guint8* flx_dns_packet_append_name(flxDnsPacket *p, const gchar *name) {
     guint8 *d, *f = NULL;
+    guint saved_size;
     
     g_assert(p);
     g_assert(name);
 
+    saved_size = p->size;
+
     for (;;) {
         guint n = strcspn(name, ".");
         if (!n || n > 63)
-            return NULL;
+            goto fail;
         
-        d = flx_dns_packet_extend(p, n+1);
+        if (!(d = flx_dns_packet_extend(p, n+1)))
+            goto fail;
+            
         if (!f)
             f = d;
         d[0] = n;
@@ -61,28 +90,36 @@ guint8* flx_dns_packet_append_name(flxDnsPacket *p, const gchar *name) {
             break;
     }
 
-    d = flx_dns_packet_extend(p, 1);
+    if (!(d = flx_dns_packet_extend(p, 1)))
+        goto fail;
+    
     d[0] = 0;
 
     return f;
+
+fail:
+    p->size = saved_size;
+    return NULL;
 }
 
 guint8* flx_dns_packet_append_uint16(flxDnsPacket *p, guint16 v) {
     guint8 *d;
-    
     g_assert(p);
     
-    d = flx_dns_packet_extend(p, sizeof(guint16));
-    *((guint16*) d) = g_htons(v);
+    if (!(d = flx_dns_packet_extend(p, sizeof(guint16))))
+        return NULL;
     
+    *((guint16*) d) = g_htons(v);
     return d;
 }
 
 guint8 *flx_dns_packet_append_uint32(flxDnsPacket *p, guint32 v) {
     guint8 *d;
-
     g_assert(p);
-    d = flx_dns_packet_extend(p, sizeof(guint32));
+
+    if (!(d = flx_dns_packet_extend(p, sizeof(guint32))))
+        return NULL;
+    
     *((guint32*) d) = g_htonl(v);
 
     return d;
@@ -95,21 +132,22 @@ guint8 *flx_dns_packet_append_bytes(flxDnsPacket  *p, gconstpointer b, guint l)
     g_assert(b);
     g_assert(l);
     
-    d = flx_dns_packet_extend(p, l);
-    g_assert(d);
-    memcpy(d, b, l);
+    if (!(d = flx_dns_packet_extend(p, l)))
+        return NULL;
 
+    memcpy(d, b, l);
     return d;
 }
 
-
 guint8 *flx_dns_packet_extend(flxDnsPacket *p, guint l) {
     guint8 *d;
     
     g_assert(p);
-    g_assert(p->size+l <= sizeof(p->data));
 
-    d = p->data + p->size;
+    if (p->size+l > p->max_size)
+        return NULL;
+    
+    d = FLX_DNS_PACKET_DATA(p) + p->size;
     p->size += l;
     
     return d;
@@ -123,13 +161,14 @@ guint8 *flx_dns_packet_append_name_compressed(flxDnsPacket *p, const gchar *name
     if (!prev)
         return flx_dns_packet_append_name(p, name);
     
-    k = prev - p->data;
+    k = prev - FLX_DNS_PACKET_DATA(p);
     if (k < 0 || k >= 0x4000 || (guint) k >= p->size)
         return flx_dns_packet_append_name(p, name);
 
-    d = (guint16*) flx_dns_packet_extend(p, sizeof(guint16));
-    *d = g_htons((0xC000 | k));
+    if (!(d = (guint16*) flx_dns_packet_extend(p, sizeof(guint16))))
+        return NULL;
     
+    *d = g_htons((0xC000 | k));
     return prev;
 }
 
@@ -166,7 +205,7 @@ static gint consume_labels(flxDnsPacket *p, guint index, gchar *ret_name, guint
         if (index+1 > p->size)
             return -1;
 
-        n = p->data[index];
+        n = FLX_DNS_PACKET_DATA(p)[index];
 
         if (!n) {
             index++;
@@ -197,7 +236,7 @@ static gint consume_labels(flxDnsPacket *p, guint index, gchar *ret_name, guint
             } else
                 first_label = 0;
 
-            memcpy(ret_name, p->data + index, n);
+            memcpy(ret_name, FLX_DNS_PACKET_DATA(p) + index, n);
             index += n;
             ret_name += n;
             l -= n;
@@ -210,7 +249,7 @@ static gint consume_labels(flxDnsPacket *p, guint index, gchar *ret_name, guint
             if (index+2 > p->size)
                 return -1;
 
-            index = ((guint) (p->data[index] & ~0xC0)) << 8 | p->data[index+1];
+            index = ((guint) (FLX_DNS_PACKET_DATA(p)[index] & ~0xC0)) << 8 | FLX_DNS_PACKET_DATA(p)[index+1];
 
             if (!compressed)
                 ret += 2;
@@ -238,7 +277,7 @@ gint flx_dns_packet_consume_uint16(flxDnsPacket *p, guint16 *ret_v) {
     if (p->rindex + sizeof(guint16) > p->size)
         return -1;
 
-    *ret_v = g_ntohs(*((guint16*) (p->data + p->rindex)));
+    *ret_v = g_ntohs(*((guint16*) (FLX_DNS_PACKET_DATA(p) + p->rindex)));
     p->rindex += sizeof(guint16);
 
     return 0;
@@ -251,7 +290,7 @@ gint flx_dns_packet_consume_uint32(flxDnsPacket *p, guint32 *ret_v) {
     if (p->rindex + sizeof(guint32) > p->size)
         return -1;
 
-    *ret_v = g_ntohl(*((guint32*) (p->data + p->rindex)));
+    *ret_v = g_ntohl(*((guint32*) (FLX_DNS_PACKET_DATA(p) + p->rindex)));
     p->rindex += sizeof(guint32);
     
     return 0;
@@ -265,7 +304,7 @@ gint flx_dns_packet_consume_bytes(flxDnsPacket *p, gpointer ret_data, guint l) {
     if (p->rindex + l > p->size)
         return -1;
 
-    memcpy(ret_data, p->data + p->rindex, l);
+    memcpy(ret_data, FLX_DNS_PACKET_DATA(p) + p->rindex, l);
     p->rindex += l;
 
     return 0;
@@ -277,7 +316,7 @@ gconstpointer flx_dns_packet_get_rptr(flxDnsPacket *p) {
     if (p->rindex >= p->size)
         return NULL;
 
-    return p->data + p->rindex;
+    return FLX_DNS_PACKET_DATA(p) + p->rindex;
 }
 
 gint flx_dns_packet_skip(flxDnsPacket *p, guint length) {
diff --git a/dns.h b/dns.h
index bdb0f0748ce49a31bdece57c5a664b273adf16da..0b6e750feb0d85adf19e94a18774307bf9f98bf6 100644 (file)
--- a/dns.h
+++ b/dns.h
@@ -5,14 +5,20 @@
 
 #include "rr.h"
 
-#define FLX_DNS_MAX_PACKET_SIZE 9000
+#define FLX_DNS_PACKET_MAX_SIZE 9000
+#define FLX_DNS_PACKET_HEADER_SIZE 12
 
 typedef struct _flxDnsPacket {
-    guint size, rindex;
-    guint8 data[FLX_DNS_MAX_PACKET_SIZE];
+    guint size, rindex, max_size;
 } flxDnsPacket;
 
-flxDnsPacket* flx_dns_packet_new(void);
+
+#define FLX_DNS_PACKET_DATA(p) (((guint8*) p) + sizeof(flxDnsPacket))
+
+flxDnsPacket* flx_dns_packet_new(guint size);
+flxDnsPacket* flx_dns_packet_new_query(guint size);
+flxDnsPacket* flx_dns_packet_new_response(guint size);
+
 void flx_dns_packet_free(flxDnsPacket *p);
 void flx_dns_packet_set_field(flxDnsPacket *p, guint index, guint16 v);
 guint16 flx_dns_packet_get_field(flxDnsPacket *p, guint index);
diff --git a/iface.c b/iface.c
index 9acef47806d1aa9ea6dbc79e7a93069e43e44a5f..c4bd0ff5827cfae3523bf7471e41165809401946 100644 (file)
--- a/iface.c
+++ b/iface.c
@@ -58,6 +58,11 @@ static void free_interface(flxInterfaceMonitor *m, flxInterface *i) {
     g_assert(m);
     g_assert(i);
 
+    if (i->ipv4_scheduler)
+        flx_packet_scheduler_free(i->ipv4_scheduler);
+    if (i->ipv6_scheduler)
+        flx_packet_scheduler_free(i->ipv6_scheduler);
+    
     while (i->addresses)
         free_address(m, i->addresses);
 
@@ -140,8 +145,11 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) {
             i->n_ipv4_addrs = i->n_ipv6_addrs = 0;
             FLX_LLIST_PREPEND(flxInterface, interface, m->interfaces, i);
             g_hash_table_insert(m->hash_table, &i->index, i);
+            i->mtu = 1500;
             i->ipv4_cache = flx_cache_new(m->server, i);
             i->ipv6_cache = flx_cache_new(m->server, i);
+            i->ipv4_scheduler = flx_packet_scheduler_new(m->server, i, AF_INET);
+            i->ipv6_scheduler = flx_packet_scheduler_new(m->server, i, AF_INET6);
             
             changed = 0;
         }
@@ -157,6 +165,11 @@ static void callback(flxNetlink *nl, struct nlmsghdr *n, gpointer userdata) {
                     g_free(i->name);
                     i->name = g_strndup(RTA_DATA(a), RTA_PAYLOAD(a));
                     break;
+
+                case IFLA_MTU:
+                    g_assert(RTA_PAYLOAD(a) == sizeof(unsigned int));
+                    i->mtu = *((unsigned int*) RTA_DATA(a));
+                    break;
                     
                 default:
                     ;
@@ -347,44 +360,47 @@ int flx_address_is_relevant(flxInterfaceAddress *a) {
 void flx_interface_send_packet(flxInterface *i, guchar protocol, flxDnsPacket *p) {
     g_assert(i);
     g_assert(p);
+
+    if (!flx_interface_is_relevant(i))
+        return;
     
-    if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0 && flx_interface_is_relevant(i)) {
+    if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0) {
         g_message("sending on '%s':IPv4", i->name);
         flx_send_dns_packet_ipv4(i->monitor->server->fd_ipv4, i->index, p);
     }
 
-    if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0 && flx_interface_is_relevant(i)) {
+    if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0) {
         g_message("sending on '%s':IPv6", i->name);
         flx_send_dns_packet_ipv6(i->monitor->server->fd_ipv6, i->index, p);
     }
 }
 
-void flx_interface_send_query(flxInterface *i, guchar protocol, flxKey *k) {
-    flxDnsPacket *p;
-    
+void flx_interface_post_query(flxInterface *i, guchar protocol, flxKey *k) {
     g_assert(i);
     g_assert(k);
 
-    p = flx_dns_packet_new();
-    flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0));
-    flx_dns_packet_append_key(p, k);
-    flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, 1);
-    flx_interface_send_packet(i, protocol, p);
-    flx_dns_packet_free(p);
+    if (!flx_interface_is_relevant(i))
+        return;
+
+    if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0)
+        flx_packet_scheduler_post_query(i->ipv4_scheduler, k);
+
+    if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0)
+        flx_packet_scheduler_post_query(i->ipv6_scheduler, k);
 }
 
-void flx_interface_send_response(flxInterface *i, guchar protocol, flxRecord *rr) {
-    flxDnsPacket *p;
-    
+void flx_interface_post_response(flxInterface *i, guchar protocol, flxRecord *rr) {
     g_assert(i);
     g_assert(rr);
 
-    p = flx_dns_packet_new();
-    flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, DNS_FLAGS(1, 0, 0, 0, 0, 0, 0, 0, 0, 0));
-    flx_dns_packet_append_record(p, rr, FALSE);
-    flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, 1);
-    flx_interface_send_packet(i, protocol, p);
-    flx_dns_packet_free(p);
+    if (!flx_interface_is_relevant(i))
+        return;
+
+    if ((protocol == AF_INET || protocol == AF_UNSPEC) && i->n_ipv4_addrs > 0)
+        flx_packet_scheduler_post_response(i->ipv4_scheduler, rr);
+
+    if ((protocol == AF_INET6 || protocol == AF_UNSPEC) && i->n_ipv6_addrs > 0)
+        flx_packet_scheduler_post_response(i->ipv6_scheduler, rr);
 }
 
 void flx_dump_caches(flxServer *s, FILE *f) {
diff --git a/iface.h b/iface.h
index 2adab726744f08d35ce8e9252f95354f048f4046..50cc98f09c25bf64c1e1b8d8cc15ee5ffe9d2a5e 100644 (file)
--- a/iface.h
+++ b/iface.h
@@ -17,6 +17,8 @@ typedef struct _flxInterface flxInterface;
 #include "netlink.h"
 #include "cache.h"
 #include "llist.h"
+#include "psched.h"
+#include "dns.h"
 
 struct _flxInterfaceMonitor {
     flxServer *server;
@@ -41,6 +43,10 @@ struct _flxInterface {
 
     guint n_ipv6_addrs, n_ipv4_addrs;
     flxCache *ipv4_cache, *ipv6_cache;
+
+    guint mtu;
+
+    flxPacketScheduler *ipv4_scheduler, *ipv6_scheduler;
 };
 
 struct _flxInterfaceAddress {
@@ -64,8 +70,10 @@ flxInterface* flx_interface_monitor_get_first(flxInterfaceMonitor *m);
 int flx_interface_is_relevant(flxInterface *i);
 int flx_address_is_relevant(flxInterfaceAddress *a);
 
-void flx_interface_send_query(flxInterface *i, guchar protocol, flxKey *k);
-void flx_interface_send_response(flxInterface *i, guchar protocol, flxRecord *rr);
+void flx_interface_send_packet(flxInterface *i, guchar protocol, flxDnsPacket *p);
+
+void flx_interface_post_query(flxInterface *i, guchar protocol, flxKey *k);
+void flx_interface_post_response(flxInterface *i, guchar protocol, flxRecord *rr);
 
 void flx_dump_caches(flxServer *s, FILE *f);
 
diff --git a/psched.c b/psched.c
new file mode 100644 (file)
index 0000000..7b74053
--- /dev/null
+++ b/psched.c
@@ -0,0 +1,287 @@
+#include "util.h"
+#include "psched.h"
+
+flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i, guchar protocol) {
+    flxPacketScheduler *s;
+
+    g_assert(server);
+    g_assert(i);
+
+    s = g_new(flxPacketScheduler, 1);
+    s->server = server;
+    s->interface = i;
+    s->protocol = protocol;
+
+    FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
+    FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
+    
+    return s;
+}
+
+static void query_job_free(flxPacketScheduler *s, flxQueryJob *qj) {
+    g_assert(qj);
+
+    if (qj->time_event)
+        flx_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
+
+    FLX_LLIST_REMOVE(flxQueryJob, jobs, s->query_jobs, qj);
+    
+    flx_key_unref(qj->key);
+    g_free(qj);
+}
+
+static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) {
+    g_assert(rj);
+
+    if (rj->time_event)
+        flx_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
+
+    FLX_LLIST_REMOVE(flxResponseJob, jobs, s->response_jobs, rj);
+
+    flx_record_unref(rj->record);
+    g_free(rj);
+}
+
+void flx_packet_scheduler_free(flxPacketScheduler *s) {
+    flxQueryJob *qj;
+    flxResponseJob *rj;
+    flxTimeEvent *e;
+    
+    g_assert(s);
+
+    while ((qj = s->query_jobs))
+        query_job_free(s, qj);
+    while ((rj = s->response_jobs))
+        response_job_free(s, rj);
+
+    g_free(s);
+}
+
+static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) {
+    guint8 *d;
+
+    g_assert(s);
+    g_assert(p);
+    g_assert(qj);
+
+    if ((d = flx_dns_packet_append_key(p, qj->key))) {
+        GTimeVal tv;
+
+        qj->done = 1;
+
+        /* Drop query after 100ms from history */
+        flx_elapse_time(&tv, 100, 0);
+        flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
+    }
+
+    return d;
+}
+                                 
+static void query_elapse(flxTimeEvent *e, gpointer data) {
+    flxQueryJob *qj = data;
+    flxPacketScheduler *s;
+    flxDnsPacket *p;
+    guint n;
+    guint8 *d;
+
+    g_assert(qj);
+    s = qj->scheduler;
+
+    if (qj->done) {
+        /* Lets remove it  from the history */
+        query_job_free(s, qj);
+        return;
+    }
+
+    p = flx_dns_packet_new_query(s->interface->mtu - 200);
+    d = packet_add_query_job(s, p, qj);
+    g_assert(d);
+    n = 1;
+
+    /* Try to fill up packet with more queries, if available */
+    for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
+
+        if (qj->done)
+            continue;
+
+        if (!packet_add_query_job(s, p, qj))
+            break;
+
+        n++;
+    }
+
+    flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n);
+    flx_interface_send_packet(s->interface, s->protocol, p);
+    flx_dns_packet_free(p);
+}
+
+static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) {
+    flxQueryJob *qj;
+    
+    g_assert(s);
+    g_assert(key);
+
+    for (qj = s->query_jobs; qj; qj = qj->jobs_next)
+        if (flx_key_equal(qj->key, key))
+            return qj;
+
+    return NULL;
+}
+
+void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key) {
+    flxQueryJob *qj;
+    GTimeVal tv;
+    
+    g_assert(s);
+    g_assert(key);
+
+    if (look_for_query(s, key))
+        return;
+
+    qj = g_new(flxQueryJob, 1);
+    qj->key = flx_key_ref(key);
+    qj->done = FALSE;
+
+    flx_elapse_time(&tv, 100, 0);
+    qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
+    qj->scheduler = s;
+
+    FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
+}
+
+static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) {
+    guint8 *d;
+
+    g_assert(s);
+    g_assert(p);
+    g_assert(rj);
+
+    if ((d = flx_dns_packet_append_record(p, rj->record, FALSE))) {
+        GTimeVal tv;
+
+        rj->done = 1;
+
+        /* Drop response after 1s from history */
+        flx_elapse_time(&tv, 1000, 0);
+        flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
+    }
+
+    return d;
+}
+                                 
+
+static void response_elapse(flxTimeEvent *e, gpointer data) {
+    flxResponseJob *rj = data;
+    flxPacketScheduler *s;
+    flxDnsPacket *p;
+    guint n;
+    guint8 *d;
+
+    g_assert(rj);
+    s = rj->scheduler;
+
+    if (rj->done) {
+        /* Lets remove it  from the history */
+        response_job_free(s, rj);
+        return;
+    }
+
+    p = flx_dns_packet_new_response(s->interface->mtu - 200);
+    d = packet_add_response_job(s, p, rj);
+    g_assert(d);
+    n = 1;
+
+    /* Try to fill up packet with more responses, if available */
+    for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
+
+        if (rj->done)
+            continue;
+
+        if (!packet_add_response_job(s, p, rj))
+            break;
+
+        n++;
+    }
+
+    flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
+    flx_interface_send_packet(s->interface, s->protocol, p);
+    flx_dns_packet_free(p);
+}
+
+static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) {
+    flxResponseJob *rj;
+
+    g_assert(s);
+    g_assert(record);
+
+    for (rj = s->response_jobs; rj; rj = rj->jobs_next)
+        if (flx_record_equal(rj->record, record))
+            return rj;
+
+    return NULL;
+}
+
+void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record) {
+    flxResponseJob *rj;
+    GTimeVal tv;
+    
+    g_assert(s);
+    g_assert(record);
+
+    if (look_for_response(s, record))
+        return;
+
+    rj = g_new(flxResponseJob, 1);
+    rj->record = flx_record_ref(record);
+    rj->done = FALSE;
+
+    flx_elapse_time(&tv, 20, 100);
+    rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
+    rj->scheduler = s;
+
+    FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
+}
+
+void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key) {
+    flxQueryJob *qj;
+    
+    g_assert(s);
+    g_assert(key);
+
+    for (qj = s->query_jobs; qj; qj = qj->jobs_next)
+        if (flx_key_equal(qj->key, key)) {
+
+            if (!qj->done) {
+                GTimeVal tv;
+                qj->done = TRUE;
+                
+                /* Drop query after 100ms from history */
+                flx_elapse_time(&tv, 100, 0);
+                flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
+            }
+
+            break;
+        }
+}
+
+void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record) {
+    flxResponseJob *rj;
+    
+    g_assert(s);
+    g_assert(record);
+
+    for  (rj = s->response_jobs; rj; rj = rj->jobs_next)
+        if (flx_record_equal(rj->record, record)) {
+
+            if (!rj->done) {
+                GTimeVal tv;
+                rj->done = TRUE;
+                
+                /* Drop response after 100ms from history */
+                flx_elapse_time(&tv, 100, 0);
+                flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
+            }
+
+            break;
+        }
+}
diff --git a/psched.h b/psched.h
new file mode 100644 (file)
index 0000000..90b67db
--- /dev/null
+++ b/psched.h
@@ -0,0 +1,48 @@
+#ifndef foopschedhfoo
+#define foopschedhfoo
+
+typedef struct _flxQueryJob flxQueryJob;
+typedef struct _flxResponseJob flxResponseJob;
+typedef struct _flxPacketScheduler flxPacketScheduler;
+
+#include "timeeventq.h"
+#include "rr.h"
+#include "llist.h"
+#include "iface.h"
+
+struct _flxQueryJob {
+    flxPacketScheduler *scheduler;
+    flxTimeEvent *time_event;
+    flxKey *key;
+    gboolean done;
+    FLX_LLIST_FIELDS(flxQueryJob, jobs);
+};
+
+struct _flxResponseJob {
+    flxPacketScheduler *scheduler;
+    flxTimeEvent *time_event;
+    flxRecord *record;
+    gboolean done;
+    FLX_LLIST_FIELDS(flxResponseJob, jobs);
+};
+
+struct _flxPacketScheduler {
+    flxServer *server;
+    
+    flxInterface *interface;
+    guchar protocol;
+
+    FLX_LLIST_HEAD(flxQueryJob, query_jobs);
+    FLX_LLIST_HEAD(flxResponseJob, response_jobs);
+};
+
+flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i, guchar protocol);
+void flx_packet_scheduler_free(flxPacketScheduler *s);
+
+void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key);
+void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record);
+
+void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key);
+void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record);
+
+#endif
diff --git a/rr.c b/rr.c
index 0153b2819e85ad90af07e3aea7623daf98c59353..5acc09baa8dc33706e05d2e6663509ac6f190835 100644 (file)
--- a/rr.c
+++ b/rr.c
@@ -164,3 +164,13 @@ guint flx_key_hash(const flxKey *k) {
 
     return g_str_hash(k->name) + k->type + k->class;
 }
+
+gboolean flx_record_equal(const flxRecord *a, const flxRecord *b) {
+    g_assert(a);
+    g_assert(b);
+
+    return flx_key_equal(a->key, b->key) &&
+        a->ttl == b->ttl &&
+        a->size == b->size &&
+        memcmp(a->data, b->data, a->size) == 0;
+}
diff --git a/rr.h b/rr.h
index 0e61a7358136129a4b404ee2a0aed48388de5f9a..e3d36544adff5ea544e89e88150dd52fc5222f62 100644 (file)
--- a/rr.h
+++ b/rr.h
@@ -55,4 +55,6 @@ const gchar *flx_dns_type_to_string(guint16 type);
 gchar *flx_key_to_string(flxKey *k); /* g_free() the result! */
 gchar *flx_record_to_string(flxRecord *r);  /* g_free() the result! */
 
+gboolean flx_record_equal(const flxRecord *a, const flxRecord *b);
+
 #endif
index 6027f5f4aa75fd05f6ce0f730018b2c109b42589..0f292a349e84b7d302db79b4a103ee03c87c5bbe 100644 (file)
--- a/server.c
+++ b/server.c
@@ -9,45 +9,33 @@
 #include "iface.h"
 #include "socket.h"
 
-static void post_response(flxServer *s, flxRecord *r, gint iface, const flxAddress *a) {
-    flxInterface *i;
-
-    g_assert(s);
-    g_assert(r);
-    g_assert(iface > 0);
-    g_assert(a);
-        
-    if ((i = flx_interface_monitor_get_interface(s->monitor, iface)))
-        flx_interface_send_response(i, a->family, r);
-            
-}
-
-static void handle_query_key(flxServer *s, flxKey *k, gint iface, const flxAddress *a) {
+static void handle_query_key(flxServer *s, flxKey *k, flxInterface *i, const flxAddress *a) {
     flxEntry *e;
     
     g_assert(s);
     g_assert(k);
+    g_assert(i);
     g_assert(a);
 
     for (e = g_hash_table_lookup(s->rrset_by_name, k); e; e = e->by_name_next) {
 
-        if ((e->interface <= 0 || e->interface == iface) &&
+        if ((e->interface <= 0 || e->interface == i->index) &&
             (e->protocol == AF_UNSPEC || e->protocol == a->family)) {
-            post_response(s, e->record, iface, a);
 
+            flx_interface_post_response(i, a->family, e->record);
         }
     }
 }
 
-static void handle_query(flxServer *s, flxDnsPacket *p, gint iface, const flxAddress *a) {
+static void handle_query(flxServer *s, flxDnsPacket *p, flxInterface *i, const flxAddress *a) {
     guint n;
     
     g_assert(s);
     g_assert(p);
+    g_assert(i);
     g_assert(a);
 
     for (n = flx_dns_packet_get_field(p, DNS_FIELD_QDCOUNT); n > 0; n --) {
-
         flxKey *key;
 
         if (!(key = flx_dns_packet_consume_key(p))) {
@@ -55,29 +43,31 @@ static void handle_query(flxServer *s, flxDnsPacket *p, gint iface, const flxAdd
             return;
         }
 
-        handle_query_key(s, key, iface, a);
+        handle_query_key(s, key, i, a);
         flx_key_unref(key);
     }
 }
 
-static void add_response_to_cache(flxCache *c, flxDnsPacket *p, const flxAddress *a) {
+static void handle_response(flxServer *s, flxDnsPacket *p, flxInterface *i, const flxAddress *a) {
     guint n;
     
-    g_assert(c);
+    g_assert(s);
     g_assert(p);
+    g_assert(i);
     g_assert(a);
+    
     for (n = flx_dns_packet_get_field(p, DNS_FIELD_ANCOUNT); n > 0; n--) {
-
-        flxRecord *rr;
+        flxRecord *record;
         gboolean cache_flush = FALSE;
         
-        if (!(rr = flx_dns_packet_consume_record(p, &cache_flush))) {
+        if (!(record = flx_dns_packet_consume_record(p, &cache_flush))) {
             g_warning("Packet too short");
             return;
         }
 
-        flx_cache_update(c, rr, cache_flush, a);
-        flx_record_unref(rr);
+        flx_cache_update(a->family == AF_INET ? i->ipv4_cache : i->ipv6_cache, record, cache_flush, a);
+        flx_packet_scheduler_drop_response(a->family == AF_INET ? i->ipv4_scheduler : i->ipv6_scheduler, record);
+        flx_record_unref(record);
     }
 }
 
@@ -120,8 +110,8 @@ static void dispatch_packet(flxServer *s, flxDnsPacket *p, struct sockaddr *sa,
         return;
     }
 
-
     flx_address_from_sockaddr(sa, &a);
+
     
     if (flx_dns_packet_is_query(p)) {
 
@@ -132,11 +122,9 @@ static void dispatch_packet(flxServer *s, flxDnsPacket *p, struct sockaddr *sa,
             return;
         }
                 
-        handle_query(s, p, iface, &a);    
+        handle_query(s, p, i, &a);    
         g_message("Handled query");
     } else {
-        flxCache *c;
-
         if (flx_dns_packet_get_field(p, DNS_FIELD_QDCOUNT) != 0 ||
             flx_dns_packet_get_field(p, DNS_FIELD_ANCOUNT) == 0 ||
             flx_dns_packet_get_field(p, DNS_FIELD_NSCOUNT) != 0 ||
@@ -145,10 +133,8 @@ static void dispatch_packet(flxServer *s, flxDnsPacket *p, struct sockaddr *sa,
             return;
         }
 
-        c = a.family == AF_INET ? i->ipv4_cache : i->ipv6_cache;
-        add_response_to_cache(c, p, &a);
-
-        g_message("Handled responnse");
+        handle_response(s, p, i, &a);
+        g_message("Handled response");
     }
 }
 
@@ -528,7 +514,7 @@ void flx_server_send_query(flxServer *s, gint interface, guchar protocol, flxKey
         flxInterface *i;
 
         for (i = flx_interface_monitor_get_first(s->monitor); i; i = i->interface_next)
-            flx_interface_send_query(i, protocol, k);
+            flx_interface_post_query(i, protocol, k);
         
     } else {
         flxInterface *i;
@@ -536,6 +522,6 @@ void flx_server_send_query(flxServer *s, gint interface, guchar protocol, flxKey
         if (!(i = flx_interface_monitor_get_interface(s->monitor, interface)))
             return;
 
-        flx_interface_send_query(i, protocol, k);
+        flx_interface_post_query(i, protocol, k);
     }
 }
index 63f2bf6557704d4ae32fae456c5fba070a17c469..fbd5715445c392891b2e146ab4f4d6c333c223c8 100644 (file)
--- a/server.h
+++ b/server.h
@@ -2,7 +2,6 @@
 #define fooflxserverhfoo
 
 typedef struct _flxEntry flxEntry;
-typedef struct _flxResponseJob flxResponseJob;
 
 #include "flx.h"
 #include "iface.h"
@@ -23,12 +22,6 @@ struct _flxEntry {
     FLX_LLIST_FIELDS(flxEntry, by_id);
 };
 
-struct _flxResponseJob {
-    flxTimeEvent *time_event;
-    flxRecord *record;
-    FLX_LLIST_FIELDS(flxResponseJob, response);
-};
-
 struct _flxServer {
     GMainContext *context;
     flxInterfaceMonitor *monitor;
index 1887e1d932eba6694e339551b77691b2d683c1fe..cc43f160a458e15c795ec31a225eb0091124fdc5 100644 (file)
--- a/socket.c
+++ b/socket.c
@@ -252,7 +252,7 @@ gint flx_send_dns_packet_ipv4(gint fd, gint interface, flxDnsPacket *p) {
     mdns_mcast_group_ipv4(&sa);
 
     memset(&io, 0, sizeof(io));
-    io.iov_base = p->data;
+    io.iov_base = FLX_DNS_PACKET_DATA(p);
     io.iov_len = p->size;
 
     memset(cmsg_data, 0, sizeof(cmsg_data));
@@ -292,7 +292,7 @@ gint flx_send_dns_packet_ipv6(gint fd, gint interface, flxDnsPacket *p) {
     mdns_mcast_group_ipv6(&sa);
 
     memset(&io, 0, sizeof(io));
-    io.iov_base = p->data;
+    io.iov_base = FLX_DNS_PACKET_DATA(p);
     io.iov_len = p->size;
 
     memset(cmsg_data, 0, sizeof(cmsg_data));
@@ -330,10 +330,10 @@ flxDnsPacket* flx_recv_dns_packet_ipv4(gint fd, struct sockaddr_in *ret_sa, gint
     g_assert(ret_iface);
     g_assert(ret_ttl);
 
-    p = flx_dns_packet_new();
+    p = flx_dns_packet_new(0);
 
-    io.iov_base = p->data;
-    io.iov_len = sizeof(p->data);
+    io.iov_base = FLX_DNS_PACKET_DATA(p);
+    io.iov_len = p->max_size;
     
     memset(&msg, 0, sizeof(msg));
     msg.msg_name = ret_sa;
@@ -376,7 +376,7 @@ fail:
 }
 
 flxDnsPacket* flx_recv_dns_packet_ipv6(gint fd, struct sockaddr_in6 *ret_sa, gint *ret_iface, guint8* ret_ttl) {
-    flxDnsPacket *p= NULL;
+    flxDnsPacket *p = NULL;
     struct msghdr msg;
     struct iovec io;
     uint8_t aux[64];
@@ -389,10 +389,10 @@ flxDnsPacket* flx_recv_dns_packet_ipv6(gint fd, struct sockaddr_in6 *ret_sa, gin
     g_assert(ret_iface);
     g_assert(ret_ttl);
 
-    p = flx_dns_packet_new();
+    p = flx_dns_packet_new(0);
 
-    io.iov_base = p->data;
-    io.iov_len = sizeof(p->data);
+    io.iov_base = FLX_DNS_PACKET_DATA(p);
+    io.iov_len = p->max_size;
     
     memset(&msg, 0, sizeof(msg));
     msg.msg_name = ret_sa;
index b3dd897f38d2db9200b8a46892773cf8b23d1794..0d4af97385356a1d4c39d08536c1d79708a115ff 100644 (file)
@@ -133,7 +133,7 @@ void flx_time_event_queue_remove(flxTimeEventQueue *q, flxTimeEvent *e) {
     g_free(e);
 }
 
-void flx_time_event_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval) {
+void flx_time_event_queue_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval) {
     g_assert(q);
     g_assert(e);
     g_assert(e->queue == q);
@@ -142,3 +142,15 @@ void flx_time_event_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal
 
     flx_prio_queue_shuffle(q->prioq, e->node);
 }
+
+flxTimeEvent* flx_time_event_queue_root(flxTimeEventQueue *q) {
+    g_assert(q);
+
+    return q->prioq->root ? q->prioq->root->data : NULL;
+}
+
+flxTimeEvent* flx_time_event_next(flxTimeEvent *e) {
+    g_assert(e);
+
+    return e->node->next->data;
+}
index 8fd5c02a2974e9be1dba01aafd083d2669e37fa0..f663f1fc4ff1bf52524ce6482b953b796800f9c2 100644 (file)
@@ -25,6 +25,9 @@ void flx_time_event_queue_free(flxTimeEventQueue *q);
 flxTimeEvent* flx_time_event_queue_add(flxTimeEventQueue *q, const GTimeVal *timeval, void (*callback)(flxTimeEvent *e, void *userdata), void *userdata);
 void flx_time_event_queue_remove(flxTimeEventQueue *q, flxTimeEvent *e);
 
-void flx_time_event_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval);
+void flx_time_event_queue_update(flxTimeEventQueue *q, flxTimeEvent *e, const GTimeVal *timeval);
+
+flxTimeEvent* flx_time_event_queue_root(flxTimeEventQueue *q);
+flxTimeEvent* flx_time_event_next(flxTimeEvent *e);
 
 #endif
diff --git a/util.c b/util.c
index faf1ac068e03aa89f6b4e53981ac426e19b79b39..b8128c40c1e05de312200598d55c53e2247834ae 100644 (file)
--- a/util.c
+++ b/util.c
@@ -99,3 +99,17 @@ gint flx_wait_for_write(gint fd) {
 
     return 0;
 }
+
+GTimeVal *flx_elapse_time(GTimeVal *tv, guint msec, guint jitter) {
+    g_assert(tv);
+
+    g_get_current_time(tv);
+
+    if (msec)
+        g_time_val_add(tv, msec*1000);
+
+    if (jitter)
+        g_time_val_add(tv, g_random_int_range(0, jitter) * 1000);
+        
+    return tv;
+}
diff --git a/util.h b/util.h
index 78f86d2e196d457ee38cbd013b5a35d23fcd9029..517c2f1b8efa3f9712abf1afab8b278c2f45da68 100644 (file)
--- a/util.h
+++ b/util.h
@@ -13,4 +13,6 @@ gint flx_set_cloexec(gint fd);
 gint flx_set_nonblock(gint fd);
 gint flx_wait_for_write(gint fd);
 
+GTimeVal *flx_elapse_time(GTimeVal *tv, guint msec, guint jitter);
+
 #endif