]> git.meshlink.io Git - catta/blobdiff - psched.c
add known answer suppresion server part
[catta] / psched.c
index 162c1b2b098708aa4b8bc3fe92037ba55e984628..1b6d26c8763d0f1b2c94be24d1bf2a89d1cfc353 100644 (file)
--- a/psched.c
+++ b/psched.c
@@ -21,6 +21,7 @@ flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i)
 
     FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
     FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
+    FLX_LLIST_HEAD_INIT(flxKnownAnswer, s->known_answers);
     
     return s;
 }
@@ -53,9 +54,11 @@ void flx_packet_scheduler_free(flxPacketScheduler *s) {
     flxQueryJob *qj;
     flxResponseJob *rj;
     flxTimeEvent *e;
-    
+
     g_assert(s);
 
+    g_assert(!s->known_answers);
+    
     while ((qj = s->query_jobs))
         query_job_free(s, qj);
     while ((rj = s->response_jobs))
@@ -64,6 +67,26 @@ void flx_packet_scheduler_free(flxPacketScheduler *s) {
     g_free(s);
 }
 
+static gpointer known_answer_walk_callback(flxCache *c, flxKey *pattern, flxCacheEntry *e, gpointer userdata) {
+    flxPacketScheduler *s = userdata;
+    flxKnownAnswer *ka;
+    
+    g_assert(c);
+    g_assert(pattern);
+    g_assert(e);
+    g_assert(s);
+
+    if (flx_cache_entry_half_ttl(c, e))
+        return NULL;
+    
+    ka = g_new0(flxKnownAnswer, 1);
+    ka->scheduler = s;
+    ka->record = flx_record_ref(e->record);
+
+    FLX_LLIST_PREPEND(flxKnownAnswer, known_answer, s->known_answers, ka);
+    return NULL;
+}
+
 static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) {
     guint8 *d;
 
@@ -81,11 +104,49 @@ static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQ
         flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
 
         g_get_current_time(&qj->delivery);
+
+        /* Add all matching known answers to the list */
+        flx_cache_walk(s->interface->cache, qj->key, known_answer_walk_callback, s);
     }
 
     return d;
 }
-                                 
+
+static void append_known_answers_and_send(flxPacketScheduler *s, flxDnsPacket *p) {
+    flxKnownAnswer *ka;
+    guint n;
+    g_assert(s);
+    g_assert(p);
+
+    n = 0;
+    
+    while ((ka = s->known_answers)) {
+
+        while (!flx_dns_packet_append_record(p, ka->record, FALSE)) {
+
+            g_assert(!flx_dns_packet_is_empty(p));
+
+            flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, flx_dns_packet_get_field(p, DNS_FIELD_FLAGS) | DNS_FLAG_TC);
+            flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
+            flx_interface_send_packet(s->interface, p);
+            flx_dns_packet_free(p);
+
+            p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
+            n = 0;
+        }
+
+        FLX_LLIST_REMOVE(flxKnownAnswer, known_answer, s->known_answers, ka);
+        flx_record_unref(ka->record);
+        g_free(ka);
+        
+        n++;
+    }
+    
+    flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
+    flx_interface_send_packet(s->interface, p);
+    flx_dns_packet_free(p);
+}
+
 static void query_elapse(flxTimeEvent *e, gpointer data) {
     flxQueryJob *qj = data;
     flxPacketScheduler *s;
@@ -102,6 +163,8 @@ static void query_elapse(flxTimeEvent *e, gpointer data) {
         return;
     }
 
+    g_assert(!s->known_answers);
+    
     p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
     d = packet_add_query_job(s, p, qj);
     g_assert(d);
@@ -120,21 +183,9 @@ static void query_elapse(flxTimeEvent *e, gpointer data) {
     }
 
     flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n);
-    flx_interface_send_packet(s->interface, p);
-    flx_dns_packet_free(p);
-}
 
-static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) {
-    flxQueryJob *qj;
-    
-    g_assert(s);
-    g_assert(key);
-
-    for (qj = s->query_jobs; qj; qj = qj->jobs_next)
-        if (flx_key_equal(qj->key, key))
-            return qj;
-
-    return NULL;
+    /* Now add known answers */
+    append_known_answers_and_send(s, p);
 }
 
 flxQueryJob* query_job_new(flxPacketScheduler *s, flxKey *key) {
@@ -162,18 +213,22 @@ void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key, gboolea
     g_assert(key);
 
     flx_elapse_time(&tv, immediately ? 0 : FLX_QUERY_DEFER_MSEC, 0);
-    
-    if ((qj = look_for_query(s, key))) {
-        glong d = flx_timeval_diff(&tv, &qj->delivery);
 
-        /* Duplicate questions suppression */
-        if (d >= 0 && d <= FLX_QUERY_HISTORY_MSEC*1000) {
-            g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!");
-            return;
-        }
+    for (qj = s->query_jobs; qj; qj = qj->jobs_next)
 
-        query_job_free(s, qj);
-    }
+        if (flx_key_equal(qj->key, key)) {
+
+            glong d = flx_timeval_diff(&tv, &qj->delivery);
+
+            /* Duplicate questions suppression */
+            if (d >= 0 && d <= FLX_QUERY_HISTORY_MSEC*1000) {
+                g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!");
+                return;
+            }
+            
+            query_job_free(s, qj);
+            break;
+        }
 
     qj = query_job_new(s, key);
     qj->delivery = tv;
@@ -282,7 +337,7 @@ static flxResponseJob* response_job_new(flxPacketScheduler *s, flxRecord *record
     return rj;
 }
 
-void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record, gboolean immediately) {
+void flx_packet_scheduler_post_response(flxPacketScheduler *s, const flxAddress *a, flxRecord *record, gboolean immediately) {
     flxResponseJob *rj;
     GTimeVal tv;
     gchar *t;
@@ -290,6 +345,8 @@ void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record
     g_assert(s);
     g_assert(record);
 
+    g_assert(!flx_key_is_pattern(record->key));
+    
     flx_elapse_time(&tv, immediately ? 0 : FLX_RESPONSE_DEFER_MSEC, immediately ? 0 : FLX_RESPONSE_JITTER_MSEC);
     
     /* Don't send out duplicates */
@@ -304,6 +361,12 @@ void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record
         if (!!record->ttl == !!rj->record->ttl &&
             d >= 0 && d <= FLX_RESPONSE_HISTORY_MSEC*1000) {
             g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!");
+
+            /* This job is no longer specific to a single querier, so
+             * make sure it isn't suppressed by known answer
+             * suppresion */
+            rj->address_valid = FALSE;
+            
             return;
         }
 
@@ -319,6 +382,13 @@ void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record
     rj = response_job_new(s, record);
     rj->delivery = tv;
     rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj);
+
+    /* Store the address of the host this messages is intended to, so
+       that we can drop this job in case a truncated message with
+       known answer suppresion entries is recieved */
+
+    if ((rj->address_valid = !!a))
+        rj->address = *a;
 }
 
 void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key) {
@@ -431,6 +501,27 @@ mark_done:
     g_get_current_time(&rj->delivery);
 }
 
+void flx_packet_scheduler_incoming_known_answer(flxPacketScheduler *s, flxRecord *record, const flxAddress *a) {
+    flxResponseJob *rj;
+    
+    g_assert(s);
+    g_assert(record);
+    g_assert(a);
+
+    for (rj = s->response_jobs; rj; rj = rj->jobs_next)
+        if (flx_record_equal_no_ttl(rj->record, record) &&
+            rj->address_valid &&
+            flx_address_cmp(&rj->address, a) &&
+            record->ttl >= rj->record->ttl/2) {
+
+            /* Let's suppress it */
+
+            response_job_free(s, rj);
+            break;
+        }
+    
+}
+
 void flx_packet_scheduler_flush_responses(flxPacketScheduler *s) {
     flxResponseJob *rj;