git.meshlink.io Git - meshlink/blob - src/event.c
Don't send UDP probes to tiny nodes.
[meshlink] / src / event.c
1 /*
2     event.c -- I/O, timeout and signal event handling
3     Copyright (C) 2014-2017 Guus Sliepen <guus@meshlink.io>
4
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 2 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License along
16     with this program; if not, write to the Free Software Foundation, Inc.,
17     51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "system.h"
21
22 #include "dropin.h"
23 #include "event.h"
24 #include "logger.h"
25 #include "meshlink.h"
26 #include "net.h"
27 #include "splay_tree.h"
28 #include "utils.h"
29 #include "xalloc.h"
30
/* Clock passed to clock_gettime() when refreshing loop->now.
 * A definition supplied before this point takes precedence; otherwise
 * CLOCK_MONOTONIC_RAW is chosen when it is defined and the target is x86_64,
 * and CLOCK_MONOTONIC in every other case. */
#ifndef EVENT_CLOCK
#if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
#define EVENT_CLOCK CLOCK_MONOTONIC_RAW
#else
#define EVENT_CLOCK CLOCK_MONOTONIC
#endif
#endif
38
39 static int io_compare(const io_t *a, const io_t *b) {
40         return a->fd - b->fd;
41 }
42
43 static int timeout_compare(const timeout_t *a, const timeout_t *b) {
44         if(a->tv.tv_sec < b->tv.tv_sec) {
45                 return -1;
46         } else if(a->tv.tv_sec > b->tv.tv_sec) {
47                 return 1;
48         } else if(a->tv.tv_nsec < b->tv.tv_nsec) {
49                 return -1;
50         } else if(a->tv.tv_nsec > b->tv.tv_nsec) {
51                 return 1;
52         } else if(a < b) {
53                 return -1;
54         } else if(a > b) {
55                 return 1;
56         } else {
57                 return 0;
58         }
59 }
60
61 void io_add(event_loop_t *loop, io_t *io, io_cb_t cb, void *data, int fd, int flags) {
62         assert(!io->cb);
63
64         io->fd = fd;
65         io->cb = cb;
66         io->data = data;
67         io->node.data = io;
68
69         io_set(loop, io, flags);
70
71         splay_node_t *node = splay_insert_node(&loop->ios, &io->node);
72         assert(node);
73         (void)node;
74 }
75
76 void io_set(event_loop_t *loop, io_t *io, int flags) {
77         assert(io->cb);
78
79         io->flags = flags;
80
81         if(flags & IO_READ) {
82                 FD_SET(io->fd, &loop->readfds);
83         } else {
84                 FD_CLR(io->fd, &loop->readfds);
85         }
86
87         if(flags & IO_WRITE) {
88                 FD_SET(io->fd, &loop->writefds);
89         } else {
90                 FD_CLR(io->fd, &loop->writefds);
91         }
92 }
93
94 void io_del(event_loop_t *loop, io_t *io) {
95         assert(io->cb);
96
97         loop->deletion = true;
98
99         io_set(loop, io, 0);
100
101         splay_unlink_node(&loop->ios, &io->node);
102         io->cb = NULL;
103 }
104
105 void timeout_add(event_loop_t *loop, timeout_t *timeout, timeout_cb_t cb, void *data, struct timespec *tv) {
106         timeout->cb = cb;
107         timeout->data = data;
108
109         timeout_set(loop, timeout, tv);
110 }
111
112 void timeout_set(event_loop_t *loop, timeout_t *timeout, struct timespec *tv) {
113         assert(timeout->cb);
114
115         if(timeout->node.data) {
116                 splay_unlink_node(&loop->timeouts, &timeout->node);
117         } else {
118                 timeout->node.data = timeout;
119         }
120
121         if(!loop->now.tv_sec) {
122                 clock_gettime(EVENT_CLOCK, &loop->now);
123         }
124
125         timespec_add(&loop->now, tv, &timeout->tv);
126
127         if(!splay_insert_node(&loop->timeouts, &timeout->node)) {
128                 abort();
129         }
130
131         loop->deletion = true;
132 }
133
134 static void timeout_disable(event_loop_t *loop, timeout_t *timeout) {
135         if(timeout->node.data) {
136                 splay_unlink_node(&loop->timeouts, &timeout->node);
137                 timeout->node.data = NULL;
138         }
139
140         timespec_clear(&timeout->tv);
141 }
142
143 void timeout_del(event_loop_t *loop, timeout_t *timeout) {
144         if(!timeout->cb) {
145                 return;
146         }
147
148         if(timeout->node.data) {
149                 timeout_disable(loop, timeout);
150         }
151
152         timeout->cb = NULL;
153         loop->deletion = true;
154 }
155
156 static int signal_compare(const signal_t *a, const signal_t *b) {
157         return (int)a->signum - (int)b->signum;
158 }
159
160 static void signalio_handler(event_loop_t *loop, void *data, int flags) {
161         (void)data;
162         (void)flags;
163         unsigned char signum;
164
165         if(read(loop->pipefd[0], &signum, 1) != 1) {
166                 return;
167         }
168
169         signal_t *sig = splay_search(&loop->signals, &(signal_t) {
170                 .signum = signum
171         });
172
173         if(sig) {
174 #ifdef HAVE_STDATOMIC_H
175                 atomic_flag_clear(&sig->set);
176 #endif
177                 sig->cb(loop, sig->data);
178         }
179 }
180
181 static void pipe_init(event_loop_t *loop) {
182         int result = pipe(loop->pipefd);
183         assert(result == 0);
184
185         if(result == 0) {
186 #ifdef O_NONBLOCK
187                 fcntl(loop->pipefd[0], F_SETFL, O_NONBLOCK);
188                 fcntl(loop->pipefd[1], F_SETFL, O_NONBLOCK);
189 #endif
190                 io_add(loop, &loop->signalio, signalio_handler, NULL, loop->pipefd[0], IO_READ);
191         }
192 }
193
194 static void pipe_exit(event_loop_t *loop) {
195         io_del(loop, &loop->signalio);
196
197         close(loop->pipefd[0]);
198         close(loop->pipefd[1]);
199
200         loop->pipefd[0] = -1;
201         loop->pipefd[1] = -1;
202 }
203
204 void signal_trigger(event_loop_t *loop, signal_t *sig) {
205 #ifdef HAVE_STDATOMIC_H
206
207         if(atomic_flag_test_and_set(&sig->set)) {
208                 return;
209         }
210
211 #endif
212
213         uint8_t signum = sig->signum;
214         write(loop->pipefd[1], &signum, 1);
215         return;
216 }
217
218 void signal_add(event_loop_t *loop, signal_t *sig, signal_cb_t cb, void *data, uint8_t signum) {
219         assert(!sig->cb);
220
221         sig->cb = cb;
222         sig->data = data;
223         sig->signum = signum;
224         sig->node.data = sig;
225
226 #ifdef HAVE_STDATOMIC_H
227         atomic_flag_clear(&sig->set);
228 #endif
229
230         if(loop->pipefd[0] == -1) {
231                 pipe_init(loop);
232         }
233
234         if(!splay_insert_node(&loop->signals, &sig->node)) {
235                 abort();
236         }
237 }
238
239 void signal_del(event_loop_t *loop, signal_t *sig) {
240         assert(sig->cb);
241
242         loop->deletion = true;
243
244         splay_unlink_node(&loop->signals, &sig->node);
245         sig->cb = NULL;
246
247         if(!loop->signals.count && loop->pipefd[0] != -1) {
248                 pipe_exit(loop);
249         }
250 }
251
252 void idle_set(event_loop_t *loop, idle_cb_t cb, void *data) {
253         loop->idle_cb = cb;
254         loop->idle_data = data;
255 }
256
257 static void check_bad_fds(event_loop_t *loop, meshlink_handle_t *mesh) {
258         // Just call all registered callbacks and have them check their fds
259
260         do {
261                 loop->deletion = false;
262
263                 for splay_each(io_t, io, &loop->ios) {
264                         if(io->flags & IO_WRITE) {
265                                 io->cb(loop, io->data, IO_WRITE);
266                         }
267
268                         if(loop->deletion) {
269                                 break;
270                         }
271
272                         if(io->flags & IO_READ) {
273                                 io->cb(loop, io->data, IO_READ);
274                         }
275
276                         if(loop->deletion) {
277                                 break;
278                         }
279                 }
280         } while(loop->deletion);
281
282         // Rebuild the fdsets
283
284         fd_set old_readfds;
285         fd_set old_writefds;
286         memcpy(&old_readfds, &loop->readfds, sizeof(old_readfds));
287         memcpy(&old_writefds, &loop->writefds, sizeof(old_writefds));
288
289         memset(&loop->readfds, 0, sizeof(loop->readfds));
290         memset(&loop->writefds, 0, sizeof(loop->writefds));
291
292         for splay_each(io_t, io, &loop->ios) {
293                 if(io->flags & IO_READ) {
294                         FD_SET(io->fd, &loop->readfds);
295                         io->cb(loop, io->data, IO_READ);
296                 }
297
298                 if(io->flags & IO_WRITE) {
299                         FD_SET(io->fd, &loop->writefds);
300                         io->cb(loop, io->data, IO_WRITE);
301                 }
302         }
303
304         if(memcmp(&old_readfds, &loop->readfds, sizeof(old_readfds))) {
305                 logger(mesh, MESHLINK_WARNING, "Incorrect readfds fixed");
306         }
307
308         if(memcmp(&old_writefds, &loop->writefds, sizeof(old_writefds))) {
309                 logger(mesh, MESHLINK_WARNING, "Incorrect writefds fixed");
310         }
311 }
312
313 bool event_loop_run(event_loop_t *loop, meshlink_handle_t *mesh) {
314         assert(mesh);
315
316         fd_set readable;
317         fd_set writable;
318         int errors = 0;
319
320         while(loop->running) {
321                 clock_gettime(EVENT_CLOCK, &loop->now);
322                 struct timespec it, ts = {3600, 0};
323
324                 while(loop->timeouts.head) {
325                         timeout_t *timeout = loop->timeouts.head->data;
326
327                         if(timespec_lt(&timeout->tv, &loop->now)) {
328                                 timeout_disable(loop, timeout);
329                                 timeout->cb(loop, timeout->data);
330                         } else {
331                                 timespec_sub(&timeout->tv, &loop->now, &ts);
332                                 break;
333                         }
334                 }
335
336                 if(loop->idle_cb) {
337                         it = loop->idle_cb(loop, loop->idle_data);
338
339                         if(it.tv_sec >= 0 && timespec_lt(&it, &ts)) {
340                                 ts = it;
341                         }
342                 }
343
344                 memcpy(&readable, &loop->readfds, sizeof(readable));
345                 memcpy(&writable, &loop->writefds, sizeof(writable));
346
347                 int fds = 0;
348
349                 if(loop->ios.tail) {
350                         io_t *last = loop->ios.tail->data;
351                         fds = last->fd + 1;
352                 }
353
354                 // release mesh mutex during select
355                 pthread_mutex_unlock(&mesh->mutex);
356
357 #ifdef HAVE_PSELECT
358                 int n = pselect(fds, &readable, &writable, NULL, &ts, NULL);
359 #else
360                 struct timeval tv = {ts.tv_sec, ts.tv_nsec / 1000};
361                 int n = select(fds, &readable, &writable, NULL, (struct timeval *)&tv);
362 #endif
363
364                 if(pthread_mutex_lock(&mesh->mutex) != 0) {
365                         abort();
366                 }
367
368                 clock_gettime(EVENT_CLOCK, &loop->now);
369
370                 if(n < 0) {
371                         if(sockwouldblock(errno)) {
372                                 continue;
373                         } else {
374                                 errors++;
375
376                                 if(errors > 10) {
377                                         logger(mesh, MESHLINK_ERROR, "Unrecoverable error from select(): %s", strerror(errno));
378                                         return false;
379                                 }
380
381                                 logger(mesh, MESHLINK_WARNING, "Error from select(), checking for bad fds: %s", strerror(errno));
382                                 check_bad_fds(loop, mesh);
383                                 continue;
384                         }
385                 }
386
387                 errors = 0;
388
389                 if(!n) {
390                         continue;
391                 }
392
393                 // Normally, splay_each allows the current node to be deleted. However,
394                 // it can be that one io callback triggers the deletion of another io,
395                 // so we have to detect this and break the loop.
396
397                 loop->deletion = false;
398
399                 for splay_each(io_t, io, &loop->ios) {
400                         if(FD_ISSET(io->fd, &writable) && io->cb) {
401                                 io->cb(loop, io->data, IO_WRITE);
402                         }
403
404                         if(loop->deletion) {
405                                 break;
406                         }
407
408                         if(FD_ISSET(io->fd, &readable) && io->cb) {
409                                 io->cb(loop, io->data, IO_READ);
410                         }
411
412                         if(loop->deletion) {
413                                 break;
414                         }
415                 }
416         }
417
418         return true;
419 }
420
421 void event_loop_start(event_loop_t *loop) {
422         loop->running = true;
423 }
424
425 void event_loop_stop(event_loop_t *loop) {
426         loop->running = false;
427 }
428
429 void event_loop_init(event_loop_t *loop) {
430         loop->ios.compare = (splay_compare_t)io_compare;
431         loop->timeouts.compare = (splay_compare_t)timeout_compare;
432         loop->signals.compare = (splay_compare_t)signal_compare;
433         loop->pipefd[0] = -1;
434         loop->pipefd[1] = -1;
435         clock_gettime(EVENT_CLOCK, &loop->now);
436 }
437
438 void event_loop_exit(event_loop_t *loop) {
439         assert(!loop->ios.count);
440         assert(!loop->timeouts.count);
441         assert(!loop->signals.count);
442
443         for splay_each(io_t, io, &loop->ios) {
444                 splay_unlink_node(&loop->ios, splay_node);
445         }
446
447         for splay_each(timeout_t, timeout, &loop->timeouts) {
448                 splay_unlink_node(&loop->timeouts, splay_node);
449         }
450
451         for splay_each(signal_t, signal, &loop->signals) {
452                 splay_unlink_node(&loop->signals, splay_node);
453         }
454 }