]> git.meshlink.io Git - catta/blob - psched.c
* add todo list
[catta] / psched.c
1 #include <string.h>
2
3 #include "util.h"
4 #include "psched.h"
5
6 #define FLX_QUERY_HISTORY_MSEC 700
7 #define FLX_QUERY_DEFER_MSEC 100
8 #define FLX_RESPONSE_HISTORY_MSEC 700
9 #define FLX_RESPONSE_DEFER_MSEC 20
10 #define FLX_RESPONSE_JITTER_MSEC 100
11
12 flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i) {
13     flxPacketScheduler *s;
14
15     g_assert(server);
16     g_assert(i);
17
18     s = g_new(flxPacketScheduler, 1);
19     s->server = server;
20     s->interface = i;
21
22     FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
23     FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
24     
25     return s;
26 }
27
28 static void query_job_free(flxPacketScheduler *s, flxQueryJob *qj) {
29     g_assert(qj);
30
31     if (qj->time_event)
32         flx_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
33
34     FLX_LLIST_REMOVE(flxQueryJob, jobs, s->query_jobs, qj);
35     
36     flx_key_unref(qj->key);
37     g_free(qj);
38 }
39
40 static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) {
41     g_assert(rj);
42
43     if (rj->time_event)
44         flx_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
45
46     FLX_LLIST_REMOVE(flxResponseJob, jobs, s->response_jobs, rj);
47
48     flx_record_unref(rj->record);
49     g_free(rj);
50 }
51
52 void flx_packet_scheduler_free(flxPacketScheduler *s) {
53     flxQueryJob *qj;
54     flxResponseJob *rj;
55     flxTimeEvent *e;
56     
57     g_assert(s);
58
59     while ((qj = s->query_jobs))
60         query_job_free(s, qj);
61     while ((rj = s->response_jobs))
62         response_job_free(s, rj);
63
64     g_free(s);
65 }
66
67 static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) {
68     guint8 *d;
69
70     g_assert(s);
71     g_assert(p);
72     g_assert(qj);
73
74     if ((d = flx_dns_packet_append_key(p, qj->key))) {
75         GTimeVal tv;
76
77         qj->done = 1;
78
79         /* Drop query after some time from history from history */
80         flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
81         flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
82
83         g_get_current_time(&qj->delivery);
84     }
85
86     return d;
87 }
88                                  
89 static void query_elapse(flxTimeEvent *e, gpointer data) {
90     flxQueryJob *qj = data;
91     flxPacketScheduler *s;
92     flxDnsPacket *p;
93     guint n;
94     guint8 *d;
95
96     g_assert(qj);
97     s = qj->scheduler;
98
99     if (qj->done) {
100         /* Lets remove it  from the history */
101         query_job_free(s, qj);
102         return;
103     }
104
105     p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
106     d = packet_add_query_job(s, p, qj);
107     g_assert(d);
108     n = 1;
109
110     /* Try to fill up packet with more queries, if available */
111     for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
112
113         if (qj->done)
114             continue;
115
116         if (!packet_add_query_job(s, p, qj))
117             break;
118
119         n++;
120     }
121
122     flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n);
123     flx_interface_send_packet(s->interface, p);
124     flx_dns_packet_free(p);
125 }
126
127 static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) {
128     flxQueryJob *qj;
129     
130     g_assert(s);
131     g_assert(key);
132
133     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
134         if (flx_key_equal(qj->key, key))
135             return qj;
136
137     return NULL;
138 }
139
140 flxQueryJob* query_job_new(flxPacketScheduler *s, flxKey *key) {
141     flxQueryJob *qj;
142     
143     g_assert(s);
144     g_assert(key);
145
146     qj = g_new(flxQueryJob, 1);
147     qj->scheduler = s;
148     qj->key = flx_key_ref(key);
149     qj->done = FALSE;
150     qj->time_event = NULL;
151     
152     FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
153
154     return qj;
155 }
156
157 void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key, gboolean immediately) {
158     GTimeVal tv;
159     flxQueryJob *qj;
160     
161     g_assert(s);
162     g_assert(key);
163
164     flx_elapse_time(&tv, immediately ? 0 : FLX_QUERY_DEFER_MSEC, 0);
165     
166     if ((qj = look_for_query(s, key))) {
167         glong d = flx_timeval_diff(&tv, &qj->delivery);
168
169         /* Duplicate questions suppression */
170         if (d >= 0 && d <= FLX_QUERY_HISTORY_MSEC*1000) {
171             g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!");
172             return;
173         }
174
175         query_job_free(s, qj);
176     }
177
178     qj = query_job_new(s, key);
179     qj->delivery = tv;
180     qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &qj->delivery, query_elapse, qj);
181 }
182
183 static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) {
184     guint8 *d;
185
186     g_assert(s);
187     g_assert(p);
188     g_assert(rj);
189
190     if ((d = flx_dns_packet_append_record(p, rj->record, FALSE))) {
191         GTimeVal tv;
192
193         rj->done = 1;
194
195         /* Drop response after some time from history */
196         flx_elapse_time(&tv, FLX_RESPONSE_HISTORY_MSEC, 0);
197         flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
198
199         g_get_current_time(&rj->delivery);
200     }
201
202     return d;
203 }
204
205 static void send_response_packet(flxPacketScheduler *s, flxResponseJob *rj) {
206     flxDnsPacket *p;
207     guint n;
208
209     g_assert(s);
210
211     p = flx_dns_packet_new_response(s->interface->hardware->mtu - 200);
212     n = 0;
213
214     /* If a job was specified, put it in the packet. */
215     if (rj) {
216         guint8 *d;
217         d = packet_add_response_job(s, p, rj);
218         g_assert(d);
219         n++;
220     }
221
222     /* Try to fill up packet with more responses, if available */
223     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
224
225         if (rj->done)
226             continue;
227
228         if (!packet_add_response_job(s, p, rj))
229             break;
230
231         n++;
232     }
233
234     flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
235     flx_interface_send_packet(s->interface, p);
236     flx_dns_packet_free(p);
237 }
238
239 static void response_elapse(flxTimeEvent *e, gpointer data) {
240     flxResponseJob *rj = data;
241     flxPacketScheduler *s;
242
243     g_assert(rj);
244     s = rj->scheduler;
245
246     if (rj->done) {
247         /* Lets remove it  from the history */
248         response_job_free(s, rj);
249         return;
250     }
251
252     send_response_packet(s, rj);
253 }
254
255 static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) {
256     flxResponseJob *rj;
257
258     g_assert(s);
259     g_assert(record);
260
261     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
262         if (flx_record_equal_no_ttl(rj->record, record))
263             return rj;
264
265     return NULL;
266 }
267
268 static flxResponseJob* response_job_new(flxPacketScheduler *s, flxRecord *record) {
269     flxResponseJob *rj;
270     
271     g_assert(s);
272     g_assert(record);
273
274     rj = g_new(flxResponseJob, 1);
275     rj->scheduler = s;
276     rj->record = flx_record_ref(record);
277     rj->done = FALSE;
278     rj->time_event = NULL;
279     
280     FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
281
282     return rj;
283 }
284
285 void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record, gboolean immediately) {
286     flxResponseJob *rj;
287     GTimeVal tv;
288     gchar *t;
289     
290     g_assert(s);
291     g_assert(record);
292
293     flx_elapse_time(&tv, immediately ? 0 : FLX_RESPONSE_DEFER_MSEC, immediately ? 0 : FLX_RESPONSE_JITTER_MSEC);
294     
295     /* Don't send out duplicates */
296     
297     if ((rj = look_for_response(s, record))) {
298         glong d;
299
300         d = flx_timeval_diff(&tv, &rj->delivery);
301         
302         /* If there's already a matching packet in our history or in
303          * the schedule, we do nothing. */
304         if (!!record->ttl == !!rj->record->ttl &&
305             d >= 0 && d <= FLX_RESPONSE_HISTORY_MSEC*1000) {
306             g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!");
307             return;
308         }
309
310         /* Either one was a goodbye packet, but the other was not, so
311          * let's drop the older one. */
312         response_job_free(s, rj);
313     }
314
315     g_message("ACCEPTED NEW RESPONSE [%s]", t = flx_record_to_string(record));
316     g_free(t);
317
318     /* Create a new job and schedule it */
319     rj = response_job_new(s, record);
320     rj->delivery = tv;
321     rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj);
322 }
323
324 void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key) {
325     GTimeVal tv;
326     flxQueryJob *qj;
327     
328     g_assert(s);
329     g_assert(key);
330
331     /* This function is called whenever an incoming query was
332      * receieved. We drop all scheduled queries which match here. The
333      * keyword is "DUPLICATE QUESTION SUPPRESION". */
334
335     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
336         if (flx_key_equal(qj->key, key)) {
337
338             if (qj->done)
339                 return;
340
341             goto mark_done;
342         }
343
344
345     /* No matching job was found. Add the query to the history */
346     qj = query_job_new(s, key);
347
348 mark_done:
349     qj->done = TRUE;
350
351     /* Drop the query after some time */
352     flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
353     qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
354
355     g_get_current_time(&qj->delivery);
356 }
357
358 void response_job_set_elapse_time(flxPacketScheduler *s, flxResponseJob *rj, guint msec, guint jitter) {
359     GTimeVal tv;
360
361     g_assert(s);
362     g_assert(rj);
363
364     flx_elapse_time(&tv, msec, jitter);
365
366     if (rj->time_event)
367         flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
368     else
369         rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
370     
371 }
372
373 void flx_packet_scheduler_incoming_response(flxPacketScheduler *s, flxRecord *record) {
374     flxResponseJob *rj;
375     
376     g_assert(s);
377     g_assert(record);
378
379     /* This function is called whenever an incoming response was
380      * receieved. We drop all scheduled responses which match
381      * here. The keyword is "DUPLICATE ANSWER SUPPRESION". */
382     
383     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
384         if (flx_record_equal_no_ttl(rj->record, record)) {
385
386             if (rj->done) {
387
388                 if (!!record->ttl == !!rj->record->ttl) {
389                     /* An entry like this is already in our history,
390                      * so let's get out of here! */
391                     
392                     return;
393                     
394                 } else {
395                     /* Either one was a goodbye packet but other was
396                      * none. We remove the history entry, and add a
397                      * new one */
398                     
399                     response_job_free(s, rj);
400                     break;
401                 }
402         
403             } else {
404
405                 if (!!record->ttl == !!rj->record->ttl) {
406
407                     /* The incoming packet matches our scheduled
408                      * record, so let's mark that one as done */
409
410                     goto mark_done;
411                     
412                 } else {
413
414                     /* Either one was a goodbye packet but other was
415                      * none. We ignore the incoming packet. */
416
417                     return;
418                 }
419             }
420         }
421
422     /* No matching job was found. Add the query to the history */
423     rj = response_job_new(s, record);
424
425 mark_done:
426     rj->done = TRUE;
427                     
428     /* Drop response after 500ms from history */
429     response_job_set_elapse_time(s, rj, FLX_RESPONSE_HISTORY_MSEC, 0);
430
431     g_get_current_time(&rj->delivery);
432 }
433
434 void flx_packet_scheduler_flush_responses(flxPacketScheduler *s) {
435     flxResponseJob *rj;
436     
437     g_assert(s);
438
439     /* Send all scheduled responses, ignoring the scheduled time */
440     
441     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
442         if (!rj->done)
443             send_response_packet(s, rj);
444 }