]> git.meshlink.io Git - catta/blob - psched.c
add support for dots and backslashes in domain names (required for DNS-SD)
[catta] / psched.c
1 #include <string.h>
2
3 #include "util.h"
4 #include "psched.h"
5
6 #define FLX_QUERY_HISTORY_MSEC 100
7 #define FLX_QUERY_DEFER_MSEC 100
8 #define FLX_RESPONSE_HISTORY_MSEC 700
9 #define FLX_RESPONSE_DEFER_MSEC 20
10 #define FLX_RESPONSE_JITTER_MSEC 100
11 #define FLX_PROBE_DEFER_MSEC 100
12
13 flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i) {
14     flxPacketScheduler *s;
15
16     g_assert(server);
17     g_assert(i);
18
19     s = g_new(flxPacketScheduler, 1);
20     s->server = server;
21     s->interface = i;
22
23     FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
24     FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
25     FLX_LLIST_HEAD_INIT(flxKnownAnswer, s->known_answers);
26     FLX_LLIST_HEAD_INIT(flxProbeJob, s->probe_jobs);
27     
28     return s;
29 }
30
31 static void query_job_free(flxPacketScheduler *s, flxQueryJob *qj) {
32     g_assert(qj);
33
34     if (qj->time_event)
35         flx_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
36
37     FLX_LLIST_REMOVE(flxQueryJob, jobs, s->query_jobs, qj);
38     
39     flx_key_unref(qj->key);
40     g_free(qj);
41 }
42
43 static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) {
44     g_assert(rj);
45
46     if (rj->time_event)
47         flx_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
48
49     FLX_LLIST_REMOVE(flxResponseJob, jobs, s->response_jobs, rj);
50
51     flx_record_unref(rj->record);
52     g_free(rj);
53 }
54
55 static void probe_job_free(flxPacketScheduler *s, flxProbeJob *pj) {
56     g_assert(pj);
57
58     if (pj->time_event)
59         flx_time_event_queue_remove(pj->scheduler->server->time_event_queue, pj->time_event);
60
61     FLX_LLIST_REMOVE(flxProbeJob, jobs, s->probe_jobs, pj);
62
63     flx_record_unref(pj->record);
64     g_free(pj);
65 }
66
67 void flx_packet_scheduler_free(flxPacketScheduler *s) {
68     flxQueryJob *qj;
69     flxResponseJob *rj;
70     flxProbeJob *pj;
71     flxTimeEvent *e;
72
73     g_assert(s);
74
75     g_assert(!s->known_answers);
76     
77     while ((qj = s->query_jobs))
78         query_job_free(s, qj);
79     while ((rj = s->response_jobs))
80         response_job_free(s, rj);
81     while ((pj = s->probe_jobs))
82         probe_job_free(s, pj);
83
84     g_free(s);
85 }
86
87 static gpointer known_answer_walk_callback(flxCache *c, flxKey *pattern, flxCacheEntry *e, gpointer userdata) {
88     flxPacketScheduler *s = userdata;
89     flxKnownAnswer *ka;
90     
91     g_assert(c);
92     g_assert(pattern);
93     g_assert(e);
94     g_assert(s);
95
96     if (flx_cache_entry_half_ttl(c, e))
97         return NULL;
98     
99     ka = g_new0(flxKnownAnswer, 1);
100     ka->scheduler = s;
101     ka->record = flx_record_ref(e->record);
102
103     FLX_LLIST_PREPEND(flxKnownAnswer, known_answer, s->known_answers, ka);
104     return NULL;
105 }
106
107 static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) {
108     guint8 *d;
109
110     g_assert(s);
111     g_assert(p);
112     g_assert(qj);
113
114     if ((d = flx_dns_packet_append_key(p, qj->key))) {
115         GTimeVal tv;
116
117         qj->done = 1;
118
119         /* Drop query after some time from history from history */
120         flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
121         flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
122
123         g_get_current_time(&qj->delivery);
124
125         /* Add all matching known answers to the list */
126         flx_cache_walk(s->interface->cache, qj->key, known_answer_walk_callback, s);
127     }
128
129     return d;
130 }
131
132 static void append_known_answers_and_send(flxPacketScheduler *s, flxDnsPacket *p) {
133     flxKnownAnswer *ka;
134     guint n;
135     g_assert(s);
136     g_assert(p);
137
138     n = 0;
139     
140     while ((ka = s->known_answers)) {
141
142         while (!flx_dns_packet_append_record(p, ka->record, FALSE)) {
143
144             g_assert(!flx_dns_packet_is_empty(p));
145
146             flx_dns_packet_set_field(p, FLX_DNS_FIELD_FLAGS, flx_dns_packet_get_field(p, FLX_DNS_FIELD_FLAGS) | FLX_DNS_FLAG_TC);
147             flx_dns_packet_set_field(p, FLX_DNS_FIELD_ANCOUNT, n);
148             flx_interface_send_packet(s->interface, p);
149             flx_dns_packet_free(p);
150
151             p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
152             n = 0;
153         }
154
155         FLX_LLIST_REMOVE(flxKnownAnswer, known_answer, s->known_answers, ka);
156         flx_record_unref(ka->record);
157         g_free(ka);
158         
159         n++;
160     }
161     
162     flx_dns_packet_set_field(p, FLX_DNS_FIELD_ANCOUNT, n);
163     flx_interface_send_packet(s->interface, p);
164     flx_dns_packet_free(p);
165 }
166
167 static void query_elapse(flxTimeEvent *e, gpointer data) {
168     flxQueryJob *qj = data;
169     flxPacketScheduler *s;
170     flxDnsPacket *p;
171     guint n;
172     guint8 *d;
173
174     g_assert(qj);
175     s = qj->scheduler;
176
177     if (qj->done) {
178         /* Lets remove it  from the history */
179         query_job_free(s, qj);
180         return;
181     }
182
183     g_assert(!s->known_answers);
184     
185     p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
186     d = packet_add_query_job(s, p, qj);
187     g_assert(d);
188     n = 1;
189
190     /* Try to fill up packet with more queries, if available */
191     for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
192
193         if (qj->done)
194             continue;
195
196         if (!packet_add_query_job(s, p, qj))
197             break;
198
199         n++;
200     }
201
202     flx_dns_packet_set_field(p, FLX_DNS_FIELD_QDCOUNT, n);
203
204     /* Now add known answers */
205     append_known_answers_and_send(s, p);
206 }
207
208 flxQueryJob* query_job_new(flxPacketScheduler *s, flxKey *key) {
209     flxQueryJob *qj;
210     
211     g_assert(s);
212     g_assert(key);
213
214     qj = g_new(flxQueryJob, 1);
215     qj->scheduler = s;
216     qj->key = flx_key_ref(key);
217     qj->done = FALSE;
218     qj->time_event = NULL;
219     
220     FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
221
222     return qj;
223 }
224
225 void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key, gboolean immediately) {
226     GTimeVal tv;
227     flxQueryJob *qj;
228     
229     g_assert(s);
230     g_assert(key);
231
232     flx_elapse_time(&tv, immediately ? 0 : FLX_QUERY_DEFER_MSEC, 0);
233
234     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
235
236         if (flx_key_equal(qj->key, key)) {
237
238             glong d = flx_timeval_diff(&tv, &qj->delivery);
239
240             /* Duplicate questions suppression */
241             if (d >= 0 && d <= FLX_QUERY_HISTORY_MSEC*1000) {
242                 g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!");
243                 return;
244             }
245             
246             query_job_free(s, qj);
247             break;
248         }
249
250     qj = query_job_new(s, key);
251     qj->delivery = tv;
252     qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &qj->delivery, query_elapse, qj);
253 }
254
255 static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) {
256     guint8 *d;
257
258     g_assert(s);
259     g_assert(p);
260     g_assert(rj);
261
262     if ((d = flx_dns_packet_append_record(p, rj->record, rj->flush_cache))) {
263         GTimeVal tv;
264
265         rj->done = 1;
266
267         /* Drop response after some time from history */
268         flx_elapse_time(&tv, FLX_RESPONSE_HISTORY_MSEC, 0);
269         flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
270
271         g_get_current_time(&rj->delivery);
272     }
273
274     return d;
275 }
276
277 static void send_response_packet(flxPacketScheduler *s, flxResponseJob *rj) {
278     flxDnsPacket *p;
279     guint n;
280
281     g_assert(s);
282
283     p = flx_dns_packet_new_response(s->interface->hardware->mtu - 200);
284     n = 0;
285
286     /* If a job was specified, put it in the packet. */
287     if (rj) {
288         guint8 *d;
289         d = packet_add_response_job(s, p, rj);
290         g_assert(d);
291         n++;
292     }
293
294     /* Try to fill up packet with more responses, if available */
295     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
296
297         if (rj->done)
298             continue;
299
300         if (!packet_add_response_job(s, p, rj))
301             break;
302
303         n++;
304     }
305
306     flx_dns_packet_set_field(p, FLX_DNS_FIELD_ANCOUNT, n);
307     flx_interface_send_packet(s->interface, p);
308     flx_dns_packet_free(p);
309 }
310
311 static void response_elapse(flxTimeEvent *e, gpointer data) {
312     flxResponseJob *rj = data;
313     flxPacketScheduler *s;
314
315     g_assert(rj);
316     s = rj->scheduler;
317
318     if (rj->done) {
319         /* Lets remove it  from the history */
320         response_job_free(s, rj);
321         return;
322     }
323
324     send_response_packet(s, rj);
325 }
326
327 static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) {
328     flxResponseJob *rj;
329
330     g_assert(s);
331     g_assert(record);
332
333     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
334         if (flx_record_equal_no_ttl(rj->record, record))
335             return rj;
336
337     return NULL;
338 }
339
340 static flxResponseJob* response_job_new(flxPacketScheduler *s, flxRecord *record) {
341     flxResponseJob *rj;
342     
343     g_assert(s);
344     g_assert(record);
345
346     rj = g_new(flxResponseJob, 1);
347     rj->scheduler = s;
348     rj->record = flx_record_ref(record);
349     rj->done = FALSE;
350     rj->time_event = NULL;
351     rj->address_valid = FALSE;
352     rj->flush_cache = FALSE;
353     
354     FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
355
356     return rj;
357 }
358
359 void flx_packet_scheduler_post_response(flxPacketScheduler *s, const flxAddress *a, flxRecord *record, gboolean flush_cache, gboolean immediately) {
360     flxResponseJob *rj;
361     GTimeVal tv;
362     gchar *t;
363     
364     g_assert(s);
365     g_assert(record);
366
367     g_assert(!flx_key_is_pattern(record->key));
368     
369     flx_elapse_time(&tv, immediately ? 0 : FLX_RESPONSE_DEFER_MSEC, immediately ? 0 : FLX_RESPONSE_JITTER_MSEC);
370     
371     /* Don't send out duplicates */
372     
373     if ((rj = look_for_response(s, record))) {
374         glong d;
375
376         d = flx_timeval_diff(&tv, &rj->delivery);
377         
378         /* If there's already a matching packet in our history or in
379          * the schedule, we do nothing. */
380         if (!!record->ttl == !!rj->record->ttl &&
381             d >= 0 && d <= FLX_RESPONSE_HISTORY_MSEC*1000) {
382             g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!");
383
384             /* This job is no longer specific to a single querier, so
385              * make sure it isn't suppressed by known answer
386              * suppresion */
387
388             if (rj->address_valid && (!a || flx_address_cmp(a, &rj->address) != 0))
389                 rj->address_valid = FALSE;
390
391             rj->flush_cache = flush_cache;
392             
393             return;
394         }
395
396         /* Either one was a goodbye packet, but the other was not, so
397          * let's drop the older one. */
398         response_job_free(s, rj);
399     }
400
401     g_message("ACCEPTED NEW RESPONSE [%s]", t = flx_record_to_string(record));
402     g_free(t);
403
404     /* Create a new job and schedule it */
405     rj = response_job_new(s, record);
406     rj->flush_cache = flush_cache;
407     rj->delivery = tv;
408     rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj);
409
410     /* Store the address of the host this messages is intended to, so
411        that we can drop this job in case a truncated message with
412        known answer suppresion entries is recieved */
413
414     if ((rj->address_valid = !!a))
415         rj->address = *a;
416 }
417
418 void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key) {
419     GTimeVal tv;
420     flxQueryJob *qj;
421     
422     g_assert(s);
423     g_assert(key);
424
425     /* This function is called whenever an incoming query was
426      * receieved. We drop all scheduled queries which match here. The
427      * keyword is "DUPLICATE QUESTION SUPPRESION". */
428
429     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
430         if (flx_key_equal(qj->key, key)) {
431
432             if (qj->done)
433                 return;
434
435             goto mark_done;
436         }
437
438
439     /* No matching job was found. Add the query to the history */
440     qj = query_job_new(s, key);
441
442 mark_done:
443     qj->done = TRUE;
444
445     /* Drop the query after some time */
446     flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
447     qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
448
449     g_get_current_time(&qj->delivery);
450 }
451
452 void response_job_set_elapse_time(flxPacketScheduler *s, flxResponseJob *rj, guint msec, guint jitter) {
453     GTimeVal tv;
454
455     g_assert(s);
456     g_assert(rj);
457
458     flx_elapse_time(&tv, msec, jitter);
459
460     if (rj->time_event)
461         flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
462     else
463         rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
464     
465 }
466
467 void flx_packet_scheduler_incoming_response(flxPacketScheduler *s, flxRecord *record) {
468     flxResponseJob *rj;
469     
470     g_assert(s);
471     g_assert(record);
472
473     /* This function is called whenever an incoming response was
474      * receieved. We drop all scheduled responses which match
475      * here. The keyword is "DUPLICATE ANSWER SUPPRESION". */
476     
477     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
478         if (flx_record_equal_no_ttl(rj->record, record)) {
479
480             if (rj->done) {
481
482                 if (!!record->ttl == !!rj->record->ttl) {
483                     /* An entry like this is already in our history,
484                      * so let's get out of here! */
485                     
486                     return;
487                     
488                 } else {
489                     /* Either one was a goodbye packet but other was
490                      * none. We remove the history entry, and add a
491                      * new one */
492                     
493                     response_job_free(s, rj);
494                     break;
495                 }
496         
497             } else {
498
499                 if (!!record->ttl == !!rj->record->ttl) {
500
501                     /* The incoming packet matches our scheduled
502                      * record, so let's mark that one as done */
503
504                     goto mark_done;
505                     
506                 } else {
507
508                     /* Either one was a goodbye packet but other was
509                      * none. We ignore the incoming packet. */
510
511                     return;
512                 }
513             }
514         }
515
516     /* No matching job was found. Add the query to the history */
517     rj = response_job_new(s, record);
518
519 mark_done:
520     rj->done = TRUE;
521                     
522     /* Drop response after 500ms from history */
523     response_job_set_elapse_time(s, rj, FLX_RESPONSE_HISTORY_MSEC, 0);
524
525     g_get_current_time(&rj->delivery);
526 }
527
528 void flx_packet_scheduler_incoming_known_answer(flxPacketScheduler *s, flxRecord *record, const flxAddress *a) {
529     flxResponseJob *rj;
530     
531     g_assert(s);
532     g_assert(record);
533     g_assert(a);
534
535     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
536
537         g_assert(record->ttl > 0);
538         g_assert(rj->record->ttl/2);
539         
540         if (flx_record_equal_no_ttl(rj->record, record))
541             if (rj->address_valid)
542                 if (flx_address_cmp(&rj->address, a))
543                     if (record->ttl >= rj->record->ttl/2) {
544
545             /* Let's suppress it */
546
547             response_job_free(s, rj);
548             break;
549         }
550     }
551 }
552
553 void flx_packet_scheduler_flush_responses(flxPacketScheduler *s) {
554     flxResponseJob *rj;
555     
556     g_assert(s);
557
558     /* Send all scheduled responses, ignoring the scheduled time */
559     
560     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
561         if (!rj->done)
562             send_response_packet(s, rj);
563 }
564
565 static flxProbeJob* probe_job_new(flxPacketScheduler *s, flxRecord *record) {
566     flxProbeJob *pj;
567     
568     g_assert(s);
569     g_assert(record);
570
571     pj = g_new(flxProbeJob, 1);
572     pj->scheduler = s;
573     pj->record = flx_record_ref(record);
574     pj->time_event = NULL;
575     pj->chosen = FALSE;
576     
577     FLX_LLIST_PREPEND(flxProbeJob, jobs, s->probe_jobs, pj);
578
579     return pj;
580 }
581
582 static guint8* packet_add_probe_query(flxPacketScheduler *s, flxDnsPacket *p, flxProbeJob *pj) {
583     guint size;
584     guint8 *r;
585     flxKey *k;
586
587     g_assert(s);
588     g_assert(p);
589     g_assert(pj);
590
591     g_assert(!pj->chosen);
592     
593     /* Estimate the size for this record */
594     size =
595         flx_key_get_estimate_size(pj->record->key) +
596         flx_record_get_estimate_size(pj->record);
597
598     /* Too large */
599     if (size > flx_dns_packet_space(p))
600         return NULL;
601
602     /* Create the probe query */
603     k = flx_key_new(pj->record->key->name, pj->record->key->class, FLX_DNS_TYPE_ANY);
604     r = flx_dns_packet_append_key(p, k);
605
606     /* Mark this job for addition to the packet */
607     pj->chosen = TRUE;
608
609     /* Scan for more jobs whith matching key pattern */
610     for (pj = s->probe_jobs; pj; pj = pj->jobs_next) {
611         if (pj->chosen)
612             continue;
613
614         /* Does the record match the probe? */
615         if (k->class != pj->record->key->class || flx_domain_equal(k->name, pj->record->key->name))
616             continue;
617         
618         /* This job wouldn't fit in */
619         if (flx_record_get_estimate_size(pj->record) > flx_dns_packet_space(p))
620             break;
621
622         /* Mark this job for addition to the packet */
623         pj->chosen = TRUE;
624     }
625
626     flx_key_unref(k);
627             
628     return r;
629 }
630
631 static void probe_elapse(flxTimeEvent *e, gpointer data) {
632     flxProbeJob *pj = data, *next;
633     flxPacketScheduler *s;
634     flxDnsPacket *p;
635     guint n;
636     guint8 *d;
637
638     g_assert(pj);
639     s = pj->scheduler;
640
641     p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
642
643     /* Add the import probe */
644     if (!packet_add_probe_query(s, p, pj)) {
645         g_warning("Record too large!");
646         flx_dns_packet_free(p);
647         return;
648     }
649
650     n = 1;
651     
652     /* Try to fill up packet with more probes, if available */
653     for (pj = s->probe_jobs; pj; pj = pj->jobs_next) {
654
655         if (pj->chosen)
656             continue;
657         
658         if (!packet_add_probe_query(s, p, pj))
659             break;
660         
661         n++;
662     }
663
664     flx_dns_packet_set_field(p, FLX_DNS_FIELD_QDCOUNT, n);
665
666     n = 0;
667
668     /* Now add the chosen records to the authorative section */
669     for (pj = s->probe_jobs; pj; pj = next) {
670
671         next = pj->jobs_next;
672
673         if (!pj->chosen)
674             continue;
675
676         if (!flx_dns_packet_append_record(p, pj->record, TRUE)) {
677             g_warning("Bad probe size estimate!");
678
679             /* Unmark all following jobs */
680             for (; pj; pj = pj->jobs_next)
681                 pj->chosen = FALSE;
682             
683             break;
684         }
685
686         probe_job_free(s, pj);
687         n ++;
688     }
689     
690     flx_dns_packet_set_field(p, FLX_DNS_FIELD_NSCOUNT, n);
691
692     /* Send it now */
693     flx_interface_send_packet(s->interface, p);
694     flx_dns_packet_free(p);
695 }
696
697 void flx_packet_scheduler_post_probe(flxPacketScheduler *s, flxRecord *record, gboolean immediately) {
698     flxProbeJob *pj;
699     GTimeVal tv;
700     
701     g_assert(s);
702     g_assert(record);
703     g_assert(!flx_key_is_pattern(record->key));
704     
705     flx_elapse_time(&tv, immediately ? 0 : FLX_PROBE_DEFER_MSEC, 0);
706
707     /* No duplication check here... */
708     /* Create a new job and schedule it */
709     pj = probe_job_new(s, record);
710     pj->delivery = tv;
711     pj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &pj->delivery, probe_elapse, pj);
712 }