]> git.meshlink.io Git - catta/blob - avahi-core/response-sched.c
* remove lots of DOXYGEN_SHOULD_SKIP_THIS from the header files, use doxygen macro...
[catta] / avahi-core / response-sched.c
1 /* $Id$ */
2
3 /***
4   This file is part of avahi.
5  
6   avahi is free software; you can redistribute it and/or modify it
7   under the terms of the GNU Lesser General Public License as
8   published by the Free Software Foundation; either version 2.1 of the
9   License, or (at your option) any later version.
10  
11   avahi is distributed in the hope that it will be useful, but WITHOUT
12   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13   or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
14   Public License for more details.
15  
16   You should have received a copy of the GNU Lesser General Public
17   License along with avahi; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <avahi-common/timeval.h>
27 #include <avahi-common/malloc.h>
28
29 #include "response-sched.h"
30 #include "log.h"
31 #include "rr-util.h"
32
33 /* Local packets are supressed this long after sending them */
34 #define AVAHI_RESPONSE_HISTORY_MSEC 500
35
36 /* Local packets are deferred this long before sending them */
37 #define AVAHI_RESPONSE_DEFER_MSEC 20
38
39 /* Additional jitter for deferred packets */
40 #define AVAHI_RESPONSE_JITTER_MSEC 100
41
42 /* Remote packets can suppress local traffic as long as this value */
43 #define AVAHI_RESPONSE_SUPPRESS_MSEC 700
44
45 typedef struct AvahiResponseJob AvahiResponseJob;
46
47 typedef enum {
48     AVAHI_SCHEDULED,
49     AVAHI_DONE,
50     AVAHI_SUPPRESSED
51 } AvahiResponseJobState;
52
53 struct AvahiResponseJob {
54     AvahiResponseScheduler *scheduler;
55     AvahiTimeEvent *time_event;
56     
57     AvahiResponseJobState state;
58     struct timeval delivery;
59
60     AvahiRecord *record;
61     int flush_cache;
62     AvahiAddress querier;
63     int querier_valid;
64     
65     AVAHI_LLIST_FIELDS(AvahiResponseJob, jobs);
66 };
67
68 struct AvahiResponseScheduler {
69     AvahiInterface *interface;
70     AvahiTimeEventQueue *time_event_queue;
71
72     AVAHI_LLIST_HEAD(AvahiResponseJob, jobs);
73     AVAHI_LLIST_HEAD(AvahiResponseJob, history);
74     AVAHI_LLIST_HEAD(AvahiResponseJob, suppressed);
75 };
76
77 static AvahiResponseJob* job_new(AvahiResponseScheduler *s, AvahiRecord *record, AvahiResponseJobState state) {
78     AvahiResponseJob *rj;
79     
80     assert(s);
81     assert(record);
82
83     if (!(rj = avahi_new(AvahiResponseJob, 1))) {
84         avahi_log_error(__FILE__": Out of memory");
85         return NULL;
86     }
87     
88     rj->scheduler = s;
89     rj->record = avahi_record_ref(record);
90     rj->time_event = NULL;
91     rj->flush_cache = 0;
92     rj->querier_valid = 0;
93     
94     if ((rj->state = state) == AVAHI_SCHEDULED) 
95         AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->jobs, rj);
96     else if (rj->state == AVAHI_DONE)
97         AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->history, rj);
98     else  /* rj->state == AVAHI_SUPPRESSED */
99         AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->suppressed, rj);
100
101     return rj;
102 }
103
104 static void job_free(AvahiResponseScheduler *s, AvahiResponseJob *rj) {
105     assert(s);
106     assert(rj);
107
108     if (rj->time_event)
109         avahi_time_event_free(rj->time_event);
110
111     if (rj->state == AVAHI_SCHEDULED)
112         AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->jobs, rj);
113     else if (rj->state == AVAHI_DONE)
114         AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->history, rj);
115     else /* rj->state == AVAHI_SUPPRESSED */
116         AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->suppressed, rj);
117
118     avahi_record_unref(rj->record);
119     avahi_free(rj);
120 }
121
122 static void elapse_callback(AvahiTimeEvent *e, void* data);
123
124 static void job_set_elapse_time(AvahiResponseScheduler *s, AvahiResponseJob *rj, unsigned msec, unsigned jitter) {
125     struct timeval tv;
126
127     assert(s);
128     assert(rj);
129
130     avahi_elapse_time(&tv, msec, jitter);
131
132     if (rj->time_event)
133         avahi_time_event_update(rj->time_event, &tv);
134     else
135         rj->time_event = avahi_time_event_new(s->time_event_queue, &tv, elapse_callback, rj);
136 }
137
138 static void job_mark_done(AvahiResponseScheduler *s, AvahiResponseJob *rj) {
139     assert(s);
140     assert(rj);
141
142     assert(rj->state == AVAHI_SCHEDULED);
143
144     AVAHI_LLIST_REMOVE(AvahiResponseJob, jobs, s->jobs, rj);
145     AVAHI_LLIST_PREPEND(AvahiResponseJob, jobs, s->history, rj);
146
147     rj->state = AVAHI_DONE;
148
149     job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0);
150
151     gettimeofday(&rj->delivery, NULL);
152 }
153
154 AvahiResponseScheduler *avahi_response_scheduler_new(AvahiInterface *i) {
155     AvahiResponseScheduler *s;
156     assert(i);
157
158     if (!(s = avahi_new(AvahiResponseScheduler, 1))) {
159         avahi_log_error(__FILE__": Out of memory");
160         return NULL;
161     }
162         
163     s->interface = i;
164     s->time_event_queue = i->monitor->server->time_event_queue;
165     
166     AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->jobs);
167     AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->history);
168     AVAHI_LLIST_HEAD_INIT(AvahiResponseJob, s->suppressed);
169
170     return s;
171 }
172
173 void avahi_response_scheduler_free(AvahiResponseScheduler *s) {
174     assert(s);
175
176     avahi_response_scheduler_clear(s);
177     avahi_free(s);
178 }
179
180 void avahi_response_scheduler_clear(AvahiResponseScheduler *s) {
181     assert(s);
182     
183     while (s->jobs)
184         job_free(s, s->jobs);
185     while (s->history)
186         job_free(s, s->history);
187     while (s->suppressed)
188         job_free(s, s->suppressed);
189 }
190
191 static void enumerate_aux_records_callback(AvahiServer *s, AvahiRecord *r, int flush_cache, void* userdata) {
192     AvahiResponseJob *rj = userdata;
193     
194     assert(r);
195     assert(rj);
196
197     avahi_response_scheduler_post(rj->scheduler, r, flush_cache, rj->querier_valid ? &rj->querier : NULL, 0);
198 }
199
200 static int packet_add_response_job(AvahiResponseScheduler *s, AvahiDnsPacket *p, AvahiResponseJob *rj) {
201     assert(s);
202     assert(p);
203     assert(rj);
204
205     /* Try to add this record to the packet */
206     if (!avahi_dns_packet_append_record(p, rj->record, rj->flush_cache, 0))
207         return 0;
208
209     /* Ok, this record will definitely be sent, so schedule the
210      * auxilliary packets, too */
211     avahi_server_enumerate_aux_records(s->interface->monitor->server, s->interface, rj->record, enumerate_aux_records_callback, rj);
212     job_mark_done(s, rj);
213     
214     return 1;
215 }
216
217 static void send_response_packet(AvahiResponseScheduler *s, AvahiResponseJob *rj) {
218     AvahiDnsPacket *p;
219     unsigned n;
220
221     assert(s);
222     assert(rj);
223
224     if (!(p = avahi_dns_packet_new_response(s->interface->hardware->mtu, 1)))
225         return; /* OOM */
226     n = 1;
227
228     /* Put it in the packet. */
229     if (packet_add_response_job(s, p, rj)) {
230
231         /* Try to fill up packet with more responses, if available */
232         while (s->jobs) {
233             
234             if (!packet_add_response_job(s, p, s->jobs))
235                 break;
236             
237             n++;
238         }
239         
240     } else {
241         size_t size;
242         
243         avahi_dns_packet_free(p);
244
245         /* OK, the packet was too small, so create one that fits */
246         size = avahi_record_get_estimate_size(rj->record) + AVAHI_DNS_PACKET_HEADER_SIZE;
247
248         if (size > AVAHI_DNS_PACKET_SIZE_MAX)
249             size = AVAHI_DNS_PACKET_SIZE_MAX;
250         
251         if (!(p = avahi_dns_packet_new_response(size, 1)))
252             return; /* OOM */
253
254         if (!packet_add_response_job(s, p, rj)) {
255             avahi_dns_packet_free(p);
256
257             avahi_log_warn("Record too large, cannot send");
258             job_mark_done(s, rj);
259             return;
260         }
261     }
262
263     avahi_dns_packet_set_field(p, AVAHI_DNS_FIELD_ANCOUNT, n);
264     avahi_interface_send_packet(s->interface, p);
265     avahi_dns_packet_free(p);
266 }
267
268 static void elapse_callback(AvahiTimeEvent *e, void* data) {
269     AvahiResponseJob *rj = data;
270
271     assert(rj);
272
273     if (rj->state == AVAHI_DONE || rj->state == AVAHI_SUPPRESSED) 
274         job_free(rj->scheduler, rj);         /* Lets drop this entry */
275     else
276         send_response_packet(rj->scheduler, rj);
277 }
278
279 static AvahiResponseJob* find_scheduled_job(AvahiResponseScheduler *s, AvahiRecord *record) {
280     AvahiResponseJob *rj;
281
282     assert(s);
283     assert(record);
284
285     for (rj = s->jobs; rj; rj = rj->jobs_next) {
286         assert(rj->state == AVAHI_SCHEDULED);
287     
288         if (avahi_record_equal_no_ttl(rj->record, record))
289             return rj;
290     }
291
292     return NULL;
293 }
294
295 static AvahiResponseJob* find_history_job(AvahiResponseScheduler *s, AvahiRecord *record) {
296     AvahiResponseJob *rj;
297     
298     assert(s);
299     assert(record);
300
301     for (rj = s->history; rj; rj = rj->jobs_next) {
302         assert(rj->state == AVAHI_DONE);
303
304         if (avahi_record_equal_no_ttl(rj->record, record)) {
305             /* Check whether this entry is outdated */
306
307 /*             avahi_log_debug("history age: %u", (unsigned) (avahi_age(&rj->delivery)/1000)); */
308             
309             if (avahi_age(&rj->delivery)/1000 > AVAHI_RESPONSE_HISTORY_MSEC) {
310                 /* it is outdated, so let's remove it */
311                 job_free(s, rj);
312                 return NULL;
313             }
314                 
315             return rj;
316         }
317     }
318
319     return NULL;
320 }
321
322 static AvahiResponseJob* find_suppressed_job(AvahiResponseScheduler *s, AvahiRecord *record, const AvahiAddress *querier) {
323     AvahiResponseJob *rj;
324     
325     assert(s);
326     assert(record);
327     assert(querier);
328
329     for (rj = s->suppressed; rj; rj = rj->jobs_next) {
330         assert(rj->state == AVAHI_SUPPRESSED);
331         assert(rj->querier_valid);
332         
333         if (avahi_record_equal_no_ttl(rj->record, record) &&
334             avahi_address_cmp(&rj->querier, querier) == 0) {
335             /* Check whether this entry is outdated */
336
337             if (avahi_age(&rj->delivery) > AVAHI_RESPONSE_SUPPRESS_MSEC*1000) {
338                 /* it is outdated, so let's remove it */
339                 job_free(s, rj);
340                 return NULL;
341             }
342
343             return rj;
344         }
345     }
346
347     return NULL;
348 }
349
350 int avahi_response_scheduler_post(AvahiResponseScheduler *s, AvahiRecord *record, int flush_cache, const AvahiAddress *querier, int immediately) {
351     AvahiResponseJob *rj;
352     struct timeval tv;
353 /*     char *t; */
354     
355     assert(s);
356     assert(record);
357
358     assert(!avahi_key_is_pattern(record->key));
359
360 /*     t = avahi_record_to_string(record); */
361 /*     avahi_log_debug("post %i %s", immediately, t); */
362 /*     avahi_free(t); */
363
364     /* Check whether this response is suppressed */
365     if (querier &&
366         (rj = find_suppressed_job(s, record, querier)) &&
367         avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) &&
368         rj->record->ttl >= record->ttl/2) {
369
370 /*         avahi_log_debug("Response suppressed by known answer suppression.");  */
371         return 0;
372     }
373
374     /* Check if we already sent this response recently */
375     if ((rj = find_history_job(s, record))) {
376
377         if (avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) &&
378             rj->record->ttl >= record->ttl/2 &&
379             (rj->flush_cache || !flush_cache)) {
380 /*             avahi_log_debug("Response suppressed by local duplicate suppression (history)");  */
381             return 0;
382         }
383
384         /* Outdated ... */
385         job_free(s, rj);
386     }
387
388     avahi_elapse_time(&tv, immediately ? 0 : AVAHI_RESPONSE_DEFER_MSEC, immediately ? 0 : AVAHI_RESPONSE_JITTER_MSEC);
389          
390     if ((rj = find_scheduled_job(s, record))) {
391 /*          avahi_log_debug("Response suppressed by local duplicate suppression (scheduled)"); */
392
393         /* Update a little ... */
394
395         /* Update the time if the new is prior to the old */
396         if (avahi_timeval_compare(&tv, &rj->delivery) < 0) {
397             rj->delivery = tv;
398             avahi_time_event_update(rj->time_event, &rj->delivery);
399         }
400
401         /* Update the flush cache bit */
402         if (flush_cache)
403             rj->flush_cache = 1;
404
405         /* Update the querier field */
406         if (!querier || (rj->querier_valid && avahi_address_cmp(querier, &rj->querier) != 0))
407             rj->querier_valid = 0;
408
409         /* Update record data (just for the TTL) */
410         avahi_record_unref(rj->record);
411         rj->record = avahi_record_ref(record);
412
413         return 1;
414     } else {
415 /*         avahi_log_debug("Accepted new response job.");  */
416
417         /* Create a new job and schedule it */
418         if (!(rj = job_new(s, record, AVAHI_SCHEDULED)))
419             return 0; /* OOM */
420         
421         rj->delivery = tv;
422         rj->time_event = avahi_time_event_new(s->time_event_queue, &rj->delivery, elapse_callback, rj);
423         rj->flush_cache = flush_cache;
424
425         if ((rj->querier_valid = !!querier))
426             rj->querier = *querier;
427
428         return 1;
429     }
430 }
431
432 void avahi_response_scheduler_incoming(AvahiResponseScheduler *s, AvahiRecord *record, int flush_cache) {
433     AvahiResponseJob *rj;
434     assert(s);
435
436     /* This function is called whenever an incoming response was
437      * receieved. We drop scheduled responses which match here. The
438      * keyword is "DUPLICATE ANSWER SUPPRESION". */
439     
440     if ((rj = find_scheduled_job(s, record))) {
441
442         if ((!rj->flush_cache || flush_cache) &&    /* flush cache bit was set correctly */
443             avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) &&   /* both goodbye packets, or both not */
444             record->ttl >= rj->record->ttl/2) {     /* sensible TTL */
445
446             /* A matching entry was found, so let's mark it done */
447 /*             avahi_log_debug("Response suppressed by distributed duplicate suppression"); */
448             job_mark_done(s, rj);
449         }
450
451         return;
452     }
453
454     if ((rj = find_history_job(s, record))) {
455         /* Found a history job, let's update it */
456         avahi_record_unref(rj->record);
457         rj->record = avahi_record_ref(record);
458     } else
459         /* Found no existing history job, so let's create a new one */
460         if (!(rj = job_new(s, record, AVAHI_DONE)))
461             return; /* OOM */
462
463     rj->flush_cache = flush_cache;
464     rj->querier_valid = 0;
465     
466     gettimeofday(&rj->delivery, NULL);
467     job_set_elapse_time(s, rj, AVAHI_RESPONSE_HISTORY_MSEC, 0);
468 }
469
470 void avahi_response_scheduler_suppress(AvahiResponseScheduler *s, AvahiRecord *record, const AvahiAddress *querier) {
471     AvahiResponseJob *rj;
472     
473     assert(s);
474     assert(record);
475     assert(querier);
476
477     if ((rj = find_scheduled_job(s, record))) {
478         
479         if (rj->querier_valid && avahi_address_cmp(querier, &rj->querier) == 0 && /* same originator */
480             avahi_record_is_goodbye(record) == avahi_record_is_goodbye(rj->record) && /* both goodbye packets, or both not */
481             record->ttl >= rj->record->ttl/2) {                                  /* sensible TTL */
482
483             /* A matching entry was found, so let's drop it */
484 /*             avahi_log_debug("Known answer suppression active!"); */
485             job_free(s, rj);
486         }
487     }
488
489     if ((rj = find_suppressed_job(s, record, querier))) {
490
491         /* Let's update the old entry */
492         avahi_record_unref(rj->record);
493         rj->record = avahi_record_ref(record);
494         
495     } else {
496
497         /* Create a new entry */
498         if (!(rj = job_new(s, record, AVAHI_SUPPRESSED)))
499             return; /* OOM */
500         rj->querier_valid = 1;
501         rj->querier = *querier;
502     }
503
504     gettimeofday(&rj->delivery, NULL);
505     job_set_elapse_time(s, rj, AVAHI_RESPONSE_SUPPRESS_MSEC, 0);
506 }
507
508 void avahi_response_scheduler_force(AvahiResponseScheduler *s) {
509     assert(s);
510
511     /* Send all scheduled responses immediately */
512     while (s->jobs)
513         send_response_packet(s, s->jobs);
514 }