]> git.meshlink.io Git - catta/blob - libavahi-core/psched.c
autotoolize
[catta] / libavahi-core / psched.c
1 /* $Id$ */
2
3 /***
4   This file is part of avahi.
5  
6   avahi is free software; you can redistribute it and/or modify it
7   under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10  
11   avahi is distributed in the hope that it will be useful, but WITHOUT
12   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13   or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14   Public License for more details.
15  
16   You should have received a copy of the GNU Lesser General Public
17   License along with avahi; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #include <string.h>
23
24 #include "util.h"
25 #include "psched.h"
26
27 #define AVAHI_QUERY_HISTORY_MSEC 100
28 #define AVAHI_QUERY_DEFER_MSEC 100
29 #define AVAHI_RESPONSE_HISTORY_MSEC 700
30 #define AVAHI_RESPONSE_DEFER_MSEC 20
31 #define AVAHI_RESPONSE_JITTER_MSEC 100
32 #define AVAHI_PROBE_DEFER_MSEC 70
33
34 AvahiPacketScheduler *avahi_packet_scheduler_new(AvahiServer *server, AvahiInterface *i) {
35     AvahiPacketScheduler *s;
36
37     g_assert(server);
38     g_assert(i);
39
40     s = g_new(AvahiPacketScheduler, 1);
41     s->server = server;
42     s->interface = i;
43
44     AVAHI_LLIST_HEAD_INIT(AvahiQueryJob, s->query_jobs);
45     AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->response_jobs);
46     AVAHI_LLIST_HEAD_INIT(AvahiKnownAnswer, s->known_answers);
47     AVAHI_LLIST_HEAD_INIT(AvahiProbeJob, s->probe_jobs);
48     
49     return s;
50 }
51
52 static void query_job_free(AvahiPacketScheduler *s, AvahiQueryJob *qj) {
53     g_assert(qj);
54
55     if (qj->time_event)
56         avahi_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
57
58     AVAHI_LLIST_REMOVE(AvahiQueryJob, jobs, s->query_jobs, qj);
59     
60     avahi_key_unref(qj->key);
61     g_free(qj);
62 }
63
64 static void response_job_free(AvahiPacketScheduler *s, AvahiResponseJob *rj) {
65     g_assert(rj);
66
67     if (rj->time_event)
68         avahi_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
69
70     AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->response_jobs, rj);
71
72     avahi_record_unref(rj->record);
73     g_free(rj);
74 }
75
76 static void probe_job_free(AvahiPacketScheduler *s, AvahiProbeJob *pj) {
77     g_assert(pj);
78
79     if (pj->time_event)
80         avahi_time_event_queue_remove(pj->scheduler->server->time_event_queue, pj->time_event);
81
82     AVAHI_LLIST_REMOVE(AvahiProbeJob, jobs, s->probe_jobs, pj);
83
84     avahi_record_unref(pj->record);
85     g_free(pj);
86 }
87
88 void avahi_packet_scheduler_free(AvahiPacketScheduler *s) {
89     AvahiQueryJob *qj;
90     AvahiResponseJob *rj;
91     AvahiProbeJob *pj;
92
93     g_assert(s);
94
95     g_assert(!s->known_answers);
96     
97     while ((qj = s->query_jobs))
98         query_job_free(s, qj);
99     while ((rj = s->response_jobs))
100         response_job_free(s, rj);
101     while ((pj = s->probe_jobs))
102         probe_job_free(s, pj);
103
104     g_free(s);
105 }
106
107 static gpointer known_answer_walk_callback(AvahiCache *c, AvahiKey *pattern, AvahiCacheEntry *e, gpointer userdata) {
108     AvahiPacketScheduler *s = userdata;
109     AvahiKnownAnswer *ka;
110     
111     g_assert(c);
112     g_assert(pattern);
113     g_assert(e);
114     g_assert(s);
115
116     if (avahi_cache_entry_half_ttl(c, e))
117         return NULL;
118     
119     ka = g_new0(AvahiKnownAnswer, 1);
120     ka->scheduler = s;
121     ka->record = avahi_record_ref(e->record);
122
123     AVAHI_LLIST_PREPEND(AvahiKnownAnswer, known_answer, s->known_answers, ka);
124     return NULL;
125 }
126
127 static guint8* packet_add_query_job(AvahiPacketScheduler *s, AvahiDnsPacket *p, AvahiQueryJob *qj) {
128     guint8 *d;
129
130     g_assert(s);
131     g_assert(p);
132     g_assert(qj);
133
134     if ((d = avahi_dns_packet_append_key(p, qj->key, FALSE))) {
135         GTimeVal tv;
136
137         qj->done = 1;
138
139         /* Drop query after some time from history */
140         avahi_elapse_time(&tv, AVAHI_QUERY_HISTORY_MSEC, 0);
141         avahi_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
142
143         g_get_current_time(&qj->delivery);
144
145         /* Add all matching known answers to the list */
146         avahi_cache_walk(s->interface->cache, qj->key, known_answer_walk_callback, s);
147     }
148
149     return d;
150 }
151
152 static void append_known_answers_and_send(AvahiPacketScheduler *s, AvahiDnsPacket *p) {
153     AvahiKnownAnswer *ka;
154     guint n;
155     g_assert(s);
156     g_assert(p);
157
158     n = 0;
159     
160     while ((ka = s->known_answers)) {
161
162         while (!avahi_dns_packet_append_record(p, ka->record, FALSE)) {
163
164             g_assert(!avahi_dns_packet_is_empty(p));
165
166             avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_FLAGS, avahi_dns_packet_get_field(p, AVAHI_DNS_FIELD_FLAGS) | AVAHI_DNS_FLAG_TC);
167             avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n);
168             avahi_interface_send_packet(s->interface, p);
169             avahi_dns_packet_free(p);
170
171             p = avahi_dns_packet_new_query(s->interface->hardware->mtu - 48);
172             n = 0;
173         }
174
175         AVAHI_LLIST_REMOVE(AvahiKnownAnswer, known_answer, s->known_answers, ka);
176         avahi_record_unref(ka->record);
177         g_free(ka);
178         
179         n++;
180     }
181     
182     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n);
183     avahi_interface_send_packet(s->interface, p);
184     avahi_dns_packet_free(p);
185 }
186
187 static void query_elapse(AvahiTimeEvent *e, gpointer data) {
188     AvahiQueryJob *qj = data;
189     AvahiPacketScheduler *s;
190     AvahiDnsPacket *p;
191     guint n;
192     guint8 *d;
193
194     g_assert(qj);
195     s = qj->scheduler;
196
197     if (qj->done) {
198         /* Lets remove it  from the history */
199         query_job_free(s, qj);
200         return;
201     }
202
203     g_assert(!s->known_answers);
204     
205     p = avahi_dns_packet_new_query(s->interface->hardware->mtu - 48);
206     d = packet_add_query_job(s, p, qj);
207     g_assert(d);
208     n = 1;
209
210     /* Try to fill up packet with more queries, if available */
211     for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
212
213         if (qj->done)
214             continue;
215
216         if (!packet_add_query_job(s, p, qj))
217             break;
218
219         n++;
220     }
221
222     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_QDCOUNT, n);
223
224     /* Now add known answers */
225     append_known_answers_and_send(s, p);
226 }
227
228 AvahiQueryJob* query_job_new(AvahiPacketScheduler *s, AvahiKey *key) {
229     AvahiQueryJob *qj;
230     
231     g_assert(s);
232     g_assert(key);
233
234     qj = g_new(AvahiQueryJob, 1);
235     qj->scheduler = s;
236     qj->key = avahi_key_ref(key);
237     qj->done = FALSE;
238     qj->time_event = NULL;
239     
240     AVAHI_LLIST_PREPEND(AvahiQueryJob, jobs, s->query_jobs, qj);
241
242     return qj;
243 }
244
245 void avahi_packet_scheduler_post_query(AvahiPacketScheduler *s, AvahiKey *key, gboolean immediately) {
246     GTimeVal tv;
247     AvahiQueryJob *qj;
248     
249     g_assert(s);
250     g_assert(key);
251
252     avahi_elapse_time(&tv, immediately ? 0 : AVAHI_QUERY_DEFER_MSEC, 0);
253
254     for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
255
256         if (avahi_key_equal(qj->key, key)) {
257
258             glong d = avahi_timeval_diff(&tv, &qj->delivery);
259
260             /* Duplicate questions suppression */
261             if (d >= 0 && d <= AVAHI_QUERY_HISTORY_MSEC*1000) {
262                 g_message("WARNING! DUPLICATE QUERY SUPPRESSION ACTIVE!");
263                 return;
264             }
265             
266             query_job_free(s, qj);
267             break;
268         }
269
270     }
271     
272     qj = query_job_new(s, key);
273     qj->delivery = tv;
274     qj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &qj->delivery, query_elapse, qj);
275 }
276
277 static guint8* packet_add_response_job(AvahiPacketScheduler *s, AvahiDnsPacket *p, AvahiResponseJob *rj) {
278     guint8 *d;
279
280     g_assert(s);
281     g_assert(p);
282     g_assert(rj);
283
284     if ((d = avahi_dns_packet_append_record(p, rj->record, rj->flush_cache))) {
285         GTimeVal tv;
286
287         rj->done = 1;
288
289         /* Drop response after some time from history */
290         avahi_elapse_time(&tv, AVAHI_RESPONSE_HISTORY_MSEC, 0);
291         avahi_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
292
293         g_get_current_time(&rj->delivery);
294     }
295
296     return d;
297 }
298
299 static void send_response_packet(AvahiPacketScheduler *s, AvahiResponseJob *rj) {
300     AvahiDnsPacket *p;
301     guint n;
302
303     g_assert(s);
304
305     p = avahi_dns_packet_new_response(s->interface->hardware->mtu - 200);
306     n = 0;
307
308     /* If a job was specified, put it in the packet. */
309     if (rj) {
310         guint8 *d;
311         d = packet_add_response_job(s, p, rj);
312         g_assert(d);
313         n++;
314     }
315
316     /* Try to fill up packet with more responses, if available */
317     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
318
319         if (rj->done)
320             continue;
321
322         if (!packet_add_response_job(s, p, rj))
323             break;
324
325         n++;
326     }
327
328     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n);
329     avahi_interface_send_packet(s->interface, p);
330     avahi_dns_packet_free(p);
331 }
332
333 static void response_elapse(AvahiTimeEvent *e, gpointer data) {
334     AvahiResponseJob *rj = data;
335     AvahiPacketScheduler *s;
336
337     g_assert(rj);
338     s = rj->scheduler;
339
340     if (rj->done) {
341         /* Lets remove it  from the history */
342         response_job_free(s, rj);
343         return;
344     }
345
346     send_response_packet(s, rj);
347 }
348
349 static AvahiResponseJob* look_for_response(AvahiPacketScheduler *s, AvahiRecord *record) {
350     AvahiResponseJob *rj;
351
352     g_assert(s);
353     g_assert(record);
354
355     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
356         if (avahi_record_equal_no_ttl(rj->record, record))
357             return rj;
358
359     return NULL;
360 }
361
362 static AvahiResponseJob* response_job_new(AvahiPacketScheduler *s, AvahiRecord *record) {
363     AvahiResponseJob *rj;
364     
365     g_assert(s);
366     g_assert(record);
367
368     rj = g_new(AvahiResponseJob, 1);
369     rj->scheduler = s;
370     rj->record = avahi_record_ref(record);
371     rj->done = FALSE;
372     rj->time_event = NULL;
373     rj->address_valid = FALSE;
374     rj->flush_cache = FALSE;
375     
376     AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->response_jobs, rj);
377
378     return rj;
379 }
380
381 void avahi_packet_scheduler_post_response(AvahiPacketScheduler *s, const AvahiAddress *a, AvahiRecord *record, gboolean flush_cache, gboolean immediately) {
382     AvahiResponseJob *rj;
383     GTimeVal tv;
384     
385     g_assert(s);
386     g_assert(record);
387
388     g_assert(!avahi_key_is_pattern(record->key));
389     
390     avahi_elapse_time(&tv, immediately ? 0 : AVAHI_RESPONSE_DEFER_MSEC, immediately ? 0 : AVAHI_RESPONSE_JITTER_MSEC);
391     
392     /* Don't send out duplicates */
393     
394     if ((rj = look_for_response(s, record))) {
395         glong d;
396
397         d = avahi_timeval_diff(&tv, &rj->delivery);
398         
399         /* If there's already a matching packet in our history or in
400          * the schedule, we do nothing. */
401         if (!!record->ttl == !!rj->record->ttl &&
402             d >= 0 && d <= AVAHI_RESPONSE_HISTORY_MSEC*1000) {
403             g_message("WARNING! DUPLICATE RESPONSE SUPPRESSION ACTIVE!");
404
405             /* This job is no longer specific to a single querier, so
406              * make sure it isn't suppressed by known answer
407              * suppresion */
408
409             if (rj->address_valid && (!a || avahi_address_cmp(a, &rj->address) != 0))
410                 rj->address_valid = FALSE;
411
412             rj->flush_cache = flush_cache;
413             
414             return;
415         }
416
417         /* Either one was a goodbye packet, but the other was not, so
418          * let's drop the older one. */
419         response_job_free(s, rj);
420     }
421
422 /*     g_message("ACCEPTED NEW RESPONSE [%s]", t = avahi_record_to_string(record)); */
423 /*     g_free(t); */
424
425     /* Create a new job and schedule it */
426     rj = response_job_new(s, record);
427     rj->flush_cache = flush_cache;
428     rj->delivery = tv;
429     rj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &rj->delivery, response_elapse, rj);
430
431     /* Store the address of the host this messages is intended to, so
432        that we can drop this job in case a truncated message with
433        known answer suppresion entries is recieved */
434
435     if ((rj->address_valid = !!a))
436         rj->address = *a;
437 }
438
439 void avahi_packet_scheduler_incoming_query(AvahiPacketScheduler *s, AvahiKey *key) {
440     GTimeVal tv;
441     AvahiQueryJob *qj;
442     
443     g_assert(s);
444     g_assert(key);
445
446     /* This function is called whenever an incoming query was
447      * receieved. We drop all scheduled queries which match here. The
448      * keyword is "DUPLICATE QUESTION SUPPRESION". */
449
450     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
451         if (avahi_key_equal(qj->key, key)) {
452
453             if (qj->done)
454                 return;
455
456             goto mark_done;
457         }
458
459
460     /* No matching job was found. Add the query to the history */
461     qj = query_job_new(s, key);
462
463 mark_done:
464     qj->done = TRUE;
465
466     /* Drop the query after some time */
467     avahi_elapse_time(&tv, AVAHI_QUERY_HISTORY_MSEC, 0);
468     qj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
469
470     g_get_current_time(&qj->delivery);
471 }
472
473 void response_job_set_elapse_time(AvahiPacketScheduler *s, AvahiResponseJob *rj, guint msec, guint jitter) {
474     GTimeVal tv;
475
476     g_assert(s);
477     g_assert(rj);
478
479     avahi_elapse_time(&tv, msec, jitter);
480
481     if (rj->time_event)
482         avahi_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
483     else
484         rj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
485     
486 }
487
488 void avahi_packet_scheduler_incoming_response(AvahiPacketScheduler *s, AvahiRecord *record) {
489     AvahiResponseJob *rj;
490     
491     g_assert(s);
492     g_assert(record);
493
494     /* This function is called whenever an incoming response was
495      * receieved. We drop all scheduled responses which match
496      * here. The keyword is "DUPLICATE ANSWER SUPPRESION". */
497     
498     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
499         if (avahi_record_equal_no_ttl(rj->record, record)) {
500
501             if (rj->done) {
502
503                 if (!!record->ttl == !!rj->record->ttl) {
504                     /* An entry like this is already in our history,
505                      * so let's get out of here! */
506                     
507                     return;
508                     
509                 } else {
510                     /* Either one was a goodbye packet but other was
511                      * none. We remove the history entry, and add a
512                      * new one */
513                     
514                     response_job_free(s, rj);
515                     break;
516                 }
517         
518             } else {
519
520                 if (!!record->ttl == !!rj->record->ttl) {
521
522                     /* The incoming packet matches our scheduled
523                      * record, so let's mark that one as done */
524
525                     goto mark_done;
526                     
527                 } else {
528
529                     /* Either one was a goodbye packet but other was
530                      * none. We ignore the incoming packet. */
531
532                     return;
533                 }
534             }
535         }
536
537     /* No matching job was found. Add the query to the history */
538     rj = response_job_new(s, record);
539
540 mark_done:
541     rj->done = TRUE;
542                     
543     /* Drop response after 500ms from history */
544     response_job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0);
545
546     g_get_current_time(&rj->delivery);
547 }
548
549 void avahi_packet_scheduler_incoming_known_answer(AvahiPacketScheduler *s, AvahiRecord *record, const AvahiAddress *a) {
550     AvahiResponseJob *rj;
551     
552     g_assert(s);
553     g_assert(record);
554     g_assert(a);
555
556     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
557
558         g_assert(record->ttl > 0);
559         g_assert(rj->record->ttl/2);
560         
561         if (avahi_record_equal_no_ttl(rj->record, record))
562             if (rj->address_valid)
563                 if (avahi_address_cmp(&rj->address, a))
564                     if (record->ttl >= rj->record->ttl/2) {
565
566             /* Let's suppress it */
567
568             response_job_free(s, rj);
569             break;
570         }
571     }
572 }
573
574 void avahi_packet_scheduler_flush_responses(AvahiPacketScheduler *s) {
575     AvahiResponseJob *rj;
576     
577     g_assert(s);
578
579     /* Send all scheduled responses, ignoring the scheduled time */
580     
581     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
582         if (!rj->done)
583             send_response_packet(s, rj);
584 }
585
586 static AvahiProbeJob* probe_job_new(AvahiPacketScheduler *s, AvahiRecord *record) {
587     AvahiProbeJob *pj;
588     
589     g_assert(s);
590     g_assert(record);
591
592     pj = g_new(AvahiProbeJob, 1);
593     pj->scheduler = s;
594     pj->record = avahi_record_ref(record);
595     pj->time_event = NULL;
596     pj->chosen = FALSE;
597     
598     AVAHI_LLIST_PREPEND(AvahiProbeJob, jobs, s->probe_jobs, pj);
599
600     return pj;
601 }
602
603 static guint8* packet_add_probe_query(AvahiPacketScheduler *s, AvahiDnsPacket *p, AvahiProbeJob *pj) {
604     guint size;
605     guint8 *ret;
606     AvahiKey *k;
607
608     g_assert(s);
609     g_assert(p);
610     g_assert(pj);
611
612     g_assert(!pj->chosen);
613     
614     /* Estimate the size for this record */
615     size =
616         avahi_key_get_estimate_size(pj->record->key) +
617         avahi_record_get_estimate_size(pj->record);
618
619     /* Too large */
620     if (size > avahi_dns_packet_space(p))
621         return NULL;
622
623     /* Create the probe query */
624     k = avahi_key_new(pj->record->key->name, pj->record->key->class, AVAHI_DNS_TYPE_ANY);
625     ret = avahi_dns_packet_append_key(p, k, FALSE);
626     g_assert(ret);
627
628     /* Mark this job for addition to the packet */
629     pj->chosen = TRUE;
630
631     /* Scan for more jobs whith matching key pattern */
632     for (pj = s->probe_jobs; pj; pj = pj->jobs_next) {
633         if (pj->chosen)
634             continue;
635
636         /* Does the record match the probe? */
637         if (k->class != pj->record->key->class || !avahi_domain_equal(k->name, pj->record->key->name))
638             continue;
639         
640         /* This job wouldn't fit in */
641         if (avahi_record_get_estimate_size(pj->record) > avahi_dns_packet_space(p))
642             break;
643
644         /* Mark this job for addition to the packet */
645         pj->chosen = TRUE;
646     }
647
648     avahi_key_unref(k);
649             
650     return ret;
651 }
652
653 static void probe_elapse(AvahiTimeEvent *e, gpointer data) {
654     AvahiProbeJob *pj = data, *next;
655     AvahiPacketScheduler *s;
656     AvahiDnsPacket *p;
657     guint n;
658
659     g_assert(pj);
660     s = pj->scheduler;
661
662     p = avahi_dns_packet_new_query(s->interface->hardware->mtu - 48);
663
664     /* Add the import probe */
665     if (!packet_add_probe_query(s, p, pj)) {
666         g_warning("Record too large! ---");
667         avahi_dns_packet_free(p);
668         return;
669     }
670
671     n = 1;
672     
673     /* Try to fill up packet with more probes, if available */
674     for (pj = s->probe_jobs; pj; pj = pj->jobs_next) {
675
676         if (pj->chosen)
677             continue;
678         
679         if (!packet_add_probe_query(s, p, pj))
680             break;
681         
682         n++;
683     }
684
685     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_QDCOUNT, n);
686
687     n = 0;
688
689     /* Now add the chosen records to the authorative section */
690     for (pj = s->probe_jobs; pj; pj = next) {
691
692         next = pj->jobs_next;
693
694         if (!pj->chosen)
695             continue;
696
697         if (!avahi_dns_packet_append_record(p, pj->record, TRUE)) {
698             g_warning("Bad probe size estimate!");
699
700             /* Unmark all following jobs */
701             for (; pj; pj = pj->jobs_next)
702                 pj->chosen = FALSE;
703             
704             break;
705         }
706
707         probe_job_free(s, pj);
708         
709         n ++;
710     }
711     
712     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_NSCOUNT, n);
713
714     /* Send it now */
715     avahi_interface_send_packet(s->interface, p);
716     avahi_dns_packet_free(p);
717 }
718
719 void avahi_packet_scheduler_post_probe(AvahiPacketScheduler *s, AvahiRecord *record, gboolean immediately) {
720     AvahiProbeJob *pj;
721     GTimeVal tv;
722     
723     g_assert(s);
724     g_assert(record);
725     g_assert(!avahi_key_is_pattern(record->key));
726     
727     avahi_elapse_time(&tv, immediately ? 0 : AVAHI_PROBE_DEFER_MSEC, 0);
728
729     /* Create a new job and schedule it */
730     pj = probe_job_new(s, record);
731     pj->delivery = tv;
732     pj->time_event = avahi_time_event_queue_add(s->server->time_event_queue, &pj->delivery, probe_elapse, pj);
733 }