]> git.meshlink.io Git - catta/blob - psched.c
assorted work:
[catta] / psched.c
1 #include <string.h>
2
3 #include "util.h"
4 #include "psched.h"
5
6 #define FLX_QUERY_HISTORY_MSEC 700
7 #define FLX_QUERY_DEFER_MSEC 100
8 #define FLX_RESPONSE_HISTORY_MSEC 700
9 #define FLX_RESPONSE_DEFER_MSEC 20
10 #define FLX_RESPONSE_JITTER_MSEC 100
11
12 flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i) {
13     flxPacketScheduler *s;
14
15     g_assert(server);
16     g_assert(i);
17
18     s = g_new(flxPacketScheduler, 1);
19     s->server = server;
20     s->interface = i;
21
22     FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
23     FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
24     FLX_LLIST_HEAD_INIT(flxKnownAnswer, s->known_answers);
25     
26     return s;
27 }
28
29 static void query_job_free(flxPacketScheduler *s, flxQueryJob *qj) {
30     g_assert(qj);
31
32     if (qj->time_event)
33         flx_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
34
35     FLX_LLIST_REMOVE(flxQueryJob, jobs, s->query_jobs, qj);
36     
37     flx_key_unref(qj->key);
38     g_free(qj);
39 }
40
41 static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) {
42     g_assert(rj);
43
44     if (rj->time_event)
45         flx_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
46
47     FLX_LLIST_REMOVE(flxResponseJob, jobs, s->response_jobs, rj);
48
49     flx_record_unref(rj->record);
50     g_free(rj);
51 }
52
53 void flx_packet_scheduler_free(flxPacketScheduler *s) {
54     flxQueryJob *qj;
55     flxResponseJob *rj;
56     flxTimeEvent *e;
57
58     g_assert(s);
59
60     g_assert(!s->known_answers);
61     
62     while ((qj = s->query_jobs))
63         query_job_free(s, qj);
64     while ((rj = s->response_jobs))
65         response_job_free(s, rj);
66
67     g_free(s);
68 }
69
70 static gpointer known_answer_walk_callback(flxCache *c, flxKey *pattern, flxCacheEntry *e, gpointer userdata) {
71     flxPacketScheduler *s = userdata;
72     flxKnownAnswer *ka;
73     
74     g_assert(c);
75     g_assert(pattern);
76     g_assert(e);
77     g_assert(s);
78
79     if (flx_cache_entry_half_ttl(c, e))
80         return NULL;
81     
82     ka = g_new0(flxKnownAnswer, 1);
83     ka->scheduler = s;
84     ka->record = flx_record_ref(e->record);
85
86     FLX_LLIST_PREPEND(flxKnownAnswer, known_answer, s->known_answers, ka);
87     return NULL;
88 }
89
90 static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) {
91     guint8 *d;
92
93     g_assert(s);
94     g_assert(p);
95     g_assert(qj);
96
97     if ((d = flx_dns_packet_append_key(p, qj->key))) {
98         GTimeVal tv;
99
100         qj->done = 1;
101
102         /* Drop query after some time from history from history */
103         flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
104         flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
105
106         g_get_current_time(&qj->delivery);
107
108         /* Add all matching known answers to the list */
109         flx_cache_walk(s->interface->cache, qj->key, known_answer_walk_callback, s);
110     }
111
112     return d;
113 }
114
115 static void append_known_answers_and_send(flxPacketScheduler *s, flxDnsPacket *p) {
116     flxKnownAnswer *ka;
117     guint n;
118     g_assert(s);
119     g_assert(p);
120
121     n = 0;
122     
123     while ((ka = s->known_answers)) {
124
125         while (!flx_dns_packet_append_record(p, ka->record, FALSE)) {
126
127             g_assert(!flx_dns_packet_is_empty(p));
128
129             flx_dns_packet_set_field(p, DNS_FIELD_FLAGS, flx_dns_packet_get_field(p, DNS_FIELD_FLAGS) | DNS_FLAG_TC);
130             flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
131             flx_interface_send_packet(s->interface, p);
132             flx_dns_packet_free(p);
133
134             p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
135             n = 0;
136         }
137
138         FLX_LLIST_REMOVE(flxKnownAnswer, known_answer, s->known_answers, ka);
139         flx_record_unref(ka->record);
140         g_free(ka);
141         
142         n++;
143     }
144     
145     flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
146     flx_interface_send_packet(s->interface, p);
147     flx_dns_packet_free(p);
148 }
149
150 static void query_elapse(flxTimeEvent *e, gpointer data) {
151     flxQueryJob *qj = data;
152     flxPacketScheduler *s;
153     flxDnsPacket *p;
154     guint n;
155     guint8 *d;
156
157     g_assert(qj);
158     s = qj->scheduler;
159
160     if (qj->done) {
161         /* Lets remove it  from the history */
162         query_job_free(s, qj);
163         return;
164     }
165
166     g_assert(!s->known_answers);
167     
168     p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
169     d = packet_add_query_job(s, p, qj);
170     g_assert(d);
171     n = 1;
172
173     /* Try to fill up packet with more queries, if available */
174     for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
175
176         if (qj->done)
177             continue;
178
179         if (!packet_add_query_job(s, p, qj))
180             break;
181
182         n++;
183     }
184
185     flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n);
186
187     /* Now add known answers */
188     append_known_answers_and_send(s, p);
189 }
190
191 flxQueryJob* query_job_new(flxPacketScheduler *s, flxKey *key) {
192     flxQueryJob *qj;
193     
194     g_assert(s);
195     g_assert(key);
196
197     qj = g_new(flxQueryJob, 1);
198     qj->scheduler = s;
199     qj->key = flx_key_ref(key);
200     qj->done = FALSE;
201     qj->time_event = NULL;
202     
203     FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
204
205     return qj;
206 }
207
208 void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key, gboolean immediately) {
209     GTimeVal tv;
210     flxQueryJob *qj;
211     
212     g_assert(s);
213     g_assert(key);
214
215     flx_elapse_time(&tv, immediately ? 0 : FLX_QUERY_DEFER_MSEC, 0);
216
217     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
218
219         if (flx_key_equal(qj->key, key)) {
220
221             glong d = flx_timeval_diff(&tv, &qj->delivery);
222
223             /* Duplicate questions suppression */
224             if (d >= 0 && d <= FLX_QUERY_HISTORY_MSEC*1000) {
225                 g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!");
226                 return;
227             }
228             
229             query_job_free(s, qj);
230             break;
231         }
232
233     qj = query_job_new(s, key);
234     qj->delivery = tv;
235     qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &qj->delivery, query_elapse, qj);
236 }
237
238 static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) {
239     guint8 *d;
240
241     g_assert(s);
242     g_assert(p);
243     g_assert(rj);
244
245     if ((d = flx_dns_packet_append_record(p, rj->record, FALSE))) {
246         GTimeVal tv;
247
248         rj->done = 1;
249
250         /* Drop response after some time from history */
251         flx_elapse_time(&tv, FLX_RESPONSE_HISTORY_MSEC, 0);
252         flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
253
254         g_get_current_time(&rj->delivery);
255     }
256
257     return d;
258 }
259
260 static void send_response_packet(flxPacketScheduler *s, flxResponseJob *rj) {
261     flxDnsPacket *p;
262     guint n;
263
264     g_assert(s);
265
266     p = flx_dns_packet_new_response(s->interface->hardware->mtu - 200);
267     n = 0;
268
269     /* If a job was specified, put it in the packet. */
270     if (rj) {
271         guint8 *d;
272         d = packet_add_response_job(s, p, rj);
273         g_assert(d);
274         n++;
275     }
276
277     /* Try to fill up packet with more responses, if available */
278     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
279
280         if (rj->done)
281             continue;
282
283         if (!packet_add_response_job(s, p, rj))
284             break;
285
286         n++;
287     }
288
289     flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
290     flx_interface_send_packet(s->interface, p);
291     flx_dns_packet_free(p);
292 }
293
294 static void response_elapse(flxTimeEvent *e, gpointer data) {
295     flxResponseJob *rj = data;
296     flxPacketScheduler *s;
297
298     g_assert(rj);
299     s = rj->scheduler;
300
301     if (rj->done) {
302         /* Lets remove it  from the history */
303         response_job_free(s, rj);
304         return;
305     }
306
307     send_response_packet(s, rj);
308 }
309
310 static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) {
311     flxResponseJob *rj;
312
313     g_assert(s);
314     g_assert(record);
315
316     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
317         if (flx_record_equal_no_ttl(rj->record, record))
318             return rj;
319
320     return NULL;
321 }
322
323 static flxResponseJob* response_job_new(flxPacketScheduler *s, flxRecord *record) {
324     flxResponseJob *rj;
325     
326     g_assert(s);
327     g_assert(record);
328
329     rj = g_new(flxResponseJob, 1);
330     rj->scheduler = s;
331     rj->record = flx_record_ref(record);
332     rj->done = FALSE;
333     rj->time_event = NULL;
334     
335     FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
336
337     return rj;
338 }
339
340 void flx_packet_scheduler_post_response(flxPacketScheduler *s, const flxAddress *a, flxRecord *record, gboolean immediately) {
341     flxResponseJob *rj;
342     GTimeVal tv;
343     gchar *t;
344     
345     g_assert(s);
346     g_assert(record);
347
348     g_assert(!flx_key_is_pattern(record->key));
349     
350     flx_elapse_time(&tv, immediately ? 0 : FLX_RESPONSE_DEFER_MSEC, immediately ? 0 : FLX_RESPONSE_JITTER_MSEC);
351     
352     /* Don't send out duplicates */
353     
354     if ((rj = look_for_response(s, record))) {
355         glong d;
356
357         d = flx_timeval_diff(&tv, &rj->delivery);
358         
359         /* If there's already a matching packet in our history or in
360          * the schedule, we do nothing. */
361         if (!!record->ttl == !!rj->record->ttl &&
362             d >= 0 && d <= FLX_RESPONSE_HISTORY_MSEC*1000) {
363             g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!");
364
365             /* This job is no longer specific to a single querier, so
366              * make sure it isn't suppressed by known answer
367              * suppresion */
368             rj->address_valid = FALSE;
369             
370             return;
371         }
372
373         /* Either one was a goodbye packet, but the other was not, so
374          * let's drop the older one. */
375         response_job_free(s, rj);
376     }
377
378     g_message("ACCEPTED NEW RESPONSE [%s]", t = flx_record_to_string(record));
379     g_free(t);
380
381     /* Create a new job and schedule it */
382     rj = response_job_new(s, record);
383     rj->delivery = tv;
384     rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj);
385
386     /* Store the address of the host this messages is intended to, so
387        that we can drop this job in case a truncated message with
388        known answer suppresion entries is recieved */
389
390     if ((rj->address_valid = !!a))
391         rj->address = *a;
392 }
393
394 void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key) {
395     GTimeVal tv;
396     flxQueryJob *qj;
397     
398     g_assert(s);
399     g_assert(key);
400
401     /* This function is called whenever an incoming query was
402      * receieved. We drop all scheduled queries which match here. The
403      * keyword is "DUPLICATE QUESTION SUPPRESION". */
404
405     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
406         if (flx_key_equal(qj->key, key)) {
407
408             if (qj->done)
409                 return;
410
411             goto mark_done;
412         }
413
414
415     /* No matching job was found. Add the query to the history */
416     qj = query_job_new(s, key);
417
418 mark_done:
419     qj->done = TRUE;
420
421     /* Drop the query after some time */
422     flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
423     qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
424
425     g_get_current_time(&qj->delivery);
426 }
427
428 void response_job_set_elapse_time(flxPacketScheduler *s, flxResponseJob *rj, guint msec, guint jitter) {
429     GTimeVal tv;
430
431     g_assert(s);
432     g_assert(rj);
433
434     flx_elapse_time(&tv, msec, jitter);
435
436     if (rj->time_event)
437         flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
438     else
439         rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
440     
441 }
442
443 void flx_packet_scheduler_incoming_response(flxPacketScheduler *s, flxRecord *record) {
444     flxResponseJob *rj;
445     
446     g_assert(s);
447     g_assert(record);
448
449     /* This function is called whenever an incoming response was
450      * receieved. We drop all scheduled responses which match
451      * here. The keyword is "DUPLICATE ANSWER SUPPRESION". */
452     
453     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
454         if (flx_record_equal_no_ttl(rj->record, record)) {
455
456             if (rj->done) {
457
458                 if (!!record->ttl == !!rj->record->ttl) {
459                     /* An entry like this is already in our history,
460                      * so let's get out of here! */
461                     
462                     return;
463                     
464                 } else {
465                     /* Either one was a goodbye packet but other was
466                      * none. We remove the history entry, and add a
467                      * new one */
468                     
469                     response_job_free(s, rj);
470                     break;
471                 }
472         
473             } else {
474
475                 if (!!record->ttl == !!rj->record->ttl) {
476
477                     /* The incoming packet matches our scheduled
478                      * record, so let's mark that one as done */
479
480                     goto mark_done;
481                     
482                 } else {
483
484                     /* Either one was a goodbye packet but other was
485                      * none. We ignore the incoming packet. */
486
487                     return;
488                 }
489             }
490         }
491
492     /* No matching job was found. Add the query to the history */
493     rj = response_job_new(s, record);
494
495 mark_done:
496     rj->done = TRUE;
497                     
498     /* Drop response after 500ms from history */
499     response_job_set_elapse_time(s, rj, FLX_RESPONSE_HISTORY_MSEC, 0);
500
501     g_get_current_time(&rj->delivery);
502 }
503
504 void flx_packet_scheduler_incoming_known_answer(flxPacketScheduler *s, flxRecord *record, const flxAddress *a) {
505     flxResponseJob *rj;
506     
507     g_assert(s);
508     g_assert(record);
509     g_assert(a);
510
511     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
512         if (flx_record_equal_no_ttl(rj->record, record) &&
513             rj->address_valid &&
514             flx_address_cmp(&rj->address, a) &&
515             record->ttl >= rj->record->ttl/2) {
516
517             /* Let's suppress it */
518
519             response_job_free(s, rj);
520             break;
521         }
522     
523 }
524
525 void flx_packet_scheduler_flush_responses(flxPacketScheduler *s) {
526     flxResponseJob *rj;
527     
528     g_assert(s);
529
530     /* Send all scheduled responses, ignoring the scheduled time */
531     
532     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
533         if (!rj->done)
534             send_response_packet(s, rj);
535 }