]> git.meshlink.io Git - catta/blobdiff - psched.c
* improve dns.c testing program
[catta] / psched.c
index 138dd50d39a533c5857273a4c137d34ef80dcb55..105be93d57a7d214edbb68b3ea80b5bb7584260f 100644 (file)
--- a/psched.c
+++ b/psched.c
@@ -3,11 +3,12 @@
 #include "util.h"
 #include "psched.h"
 
-#define FLX_QUERY_HISTORY_MSEC 700
+#define FLX_QUERY_HISTORY_MSEC 100
 #define FLX_QUERY_DEFER_MSEC 100
 #define FLX_RESPONSE_HISTORY_MSEC 700
 #define FLX_RESPONSE_DEFER_MSEC 20
 #define FLX_RESPONSE_JITTER_MSEC 100
+#define FLX_PROBE_DEFER_MSEC 100
 
 flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i) {
     flxPacketScheduler *s;
@@ -22,6 +23,7 @@ flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i)
     FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
     FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
     FLX_LLIST_HEAD_INIT(flxKnownAnswer, s->known_answers);
+    FLX_LLIST_HEAD_INIT(flxProbeJob, s->probe_jobs);
     
     return s;
 }
@@ -50,9 +52,22 @@ static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) {
     g_free(rj);
 }
 
+static void probe_job_free(flxPacketScheduler *s, flxProbeJob *pj) {
+    g_assert(pj);
+
+    if (pj->time_event)
+        flx_time_event_queue_remove(pj->scheduler->server->time_event_queue, pj->time_event);
+
+    FLX_LLIST_REMOVE(flxProbeJob, jobs, s->probe_jobs, pj);
+
+    flx_record_unref(pj->record);
+    g_free(pj);
+}
+
 void flx_packet_scheduler_free(flxPacketScheduler *s) {
     flxQueryJob *qj;
     flxResponseJob *rj;
+    flxProbeJob *pj;
     flxTimeEvent *e;
 
     g_assert(s);
@@ -63,6 +78,8 @@ void flx_packet_scheduler_free(flxPacketScheduler *s) {
         query_job_free(s, qj);
     while ((rj = s->response_jobs))
         response_job_free(s, rj);
+    while ((pj = s->probe_jobs))
+        probe_job_free(s, pj);
 
     g_free(s);
 }
@@ -126,8 +143,8 @@ static void append_known_answers_and_send(flxPacketScheduler *s, flxDnsPacket *p
 
             g_assert(!flx_dns_packet_is_empty(p));
 
-            flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, flx_dns_packet_get_field(p, DNS_FIELD_FLAGS) | DNS_FLAG_TC);
-            flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
+            flx_dns_packet_set_field(p, FLX_DNS_FIELD_FLAGS, flx_dns_packet_get_field(p, FLX_DNS_FIELD_FLAGS) | FLX_DNS_FLAG_TC);
+            flx_dns_packet_set_field(p, FLX_DNS_FIELD_ANCOUNT, n);
             flx_interface_send_packet(s->interface, p);
             flx_dns_packet_free(p);
 
@@ -142,7 +159,7 @@ static void append_known_answers_and_send(flxPacketScheduler *s, flxDnsPacket *p
         n++;
     }
     
-    flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
+    flx_dns_packet_set_field(p, FLX_DNS_FIELD_ANCOUNT, n);
     flx_interface_send_packet(s->interface, p);
     flx_dns_packet_free(p);
 }
@@ -182,7 +199,7 @@ static void query_elapse(flxTimeEvent *e, gpointer data) {
         n++;
     }
 
-    flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n);
+    flx_dns_packet_set_field(p, FLX_DNS_FIELD_QDCOUNT, n);
 
     /* Now add known answers */
     append_known_answers_and_send(s, p);
@@ -242,7 +259,7 @@ static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, f
     g_assert(p);
     g_assert(rj);
 
-    if ((d = flx_dns_packet_append_record(p, rj->record, FALSE))) {
+    if ((d = flx_dns_packet_append_record(p, rj->record, rj->flush_cache))) {
         GTimeVal tv;
 
         rj->done = 1;
@@ -286,7 +303,7 @@ static void send_response_packet(flxPacketScheduler *s, flxResponseJob *rj) {
         n++;
     }
 
-    flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
+    flx_dns_packet_set_field(p, FLX_DNS_FIELD_ANCOUNT, n);
     flx_interface_send_packet(s->interface, p);
     flx_dns_packet_free(p);
 }
@@ -331,13 +348,15 @@ static flxResponseJob* response_job_new(flxPacketScheduler *s, flxRecord *record
     rj->record = flx_record_ref(record);
     rj->done = FALSE;
     rj->time_event = NULL;
+    rj->address_valid = FALSE;
+    rj->flush_cache = FALSE;
     
     FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
 
     return rj;
 }
 
-void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record, gboolean immediately) {
+void flx_packet_scheduler_post_response(flxPacketScheduler *s, const flxAddress *a, flxRecord *record, gboolean flush_cache, gboolean immediately) {
     flxResponseJob *rj;
     GTimeVal tv;
     gchar *t;
@@ -361,6 +380,16 @@ void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record
         if (!!record->ttl == !!rj->record->ttl &&
             d >= 0 && d <= FLX_RESPONSE_HISTORY_MSEC*1000) {
             g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!");
+
+            /* This job is no longer specific to a single querier, so
+             * make sure it isn't suppressed by known answer
+             * suppresion */
+
+            if (rj->address_valid && (!a || flx_address_cmp(a, &rj->address) != 0))
+                rj->address_valid = FALSE;
+
+            rj->flush_cache = flush_cache;
+            
             return;
         }
 
@@ -374,8 +403,16 @@ void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record
 
     /* Create a new job and schedule it */
     rj = response_job_new(s, record);
+    rj->flush_cache = flush_cache;
     rj->delivery = tv;
     rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj);
+
+    /* Store the address of the host this messages is intended to, so
+       that we can drop this job in case a truncated message with
+       known answer suppresion entries is recieved */
+
+    if ((rj->address_valid = !!a))
+        rj->address = *a;
 }
 
 void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key) {
@@ -488,6 +525,31 @@ mark_done:
     g_get_current_time(&rj->delivery);
 }
 
+void flx_packet_scheduler_incoming_known_answer(flxPacketScheduler *s, flxRecord *record, const flxAddress *a) {
+    flxResponseJob *rj;
+    
+    g_assert(s);
+    g_assert(record);
+    g_assert(a);
+
+    for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
+
+        g_assert(record->ttl > 0);
+        g_assert(rj->record->ttl/2);
+        
+        if (flx_record_equal_no_ttl(rj->record, record))
+            if (rj->address_valid)
+                if (flx_address_cmp(&rj->address, a))
+                    if (record->ttl >= rj->record->ttl/2) {
+
+            /* Let's suppress it */
+
+            response_job_free(s, rj);
+            break;
+        }
+    }
+}
+
 void flx_packet_scheduler_flush_responses(flxPacketScheduler *s) {
     flxResponseJob *rj;
     
@@ -499,3 +561,153 @@ void flx_packet_scheduler_flush_responses(flxPacketScheduler *s) {
         if (!rj->done)
             send_response_packet(s, rj);
 }
+
+static flxProbeJob* probe_job_new(flxPacketScheduler *s, flxRecord *record) {
+    flxProbeJob *pj;
+    
+    g_assert(s);
+    g_assert(record);
+
+    pj = g_new(flxProbeJob, 1);
+    pj->scheduler = s;
+    pj->record = flx_record_ref(record);
+    pj->time_event = NULL;
+    pj->chosen = FALSE;
+    
+    FLX_LLIST_PREPEND(flxProbeJob, jobs, s->probe_jobs, pj);
+
+    return pj;
+}
+
+static guint8* packet_add_probe_query(flxPacketScheduler *s, flxDnsPacket *p, flxProbeJob *pj) {
+    guint size;
+    guint8 *ret;
+    flxKey *k;
+
+    g_assert(s);
+    g_assert(p);
+    g_assert(pj);
+
+    g_assert(!pj->chosen);
+    
+    /* Estimate the size for this record */
+    size =
+        flx_key_get_estimate_size(pj->record->key) +
+        flx_record_get_estimate_size(pj->record);
+
+    /* Too large */
+    if (size > flx_dns_packet_space(p))
+        return NULL;
+
+    /* Create the probe query */
+    k = flx_key_new(pj->record->key->name, pj->record->key->class, FLX_DNS_TYPE_ANY);
+    ret = flx_dns_packet_append_key(p, k);
+    g_assert(ret);
+
+    /* Mark this job for addition to the packet */
+    pj->chosen = TRUE;
+
+    /* Scan for more jobs whith matching key pattern */
+    for (pj = s->probe_jobs; pj; pj = pj->jobs_next) {
+        if (pj->chosen)
+            continue;
+
+        /* Does the record match the probe? */
+        if (k->class != pj->record->key->class || flx_domain_equal(k->name, pj->record->key->name))
+            continue;
+        
+        /* This job wouldn't fit in */
+        if (flx_record_get_estimate_size(pj->record) > flx_dns_packet_space(p))
+            break;
+
+        /* Mark this job for addition to the packet */
+        pj->chosen = TRUE;
+    }
+
+    flx_key_unref(k);
+            
+    return ret;
+}
+
+static void probe_elapse(flxTimeEvent *e, gpointer data) {
+    flxProbeJob *pj = data, *next;
+    flxPacketScheduler *s;
+    flxDnsPacket *p;
+    guint n;
+    guint8 *d;
+
+    g_assert(pj);
+    s = pj->scheduler;
+
+    p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
+
+    /* Add the import probe */
+    if (!packet_add_probe_query(s, p, pj)) {
+        g_warning("Record too large! ---");
+        flx_dns_packet_free(p);
+        return;
+    }
+
+    n = 1;
+    
+    /* Try to fill up packet with more probes, if available */
+    for (pj = s->probe_jobs; pj; pj = pj->jobs_next) {
+
+        if (pj->chosen)
+            continue;
+        
+        if (!packet_add_probe_query(s, p, pj))
+            break;
+        
+        n++;
+    }
+
+    flx_dns_packet_set_field(p, FLX_DNS_FIELD_QDCOUNT, n);
+
+    n = 0;
+
+    /* Now add the chosen records to the authorative section */
+    for (pj = s->probe_jobs; pj; pj = next) {
+
+        next = pj->jobs_next;
+
+        if (!pj->chosen)
+            continue;
+
+        if (!flx_dns_packet_append_record(p, pj->record, TRUE)) {
+            g_warning("Bad probe size estimate!");
+
+            /* Unmark all following jobs */
+            for (; pj; pj = pj->jobs_next)
+                pj->chosen = FALSE;
+            
+            break;
+        }
+
+        probe_job_free(s, pj);
+        n ++;
+    }
+    
+    flx_dns_packet_set_field(p, FLX_DNS_FIELD_NSCOUNT, n);
+
+    /* Send it now */
+    flx_interface_send_packet(s->interface, p);
+    flx_dns_packet_free(p);
+}
+
+void flx_packet_scheduler_post_probe(flxPacketScheduler *s, flxRecord *record, gboolean immediately) {
+    flxProbeJob *pj;
+    GTimeVal tv;
+    
+    g_assert(s);
+    g_assert(record);
+    g_assert(!flx_key_is_pattern(record->key));
+    
+    flx_elapse_time(&tv, immediately ? 0 : FLX_PROBE_DEFER_MSEC, 0);
+
+    /* No duplication check here... */
+    /* Create a new job and schedule it */
+    pj = probe_job_new(s, record);
+    pj->delivery = tv;
+    pj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &pj->delivery, probe_elapse, pj);
+}