/* libavahi-core/psched.c -- catta (git.meshlink.io), blob faa2fa9344f9dd9f043aa688c8b14a0738b71a90 */
1 #include <string.h>
2
3 #include "util.h"
4 #include "psched.h"
5
/* Packet scheduler timing parameters. All values are in milliseconds. */

/* How long a query is kept in the history after it was sent or seen; a
 * query for the same key posted inside this window is suppressed. */
#define AVAHI_QUERY_HISTORY_MSEC     100

/* Delay for outgoing queries that are not posted "immediately". */
#define AVAHI_QUERY_DEFER_MSEC       100

/* How long a response is kept in the history after it was sent or seen. */
#define AVAHI_RESPONSE_HISTORY_MSEC  700

/* Base delay and jitter handed to avahi_elapse_time() for responses
 * that are not posted "immediately". */
#define AVAHI_RESPONSE_DEFER_MSEC    20
#define AVAHI_RESPONSE_JITTER_MSEC   100

/* Delay for probes that are not posted "immediately". */
#define AVAHI_PROBE_DEFER_MSEC       70
12
13 AvahiPacketScheduler *avahi_packet_scheduler_new(AvahiServer *server, AvahiInterface *i) {
14     AvahiPacketScheduler *s;
15
16     g_assert(server);
17     g_assert(i);
18
19     s = g_new(AvahiPacketScheduler, 1);
20     s->server = server;
21     s->interface = i;
22
23     AVAHI_LLIST_HEAD_INIT(AvahiQueryJob, s->query_jobs);
24     AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->response_jobs);
25     AVAHI_LLIST_HEAD_INIT(AvahiKnownAnswer, s->known_answers);
26     AVAHI_LLIST_HEAD_INIT(AvahiProbeJob, s->probe_jobs);
27     
28     return s;
29 }
30
31 static void query_job_free(AvahiPacketScheduler *s, AvahiQueryJob *qj) {
32     g_assert(qj);
33
34     if (qj->time_event)
35         avahi_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
36
37     AVAHI_LLIST_REMOVE(AvahiQueryJob, jobs, s->query_jobs, qj);
38     
39     avahi_key_unref(qj->key);
40     g_free(qj);
41 }
42
43 static void response_job_free(AvahiPacketScheduler *s, AvahiResponseJob *rj) {
44     g_assert(rj);
45
46     if (rj->time_event)
47         avahi_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
48
49     AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->response_jobs, rj);
50
51     avahi_record_unref(rj->record);
52     g_free(rj);
53 }
54
55 static void probe_job_free(AvahiPacketScheduler *s, AvahiProbeJob *pj) {
56     g_assert(pj);
57
58     if (pj->time_event)
59         avahi_time_event_queue_remove(pj->scheduler->server->time_event_queue, pj->time_event);
60
61     AVAHI_LLIST_REMOVE(AvahiProbeJob, jobs, s->probe_jobs, pj);
62
63     avahi_record_unref(pj->record);
64     g_free(pj);
65 }
66
67 void avahi_packet_scheduler_free(AvahiPacketScheduler *s) {
68     AvahiQueryJob *qj;
69     AvahiResponseJob *rj;
70     AvahiProbeJob *pj;
71     AvahiTimeEvent *e;
72
73     g_assert(s);
74
75     g_assert(!s->known_answers);
76     
77     while ((qj = s->query_jobs))
78         query_job_free(s, qj);
79     while ((rj = s->response_jobs))
80         response_job_free(s, rj);
81     while ((pj = s->probe_jobs))
82         probe_job_free(s, pj);
83
84     g_free(s);
85 }
86
87 static gpointer known_answer_walk_callback(AvahiCache *c, AvahiKey *pattern, AvahiCacheEntry *e, gpointer userdata) {
88     AvahiPacketScheduler *s = userdata;
89     AvahiKnownAnswer *ka;
90     
91     g_assert(c);
92     g_assert(pattern);
93     g_assert(e);
94     g_assert(s);
95
96     if (avahi_cache_entry_half_ttl(c, e))
97         return NULL;
98     
99     ka = g_new0(AvahiKnownAnswer, 1);
100     ka->scheduler = s;
101     ka->record = avahi_record_ref(e->record);
102
103     AVAHI_LLIST_PREPEND(AvahiKnownAnswer, known_answer, s->known_answers, ka);
104     return NULL;
105 }
106
107 static guint8* packet_add_query_job(AvahiPacketScheduler *s, AvahiDnsPacket *p, AvahiQueryJob *qj) {
108     guint8 *d;
109
110     g_assert(s);
111     g_assert(p);
112     g_assert(qj);
113
114     if ((d = avahi_dns_packet_append_key(p, qj->key, FALSE))) {
115         GTimeVal tv;
116
117         qj->done = 1;
118
119         /* Drop query after some time from history */
120         avahi_elapse_time(&tv, AVAHI_QUERY_HISTORY_MSEC, 0);
121         avahi_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
122
123         g_get_current_time(&qj->delivery);
124
125         /* Add all matching known answers to the list */
126         avahi_cache_walk(s->interface->cache, qj->key, known_answer_walk_callback, s);
127     }
128
129     return d;
130 }
131
132 static void append_known_answers_and_send(AvahiPacketScheduler *s, AvahiDnsPacket *p) {
133     AvahiKnownAnswer *ka;
134     guint n;
135     g_assert(s);
136     g_assert(p);
137
138     n = 0;
139     
140     while ((ka = s->known_answers)) {
141
142         while (!avahi_dns_packet_append_record(p, ka->record, FALSE)) {
143
144             g_assert(!avahi_dns_packet_is_empty(p));
145
146             avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_FLAGS, avahi_dns_packet_get_field(p, AVAHI_DNS_FIELD_FLAGS) | AVAHI_DNS_FLAG_TC);
147             avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n);
148             avahi_interface_send_packet(s->interface, p);
149             avahi_dns_packet_free(p);
150
151             p = avahi_dns_packet_new_query(s->interface->hardware->mtu - 48);
152             n = 0;
153         }
154
155         AVAHI_LLIST_REMOVE(AvahiKnownAnswer, known_answer, s->known_answers, ka);
156         avahi_record_unref(ka->record);
157         g_free(ka);
158         
159         n++;
160     }
161     
162     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n);
163     avahi_interface_send_packet(s->interface, p);
164     avahi_dns_packet_free(p);
165 }
166
167 static void query_elapse(AvahiTimeEvent *e, gpointer data) {
168     AvahiQueryJob *qj = data;
169     AvahiPacketScheduler *s;
170     AvahiDnsPacket *p;
171     guint n;
172     guint8 *d;
173
174     g_assert(qj);
175     s = qj->scheduler;
176
177     if (qj->done) {
178         /* Lets remove it  from the history */
179         query_job_free(s, qj);
180         return;
181     }
182
183     g_assert(!s->known_answers);
184     
185     p = avahi_dns_packet_new_query(s->interface->hardware->mtu - 48);
186     d = packet_add_query_job(s, p, qj);
187     g_assert(d);
188     n = 1;
189
190     /* Try to fill up packet with more queries, if available */
191     for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
192
193         if (qj->done)
194             continue;
195
196         if (!packet_add_query_job(s, p, qj))
197             break;
198
199         n++;
200     }
201
202     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_QDCOUNT, n);
203
204     /* Now add known answers */
205     append_known_answers_and_send(s, p);
206 }
207
208 AvahiQueryJob* query_job_new(AvahiPacketScheduler *s, AvahiKey *key) {
209     AvahiQueryJob *qj;
210     
211     g_assert(s);
212     g_assert(key);
213
214     qj = g_new(AvahiQueryJob, 1);
215     qj->scheduler = s;
216     qj->key = avahi_key_ref(key);
217     qj->done = FALSE;
218     qj->time_event = NULL;
219     
220     AVAHI_LLIST_PREPEND(AvahiQueryJob, jobs, s->query_jobs, qj);
221
222     return qj;
223 }
224
225 void avahi_packet_scheduler_post_query(AvahiPacketScheduler *s, AvahiKey *key, gboolean immediately) {
226     GTimeVal tv;
227     AvahiQueryJob *qj;
228     
229     g_assert(s);
230     g_assert(key);
231
232     avahi_elapse_time(&tv, immediately ? 0 : AVAHI_QUERY_DEFER_MSEC, 0);
233
234     for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
235
236         if (avahi_key_equal(qj->key, key)) {
237
238             glong d = avahi_timeval_diff(&tv, &qj->delivery);
239
240             /* Duplicate questions suppression */
241             if (d >= 0 && d <= AVAHI_QUERY_HISTORY_MSEC*1000) {
242                 g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!");
243                 return;
244             }
245             
246             query_job_free(s, qj);
247             break;
248         }
249
250     }
251     
252     qj = query_job_new(s, key);
253     qj->delivery = tv;
254     qj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &qj->delivery, query_elapse, qj);
255 }
256
257 static guint8* packet_add_response_job(AvahiPacketScheduler *s, AvahiDnsPacket *p, AvahiResponseJob *rj) {
258     guint8 *d;
259
260     g_assert(s);
261     g_assert(p);
262     g_assert(rj);
263
264     if ((d = avahi_dns_packet_append_record(p, rj->record, rj->flush_cache))) {
265         GTimeVal tv;
266
267         rj->done = 1;
268
269         /* Drop response after some time from history */
270         avahi_elapse_time(&tv, AVAHI_RESPONSE_HISTORY_MSEC, 0);
271         avahi_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
272
273         g_get_current_time(&rj->delivery);
274     }
275
276     return d;
277 }
278
279 static void send_response_packet(AvahiPacketScheduler *s, AvahiResponseJob *rj) {
280     AvahiDnsPacket *p;
281     guint n;
282
283     g_assert(s);
284
285     p = avahi_dns_packet_new_response(s->interface->hardware->mtu - 200);
286     n = 0;
287
288     /* If a job was specified, put it in the packet. */
289     if (rj) {
290         guint8 *d;
291         d = packet_add_response_job(s, p, rj);
292         g_assert(d);
293         n++;
294     }
295
296     /* Try to fill up packet with more responses, if available */
297     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
298
299         if (rj->done)
300             continue;
301
302         if (!packet_add_response_job(s, p, rj))
303             break;
304
305         n++;
306     }
307
308     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n);
309     avahi_interface_send_packet(s->interface, p);
310     avahi_dns_packet_free(p);
311 }
312
313 static void response_elapse(AvahiTimeEvent *e, gpointer data) {
314     AvahiResponseJob *rj = data;
315     AvahiPacketScheduler *s;
316
317     g_assert(rj);
318     s = rj->scheduler;
319
320     if (rj->done) {
321         /* Lets remove it  from the history */
322         response_job_free(s, rj);
323         return;
324     }
325
326     send_response_packet(s, rj);
327 }
328
329 static AvahiResponseJob* look_for_response(AvahiPacketScheduler *s, AvahiRecord *record) {
330     AvahiResponseJob *rj;
331
332     g_assert(s);
333     g_assert(record);
334
335     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
336         if (avahi_record_equal_no_ttl(rj->record, record))
337             return rj;
338
339     return NULL;
340 }
341
342 static AvahiResponseJob* response_job_new(AvahiPacketScheduler *s, AvahiRecord *record) {
343     AvahiResponseJob *rj;
344     
345     g_assert(s);
346     g_assert(record);
347
348     rj = g_new(AvahiResponseJob, 1);
349     rj->scheduler = s;
350     rj->record = avahi_record_ref(record);
351     rj->done = FALSE;
352     rj->time_event = NULL;
353     rj->address_valid = FALSE;
354     rj->flush_cache = FALSE;
355     
356     AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->response_jobs, rj);
357
358     return rj;
359 }
360
361 void avahi_packet_scheduler_post_response(AvahiPacketScheduler *s, const AvahiAddress *a, AvahiRecord *record, gboolean flush_cache, gboolean immediately) {
362     AvahiResponseJob *rj;
363     GTimeVal tv;
364     gchar *t;
365     
366     g_assert(s);
367     g_assert(record);
368
369     g_assert(!avahi_key_is_pattern(record->key));
370     
371     avahi_elapse_time(&tv, immediately ? 0 : AVAHI_RESPONSE_DEFER_MSEC, immediately ? 0 : AVAHI_RESPONSE_JITTER_MSEC);
372     
373     /* Don't send out duplicates */
374     
375     if ((rj = look_for_response(s, record))) {
376         glong d;
377
378         d = avahi_timeval_diff(&tv, &rj->delivery);
379         
380         /* If there's already a matching packet in our history or in
381          * the schedule, we do nothing. */
382         if (!!record->ttl == !!rj->record->ttl &&
383             d >= 0 && d <= AVAHI_RESPONSE_HISTORY_MSEC*1000) {
384             g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!");
385
386             /* This job is no longer specific to a single querier, so
387              * make sure it isn't suppressed by known answer
388              * suppresion */
389
390             if (rj->address_valid && (!a || avahi_address_cmp(a, &rj->address) != 0))
391                 rj->address_valid = FALSE;
392
393             rj->flush_cache = flush_cache;
394             
395             return;
396         }
397
398         /* Either one was a goodbye packet, but the other was not, so
399          * let's drop the older one. */
400         response_job_free(s, rj);
401     }
402
403 /*     g_message("ACCEPTED NEW RESPONSE [%s]", t = avahi_record_to_string(record)); */
404 /*     g_free(t); */
405
406     /* Create a new job and schedule it */
407     rj = response_job_new(s, record);
408     rj->flush_cache = flush_cache;
409     rj->delivery = tv;
410     rj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj);
411
412     /* Store the address of the host this messages is intended to, so
413        that we can drop this job in case a truncated message with
414        known answer suppresion entries is recieved */
415
416     if ((rj->address_valid = !!a))
417         rj->address = *a;
418 }
419
420 void avahi_packet_scheduler_incoming_query(AvahiPacketScheduler *s, AvahiKey *key) {
421     GTimeVal tv;
422     AvahiQueryJob *qj;
423     
424     g_assert(s);
425     g_assert(key);
426
427     /* This function is called whenever an incoming query was
428      * receieved. We drop all scheduled queries which match here. The
429      * keyword is "DUPLICATE QUESTION SUPPRESION". */
430
431     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
432         if (avahi_key_equal(qj->key, key)) {
433
434             if (qj->done)
435                 return;
436
437             goto mark_done;
438         }
439
440
441     /* No matching job was found. Add the query to the history */
442     qj = query_job_new(s, key);
443
444 mark_done:
445     qj->done = TRUE;
446
447     /* Drop the query after some time */
448     avahi_elapse_time(&tv, AVAHI_QUERY_HISTORY_MSEC, 0);
449     qj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
450
451     g_get_current_time(&qj->delivery);
452 }
453
454 void response_job_set_elapse_time(AvahiPacketScheduler *s, AvahiResponseJob *rj, guint msec, guint jitter) {
455     GTimeVal tv;
456
457     g_assert(s);
458     g_assert(rj);
459
460     avahi_elapse_time(&tv, msec, jitter);
461
462     if (rj->time_event)
463         avahi_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
464     else
465         rj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
466     
467 }
468
469 void avahi_packet_scheduler_incoming_response(AvahiPacketScheduler *s, AvahiRecord *record) {
470     AvahiResponseJob *rj;
471     
472     g_assert(s);
473     g_assert(record);
474
475     /* This function is called whenever an incoming response was
476      * receieved. We drop all scheduled responses which match
477      * here. The keyword is "DUPLICATE ANSWER SUPPRESION". */
478     
479     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
480         if (avahi_record_equal_no_ttl(rj->record, record)) {
481
482             if (rj->done) {
483
484                 if (!!record->ttl == !!rj->record->ttl) {
485                     /* An entry like this is already in our history,
486                      * so let's get out of here! */
487                     
488                     return;
489                     
490                 } else {
491                     /* Either one was a goodbye packet but other was
492                      * none. We remove the history entry, and add a
493                      * new one */
494                     
495                     response_job_free(s, rj);
496                     break;
497                 }
498         
499             } else {
500
501                 if (!!record->ttl == !!rj->record->ttl) {
502
503                     /* The incoming packet matches our scheduled
504                      * record, so let's mark that one as done */
505
506                     goto mark_done;
507                     
508                 } else {
509
510                     /* Either one was a goodbye packet but other was
511                      * none. We ignore the incoming packet. */
512
513                     return;
514                 }
515             }
516         }
517
518     /* No matching job was found. Add the query to the history */
519     rj = response_job_new(s, record);
520
521 mark_done:
522     rj->done = TRUE;
523                     
524     /* Drop response after 500ms from history */
525     response_job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0);
526
527     g_get_current_time(&rj->delivery);
528 }
529
530 void avahi_packet_scheduler_incoming_known_answer(AvahiPacketScheduler *s, AvahiRecord *record, const AvahiAddress *a) {
531     AvahiResponseJob *rj;
532     
533     g_assert(s);
534     g_assert(record);
535     g_assert(a);
536
537     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
538
539         g_assert(record->ttl > 0);
540         g_assert(rj->record->ttl/2);
541         
542         if (avahi_record_equal_no_ttl(rj->record, record))
543             if (rj->address_valid)
544                 if (avahi_address_cmp(&rj->address, a))
545                     if (record->ttl >= rj->record->ttl/2) {
546
547             /* Let's suppress it */
548
549             response_job_free(s, rj);
550             break;
551         }
552     }
553 }
554
555 void avahi_packet_scheduler_flush_responses(AvahiPacketScheduler *s) {
556     AvahiResponseJob *rj;
557     
558     g_assert(s);
559
560     /* Send all scheduled responses, ignoring the scheduled time */
561     
562     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
563         if (!rj->done)
564             send_response_packet(s, rj);
565 }
566
567 static AvahiProbeJob* probe_job_new(AvahiPacketScheduler *s, AvahiRecord *record) {
568     AvahiProbeJob *pj;
569     
570     g_assert(s);
571     g_assert(record);
572
573     pj = g_new(AvahiProbeJob, 1);
574     pj->scheduler = s;
575     pj->record = avahi_record_ref(record);
576     pj->time_event = NULL;
577     pj->chosen = FALSE;
578     
579     AVAHI_LLIST_PREPEND(AvahiProbeJob, jobs, s->probe_jobs, pj);
580
581     return pj;
582 }
583
584 static guint8* packet_add_probe_query(AvahiPacketScheduler *s, AvahiDnsPacket *p, AvahiProbeJob *pj) {
585     guint size;
586     guint8 *ret;
587     AvahiKey *k;
588
589     g_assert(s);
590     g_assert(p);
591     g_assert(pj);
592
593     g_assert(!pj->chosen);
594     
595     /* Estimate the size for this record */
596     size =
597         avahi_key_get_estimate_size(pj->record->key) +
598         avahi_record_get_estimate_size(pj->record);
599
600     /* Too large */
601     if (size > avahi_dns_packet_space(p))
602         return NULL;
603
604     /* Create the probe query */
605     k = avahi_key_new(pj->record->key->name, pj->record->key->class, AVAHI_DNS_TYPE_ANY);
606     ret = avahi_dns_packet_append_key(p, k, FALSE);
607     g_assert(ret);
608
609     /* Mark this job for addition to the packet */
610     pj->chosen = TRUE;
611
612     /* Scan for more jobs whith matching key pattern */
613     for (pj = s->probe_jobs; pj; pj = pj->jobs_next) {
614         if (pj->chosen)
615             continue;
616
617         /* Does the record match the probe? */
618         if (k->class != pj->record->key->class || !avahi_domain_equal(k->name, pj->record->key->name))
619             continue;
620         
621         /* This job wouldn't fit in */
622         if (avahi_record_get_estimate_size(pj->record) > avahi_dns_packet_space(p))
623             break;
624
625         /* Mark this job for addition to the packet */
626         pj->chosen = TRUE;
627     }
628
629     avahi_key_unref(k);
630             
631     return ret;
632 }
633
634 static void probe_elapse(AvahiTimeEvent *e, gpointer data) {
635     AvahiProbeJob *pj = data, *next;
636     AvahiPacketScheduler *s;
637     AvahiDnsPacket *p;
638     guint n;
639     guint8 *d;
640
641     g_assert(pj);
642     s = pj->scheduler;
643
644     p = avahi_dns_packet_new_query(s->interface->hardware->mtu - 48);
645
646     /* Add the import probe */
647     if (!packet_add_probe_query(s, p, pj)) {
648         g_warning("Record too large! ---");
649         avahi_dns_packet_free(p);
650         return;
651     }
652
653     n = 1;
654     
655     /* Try to fill up packet with more probes, if available */
656     for (pj = s->probe_jobs; pj; pj = pj->jobs_next) {
657
658         if (pj->chosen)
659             continue;
660         
661         if (!packet_add_probe_query(s, p, pj))
662             break;
663         
664         n++;
665     }
666
667     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_QDCOUNT, n);
668
669     n = 0;
670
671     /* Now add the chosen records to the authorative section */
672     for (pj = s->probe_jobs; pj; pj = next) {
673
674         next = pj->jobs_next;
675
676         if (!pj->chosen)
677             continue;
678
679         if (!avahi_dns_packet_append_record(p, pj->record, TRUE)) {
680             g_warning("Bad probe size estimate!");
681
682             /* Unmark all following jobs */
683             for (; pj; pj = pj->jobs_next)
684                 pj->chosen = FALSE;
685             
686             break;
687         }
688
689         probe_job_free(s, pj);
690         
691         n ++;
692     }
693     
694     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_NSCOUNT, n);
695
696     /* Send it now */
697     avahi_interface_send_packet(s->interface, p);
698     avahi_dns_packet_free(p);
699 }
700
701 void avahi_packet_scheduler_post_probe(AvahiPacketScheduler *s, AvahiRecord *record, gboolean immediately) {
702     AvahiProbeJob *pj;
703     GTimeVal tv;
704     
705     g_assert(s);
706     g_assert(record);
707     g_assert(!avahi_key_is_pattern(record->key));
708     
709     avahi_elapse_time(&tv, immediately ? 0 : AVAHI_PROBE_DEFER_MSEC, 0);
710
711     /* Create a new job and schedule it */
712     pj = probe_job_new(s, record);
713     pj->delivery = tv;
714     pj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &pj->delivery, probe_elapse, pj);
715 }