6 #define FLX_QUERY_HISTORY_MSEC 700
7 #define FLX_QUERY_DEFER_MSEC 100
8 #define FLX_RESPONSE_HISTORY_MSEC 700
9 #define FLX_RESPONSE_DEFER_MSEC 20
10 #define FLX_RESPONSE_JITTER_MSEC 100
12 flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i) {
13 flxPacketScheduler *s;
18 s = g_new(flxPacketScheduler, 1);
22 FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
23 FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
28 static void query_job_free(flxPacketScheduler *s, flxQueryJob *qj) {
32 flx_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
34 FLX_LLIST_REMOVE(flxQueryJob, jobs, s->query_jobs, qj);
36 flx_key_unref(qj->key);
40 static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) {
44 flx_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
46 FLX_LLIST_REMOVE(flxResponseJob, jobs, s->response_jobs, rj);
48 flx_record_unref(rj->record);
52 void flx_packet_scheduler_free(flxPacketScheduler *s) {
59 while ((qj = s->query_jobs))
60 query_job_free(s, qj);
61 while ((rj = s->response_jobs))
62 response_job_free(s, rj);
67 static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) {
74 if ((d = flx_dns_packet_append_key(p, qj->key))) {
79 /* Drop query after some time from history from history */
80 flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
81 flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
83 g_get_current_time(&qj->delivery);
89 static void query_elapse(flxTimeEvent *e, gpointer data) {
90 flxQueryJob *qj = data;
91 flxPacketScheduler *s;
100 /* Lets remove it from the history */
101 query_job_free(s, qj);
105 p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
106 d = packet_add_query_job(s, p, qj);
110 /* Try to fill up packet with more queries, if available */
111 for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
116 if (!packet_add_query_job(s, p, qj))
122 flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n);
123 flx_interface_send_packet(s->interface, p);
124 flx_dns_packet_free(p);
127 static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) {
133 for (qj = s->query_jobs; qj; qj = qj->jobs_next)
134 if (flx_key_equal(qj->key, key))
140 flxQueryJob* query_job_new(flxPacketScheduler *s, flxKey *key) {
146 qj = g_new(flxQueryJob, 1);
148 qj->key = flx_key_ref(key);
150 qj->time_event = NULL;
152 FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
157 void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key, gboolean immediately) {
164 flx_elapse_time(&tv, immediately ? 0 : FLX_QUERY_DEFER_MSEC, 0);
166 if ((qj = look_for_query(s, key))) {
167 glong d = flx_timeval_diff(&tv, &qj->delivery);
169 /* Duplicate questions suppression */
170 if (d >= 0 && d <= FLX_QUERY_HISTORY_MSEC*1000) {
171 g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!");
175 query_job_free(s, qj);
178 qj = query_job_new(s, key);
180 qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &qj->delivery, query_elapse, qj);
183 static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) {
190 if ((d = flx_dns_packet_append_record(p, rj->record, FALSE))) {
195 /* Drop response after some time from history */
196 flx_elapse_time(&tv, FLX_RESPONSE_HISTORY_MSEC, 0);
197 flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
199 g_get_current_time(&rj->delivery);
205 static void send_response_packet(flxPacketScheduler *s, flxResponseJob *rj) {
211 p = flx_dns_packet_new_response(s->interface->hardware->mtu - 200);
214 /* If a job was specified, put it in the packet. */
217 d = packet_add_response_job(s, p, rj);
222 /* Try to fill up packet with more responses, if available */
223 for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
228 if (!packet_add_response_job(s, p, rj))
234 flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
235 flx_interface_send_packet(s->interface, p);
236 flx_dns_packet_free(p);
239 static void response_elapse(flxTimeEvent *e, gpointer data) {
240 flxResponseJob *rj = data;
241 flxPacketScheduler *s;
247 /* Lets remove it from the history */
248 response_job_free(s, rj);
252 send_response_packet(s, rj);
255 static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) {
261 for (rj = s->response_jobs; rj; rj = rj->jobs_next)
262 if (flx_record_equal_no_ttl(rj->record, record))
268 static flxResponseJob* response_job_new(flxPacketScheduler *s, flxRecord *record) {
274 rj = g_new(flxResponseJob, 1);
276 rj->record = flx_record_ref(record);
278 rj->time_event = NULL;
280 FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
285 void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record, gboolean immediately) {
293 flx_elapse_time(&tv, immediately ? 0 : FLX_RESPONSE_DEFER_MSEC, immediately ? 0 : FLX_RESPONSE_JITTER_MSEC);
295 /* Don't send out duplicates */
297 if ((rj = look_for_response(s, record))) {
300 d = flx_timeval_diff(&tv, &rj->delivery);
302 /* If there's already a matching packet in our history or in
303 * the schedule, we do nothing. */
304 if (!!record->ttl == !!rj->record->ttl &&
305 d >= 0 && d <= FLX_RESPONSE_HISTORY_MSEC*1000) {
306 g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!");
310 /* Either one was a goodbye packet, but the other was not, so
311 * let's drop the older one. */
312 response_job_free(s, rj);
315 g_message("ACCEPTED NEW RESPONSE [%s]", t = flx_record_to_string(record));
318 /* Create a new job and schedule it */
319 rj = response_job_new(s, record);
321 rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj);
324 void flx_packet_scheduler_incoming_query(flxPacketScheduler *s, flxKey *key) {
331 /* This function is called whenever an incoming query was
332 * receieved. We drop all scheduled queries which match here. The
333 * keyword is "DUPLICATE QUESTION SUPPRESION". */
335 for (qj = s->query_jobs; qj; qj = qj->jobs_next)
336 if (flx_key_equal(qj->key, key)) {
345 /* No matching job was found. Add the query to the history */
346 qj = query_job_new(s, key);
351 /* Drop the query after some time */
352 flx_elapse_time(&tv, FLX_QUERY_HISTORY_MSEC, 0);
353 qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
355 g_get_current_time(&qj->delivery);
358 void response_job_set_elapse_time(flxPacketScheduler *s, flxResponseJob *rj, guint msec, guint jitter) {
364 flx_elapse_time(&tv, msec, jitter);
367 flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
369 rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
373 void flx_packet_scheduler_incoming_response(flxPacketScheduler *s, flxRecord *record) {
379 /* This function is called whenever an incoming response was
380 * receieved. We drop all scheduled responses which match
381 * here. The keyword is "DUPLICATE ANSWER SUPPRESION". */
383 for (rj = s->response_jobs; rj; rj = rj->jobs_next)
384 if (flx_record_equal_no_ttl(rj->record, record)) {
388 if (!!record->ttl == !!rj->record->ttl) {
389 /* An entry like this is already in our history,
390 * so let's get out of here! */
395 /* Either one was a goodbye packet but other was
396 * none. We remove the history entry, and add a
399 response_job_free(s, rj);
405 if (!!record->ttl == !!rj->record->ttl) {
407 /* The incoming packet matches our scheduled
408 * record, so let's mark that one as done */
414 /* Either one was a goodbye packet but other was
415 * none. We ignore the incoming packet. */
422 /* No matching job was found. Add the query to the history */
423 rj = response_job_new(s, record);
428 /* Drop response after 500ms from history */
429 response_job_set_elapse_time(s, rj, FLX_RESPONSE_HISTORY_MSEC, 0);
431 g_get_current_time(&rj->delivery);
434 void flx_packet_scheduler_flush_responses(flxPacketScheduler *s) {
439 /* Send all scheduled responses, ignoring the scheduled time */
441 for (rj = s->response_jobs; rj; rj = rj->jobs_next)
443 send_response_packet(s, rj);