]> git.meshlink.io Git - catta/blob - src/announce.c
change some int declarations that should have said CattaIfIndex
[catta] / src / announce.c
1 /***
2   This file is part of catta.
3
4   catta is free software; you can redistribute it and/or modify it
5   under the terms of the GNU Lesser General Public License as
6   published by the Free Software Foundation; either version 2.1 of the
7   License, or (at your option) any later version.
8
9   catta is distributed in the hope that it will be useful, but WITHOUT
10   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11   or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
12   Public License for more details.
13
14   You should have received a copy of the GNU Lesser General Public
15   License along with catta; if not, write to the Free Software
16   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17   USA.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include <stdlib.h>
25
26 #include <catta/timeval.h>
27 #include <catta/malloc.h>
28
29 #include "announce.h"
30 #include <catta/log.h>
31 #include "rr-util.h"
32
33 #define CATTA_ANNOUNCEMENT_JITTER_MSEC 250
34 #define CATTA_PROBE_JITTER_MSEC 250
35 #define CATTA_PROBE_INTERVAL_MSEC 250
36
37 static void remove_announcer(CattaServer *s, CattaAnnouncer *a) {
38     assert(s);
39     assert(a);
40
41     if (a->time_event)
42         catta_time_event_free(a->time_event);
43
44     CATTA_LLIST_REMOVE(CattaAnnouncer, by_interface, a->iface->announcers, a);
45     CATTA_LLIST_REMOVE(CattaAnnouncer, by_entry, a->entry->announcers, a);
46
47     catta_free(a);
48 }
49
50 static void elapse_announce(CattaTimeEvent *e, void *userdata);
51
52 static void set_timeout(CattaAnnouncer *a, const struct timeval *tv) {
53     assert(a);
54
55     if (!tv) {
56         if (a->time_event) {
57             catta_time_event_free(a->time_event);
58             a->time_event = NULL;
59         }
60     } else {
61
62         if (a->time_event)
63             catta_time_event_update(a->time_event, tv);
64         else
65             a->time_event = catta_time_event_new(a->server->time_event_queue, tv, elapse_announce, a);
66     }
67 }
68
69 static void next_state(CattaAnnouncer *a);
70
71 void catta_s_entry_group_check_probed(CattaSEntryGroup *g, int immediately) {
72     CattaEntry *e;
73     assert(g);
74     assert(!g->dead);
75
76     /* Check whether all group members have been probed */
77
78     if (g->state != CATTA_ENTRY_GROUP_REGISTERING || g->n_probing > 0)
79         return;
80
81     catta_s_entry_group_change_state(g, CATTA_ENTRY_GROUP_ESTABLISHED);
82
83     if (g->dead)
84         return;
85
86     for (e = g->entries; e; e = e->by_group_next) {
87         CattaAnnouncer *a;
88
89         for (a = e->announcers; a; a = a->by_entry_next) {
90
91             if (a->state != CATTA_WAITING)
92                 continue;
93
94             a->state = CATTA_ANNOUNCING;
95
96             if (immediately) {
97                 /* Shortcut */
98
99                 a->n_iteration = 1;
100                 next_state(a);
101             } else {
102                 struct timeval tv;
103                 a->n_iteration = 0;
104                 catta_elapse_time(&tv, 0, CATTA_ANNOUNCEMENT_JITTER_MSEC);
105                 set_timeout(a, &tv);
106             }
107         }
108     }
109 }
110
111 static void next_state(CattaAnnouncer *a) {
112     assert(a);
113
114     if (a->state == CATTA_WAITING) {
115
116         assert(a->entry->group);
117
118         catta_s_entry_group_check_probed(a->entry->group, 1);
119
120     } else if (a->state == CATTA_PROBING) {
121
122         if (a->n_iteration >= 4) {
123             /* Probing done */
124
125             if (a->entry->group) {
126                 assert(a->entry->group->n_probing);
127                 a->entry->group->n_probing--;
128             }
129
130             if (a->entry->group && a->entry->group->state == CATTA_ENTRY_GROUP_REGISTERING)
131                 a->state = CATTA_WAITING;
132             else {
133                 a->state = CATTA_ANNOUNCING;
134                 a->n_iteration = 1;
135             }
136
137             set_timeout(a, NULL);
138             next_state(a);
139         } else {
140             struct timeval tv;
141
142             catta_interface_post_probe(a->iface, a->entry->record, 0);
143
144             catta_elapse_time(&tv, CATTA_PROBE_INTERVAL_MSEC, 0);
145             set_timeout(a, &tv);
146
147             a->n_iteration++;
148         }
149
150     } else if (a->state == CATTA_ANNOUNCING) {
151
152         if (a->entry->flags & CATTA_PUBLISH_UNIQUE)
153             /* Send the whole rrset at once */
154             catta_server_prepare_matching_responses(a->server, a->iface, a->entry->record->key, 0);
155         else
156             catta_server_prepare_response(a->server, a->iface, a->entry, 0, 0);
157
158         catta_server_generate_response(a->server, a->iface, NULL, NULL, 0, 0, 0);
159
160         if (++a->n_iteration >= 4) {
161             /* Announcing done */
162
163             a->state = CATTA_ESTABLISHED;
164
165             set_timeout(a, NULL);
166         } else {
167             struct timeval tv;
168             catta_elapse_time(&tv, a->sec_delay*1000, CATTA_ANNOUNCEMENT_JITTER_MSEC);
169
170             if (a->n_iteration < 10)
171                 a->sec_delay *= 2;
172
173             set_timeout(a, &tv);
174         }
175     }
176 }
177
178 static void elapse_announce(CattaTimeEvent *e, void *userdata) {
179     assert(e);
180
181     next_state(userdata);
182 }
183
184 static CattaAnnouncer *get_announcer(CattaServer *s, CattaEntry *e, CattaInterface *i) {
185     CattaAnnouncer *a;
186
187     assert(s);
188     assert(e);
189     assert(i);
190
191     for (a = e->announcers; a; a = a->by_entry_next)
192         if (a->iface == i)
193             return a;
194
195     return NULL;
196 }
197
198 static void go_to_initial_state(CattaAnnouncer *a) {
199     CattaEntry *e;
200     struct timeval tv;
201
202     assert(a);
203     e = a->entry;
204
205     if ((e->flags & CATTA_PUBLISH_UNIQUE) && !(e->flags & CATTA_PUBLISH_NO_PROBE))
206         a->state = CATTA_PROBING;
207     else if (!(e->flags & CATTA_PUBLISH_NO_ANNOUNCE)) {
208
209         if (!e->group || e->group->state == CATTA_ENTRY_GROUP_ESTABLISHED)
210             a->state = CATTA_ANNOUNCING;
211         else
212             a->state = CATTA_WAITING;
213
214     } else
215         a->state = CATTA_ESTABLISHED;
216
217     a->n_iteration = 1;
218     a->sec_delay = 1;
219
220     if (a->state == CATTA_PROBING && e->group)
221         e->group->n_probing++;
222
223     if (a->state == CATTA_PROBING)
224         set_timeout(a, catta_elapse_time(&tv, 0, CATTA_PROBE_JITTER_MSEC));
225     else if (a->state == CATTA_ANNOUNCING)
226         set_timeout(a, catta_elapse_time(&tv, 0, CATTA_ANNOUNCEMENT_JITTER_MSEC));
227     else
228         set_timeout(a, NULL);
229 }
230
231 static void new_announcer(CattaServer *s, CattaInterface *i, CattaEntry *e) {
232     CattaAnnouncer *a;
233
234     assert(s);
235     assert(i);
236     assert(e);
237     assert(!e->dead);
238
239     if (!catta_interface_match(i, e->iface, e->protocol) || !i->announcing || !catta_entry_is_commited(e))
240         return;
241
242     /* We don't want duplicate announcers */
243     if (get_announcer(s, e, i))
244         return;
245
246     if ((!(a = catta_new(CattaAnnouncer, 1)))) {
247         catta_log_error(__FILE__": Out of memory.");
248         return;
249     }
250
251     a->server = s;
252     a->iface = i;
253     a->entry = e;
254     a->time_event = NULL;
255
256     CATTA_LLIST_PREPEND(CattaAnnouncer, by_interface, i->announcers, a);
257     CATTA_LLIST_PREPEND(CattaAnnouncer, by_entry, e->announcers, a);
258
259     go_to_initial_state(a);
260 }
261
262 void catta_announce_interface(CattaServer *s, CattaInterface *i) {
263     CattaEntry *e;
264
265     assert(s);
266     assert(i);
267
268     if (!i->announcing)
269         return;
270
271     for (e = s->entries; e; e = e->entries_next)
272         if (!e->dead)
273             new_announcer(s, i, e);
274 }
275
276 static void announce_walk_callback(CattaInterfaceMonitor *m, CattaInterface *i, void* userdata) {
277     CattaEntry *e = userdata;
278
279     assert(m);
280     assert(i);
281     assert(e);
282     assert(!e->dead);
283
284     new_announcer(m->server, i, e);
285 }
286
287 void catta_announce_entry(CattaServer *s, CattaEntry *e) {
288     assert(s);
289     assert(e);
290     assert(!e->dead);
291
292     catta_interface_monitor_walk(s->monitor, e->iface, e->protocol, announce_walk_callback, e);
293 }
294
295 void catta_announce_group(CattaServer *s, CattaSEntryGroup *g) {
296     CattaEntry *e;
297
298     assert(s);
299     assert(g);
300
301     for (e = g->entries; e; e = e->by_group_next)
302         if (!e->dead)
303             catta_announce_entry(s, e);
304 }
305
306 int catta_entry_is_registered(CattaServer *s, CattaEntry *e, CattaInterface *i) {
307     CattaAnnouncer *a;
308
309     assert(s);
310     assert(e);
311     assert(i);
312     assert(!e->dead);
313
314     if (!(a = get_announcer(s, e, i)))
315         return 0;
316
317     return
318         a->state == CATTA_ANNOUNCING ||
319         a->state == CATTA_ESTABLISHED ||
320         (a->state == CATTA_WAITING && !(e->flags & CATTA_PUBLISH_UNIQUE));
321 }
322
323 int catta_entry_is_probing(CattaServer *s, CattaEntry *e, CattaInterface *i) {
324     CattaAnnouncer *a;
325
326     assert(s);
327     assert(e);
328     assert(i);
329     assert(!e->dead);
330
331     if (!(a = get_announcer(s, e, i)))
332         return 0;
333
334     return
335         a->state == CATTA_PROBING ||
336         (a->state == CATTA_WAITING && (e->flags & CATTA_PUBLISH_UNIQUE));
337 }
338
339 void catta_entry_return_to_initial_state(CattaServer *s, CattaEntry *e, CattaInterface *i) {
340     CattaAnnouncer *a;
341
342     assert(s);
343     assert(e);
344     assert(i);
345
346     if (!(a = get_announcer(s, e, i)))
347         return;
348
349     if (a->state == CATTA_PROBING && a->entry->group)
350         a->entry->group->n_probing--;
351
352     go_to_initial_state(a);
353 }
354
355 static CattaRecord *make_goodbye_record(CattaRecord *r) {
356     CattaRecord *g;
357
358     assert(r);
359
360     if (!(g = catta_record_copy(r)))
361         return NULL; /* OOM */
362
363     assert(g->ref == 1);
364     g->ttl = 0;
365
366     return g;
367 }
368
369 static int is_duplicate_entry(CattaServer *s, CattaEntry *e) {
370     CattaEntry *i;
371
372     assert(s);
373     assert(e);
374
375     for (i = catta_hashmap_lookup(s->entries_by_key, e->record->key); i; i = i->by_key_next) {
376
377         if ((i == e) || (i->dead))
378             continue;
379
380         if (!catta_record_equal_no_ttl(i->record, e->record))
381             continue;
382
383         return 1;
384     }
385
386     return 0;
387 }
388
389 static void send_goodbye_callback(CattaInterfaceMonitor *m, CattaInterface *i, void* userdata) {
390     CattaEntry *e = userdata;
391     CattaRecord *g;
392
393     assert(m);
394     assert(i);
395     assert(e);
396     assert(!e->dead);
397
398     if (!catta_interface_match(i, e->iface, e->protocol))
399         return;
400
401     if (e->flags & CATTA_PUBLISH_NO_ANNOUNCE)
402         return;
403
404     if (!catta_entry_is_registered(m->server, e, i))
405         return;
406
407     if (is_duplicate_entry(m->server, e))
408         return;
409
410     if (!(g = make_goodbye_record(e->record)))
411         return; /* OOM */
412
413     catta_interface_post_response(i, g, e->flags & CATTA_PUBLISH_UNIQUE, NULL, 1);
414     catta_record_unref(g);
415 }
416
417 static void reannounce(CattaAnnouncer *a) {
418     CattaEntry *e;
419     struct timeval tv;
420
421     assert(a);
422     e = a->entry;
423
424     /* If the group this entry belongs to is not even commited, there's nothing to reannounce */
425     if (e->group && (e->group->state == CATTA_ENTRY_GROUP_UNCOMMITED || e->group->state == CATTA_ENTRY_GROUP_COLLISION))
426         return;
427
428     /* Because we might change state we decrease the probing counter first */
429     if (a->state == CATTA_PROBING && a->entry->group)
430         a->entry->group->n_probing--;
431
432     if (a->state == CATTA_PROBING ||
433         (a->state == CATTA_WAITING && (e->flags & CATTA_PUBLISH_UNIQUE) && !(e->flags & CATTA_PUBLISH_NO_PROBE)))
434
435         /* We were probing or waiting after probe, so we restart probing from the beginning here */
436
437         a->state = CATTA_PROBING;
438     else if (a->state == CATTA_WAITING)
439
440         /* We were waiting, but were not probing before, so we continue waiting  */
441         a->state = CATTA_WAITING;
442
443     else if (e->flags & CATTA_PUBLISH_NO_ANNOUNCE)
444
445         /* No announcer needed */
446         a->state = CATTA_ESTABLISHED;
447
448     else {
449
450         /* Ok, let's restart announcing */
451         a->state = CATTA_ANNOUNCING;
452     }
453
454     /* Now let's increase the probing counter again */
455     if (a->state == CATTA_PROBING && e->group)
456         e->group->n_probing++;
457
458     a->n_iteration = 1;
459     a->sec_delay = 1;
460
461     if (a->state == CATTA_PROBING)
462         set_timeout(a, catta_elapse_time(&tv, 0, CATTA_PROBE_JITTER_MSEC));
463     else if (a->state == CATTA_ANNOUNCING)
464         set_timeout(a, catta_elapse_time(&tv, 0, CATTA_ANNOUNCEMENT_JITTER_MSEC));
465     else
466         set_timeout(a, NULL);
467 }
468
469
470 static void reannounce_walk_callback(CattaInterfaceMonitor *m, CattaInterface *i, void* userdata) {
471     CattaEntry *e = userdata;
472     CattaAnnouncer *a;
473
474     assert(m);
475     assert(i);
476     assert(e);
477     assert(!e->dead);
478
479     if (!(a = get_announcer(m->server, e, i)))
480         return;
481
482     reannounce(a);
483 }
484
485 void catta_reannounce_entry(CattaServer *s, CattaEntry *e) {
486
487     assert(s);
488     assert(e);
489     assert(!e->dead);
490
491     catta_interface_monitor_walk(s->monitor, e->iface, e->protocol, reannounce_walk_callback, e);
492 }
493
494 void catta_goodbye_interface(CattaServer *s, CattaInterface *i, int send_goodbye, int remove) {
495     assert(s);
496     assert(i);
497
498     if (send_goodbye)
499         if (i->announcing) {
500             CattaEntry *e;
501
502             for (e = s->entries; e; e = e->entries_next)
503                 if (!e->dead)
504                     send_goodbye_callback(s->monitor, i, e);
505         }
506
507     if (remove)
508         while (i->announcers)
509             remove_announcer(s, i->announcers);
510 }
511
512 void catta_goodbye_entry(CattaServer *s, CattaEntry *e, int send_goodbye, int remove) {
513     assert(s);
514     assert(e);
515
516     if (send_goodbye)
517         if (!e->dead)
518             catta_interface_monitor_walk(s->monitor, CATTA_IF_UNSPEC, CATTA_PROTO_UNSPEC, send_goodbye_callback, e);
519
520     if (remove)
521         while (e->announcers)
522             remove_announcer(s, e->announcers);
523 }
524