]> git.meshlink.io Git - catta/blob - src/simple-watch.c
remove simple-watch debug output
[catta] / src / simple-watch.c
1 /***
2   This file is part of catta.
3
4   catta is free software; you can redistribute it and/or modify it
5   under the terms of the GNU Lesser General Public License as
6   published by the Free Software Foundation; either version 2.1 of the
7   License, or (at your option) any later version.
8
9   catta is distributed in the hope that it will be useful, but WITHOUT
10   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11   or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
12   Public License for more details.
13
14   You should have received a copy of the GNU Lesser General Public
15   License along with catta; if not, write to the Free Software
16   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17   USA.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include <sys/poll.h>
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <unistd.h>
29 #include <fcntl.h>
30 #include <stdio.h>
31
32 #include <catta/llist.h>
33 #include <catta/malloc.h>
34 #include <catta/timeval.h>
35 #include <catta/simple-watch.h>
36 #include <catta/log.h>
37 #include "fdutil.h"                 // catta_set_nonblock
38 #include "internal.h"               // closesocket
39
40 struct CattaWatch {
41     CattaSimplePoll *simple_poll;
42     int dead;
43
44     int idx;
45     struct pollfd pollfd;
46
47     CattaWatchCallback callback;
48     void *userdata;
49
50     CATTA_LLIST_FIELDS(CattaWatch, watches);
51 };
52
53 struct CattaTimeout {
54     CattaSimplePoll *simple_poll;
55     int dead;
56
57     int enabled;
58     struct timeval expiry;
59
60     CattaTimeoutCallback callback;
61     void  *userdata;
62
63     CATTA_LLIST_FIELDS(CattaTimeout, timeouts);
64 };
65
66 struct CattaSimplePoll {
67     CattaPoll api;
68     CattaPollFunc poll_func;
69     void *poll_func_userdata;
70
71     struct pollfd* pollfds;
72     int n_pollfds, max_pollfds, rebuild_pollfds;
73
74     int watch_req_cleanup, timeout_req_cleanup;
75     int quit;
76     int events_valid;
77
78     int n_watches;
79     CATTA_LLIST_HEAD(CattaWatch, watches);
80     CATTA_LLIST_HEAD(CattaTimeout, timeouts);
81
82     int wakeup_pipe[2];
83     int wakeup_issued;
84
85     int prepared_timeout;
86
87     enum {
88         STATE_INIT,
89         STATE_PREPARING,
90         STATE_PREPARED,
91         STATE_RUNNING,
92         STATE_RAN,
93         STATE_DISPATCHING,
94         STATE_DISPATCHED,
95         STATE_QUIT,
96         STATE_FAILURE
97     } state;
98 };
99
100 void catta_simple_poll_wakeup(CattaSimplePoll *s) {
101     char c = 'W';
102     assert(s);
103
104     (void)writepipe(s->wakeup_pipe[1], &c, sizeof(c));
105     s->wakeup_issued = 1;
106 }
107
108 static void clear_wakeup(CattaSimplePoll *s) {
109     char c[10]; /* Read ten at a time */
110
111     if (!s->wakeup_issued)
112         return;
113
114     s->wakeup_issued = 0;
115
116     for(;;) {
117         if (readpipe(s->wakeup_pipe[0], c, sizeof(c)) != sizeof(c))
118             break;
119     }
120 }
121
122 static CattaWatch* watch_new(const CattaPoll *api, int fd, CattaWatchEvent event, CattaWatchCallback callback, void *userdata) {
123     CattaWatch *w;
124     CattaSimplePoll *s;
125
126     assert(api);
127     assert(fd >= 0);
128     assert(callback);
129
130     s = api->userdata;
131     assert(s);
132
133     if (!(w = catta_new(CattaWatch, 1)))
134         return NULL;
135
136     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
137     catta_simple_poll_wakeup(s);
138
139     w->simple_poll = s;
140     w->dead = 0;
141
142     w->pollfd.fd = fd;
143     w->pollfd.events = event;
144     w->pollfd.revents = 0;
145
146     w->callback = callback;
147     w->userdata = userdata;
148
149     w->idx = -1;
150     s->rebuild_pollfds = 1;
151
152     CATTA_LLIST_PREPEND(CattaWatch, watches, s->watches, w);
153     s->n_watches++;
154
155     return w;
156 }
157
158 static void watch_update(CattaWatch *w, CattaWatchEvent events) {
159     assert(w);
160     assert(!w->dead);
161
162     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
163     catta_simple_poll_wakeup(w->simple_poll);
164
165     w->pollfd.events = events;
166
167     if (w->idx != -1) {
168         assert(w->simple_poll);
169         w->simple_poll->pollfds[w->idx] = w->pollfd;
170     } else
171         w->simple_poll->rebuild_pollfds = 1;
172 }
173
174 static CattaWatchEvent watch_get_events(CattaWatch *w) {
175     assert(w);
176     assert(!w->dead);
177
178     if (w->idx != -1 && w->simple_poll->events_valid)
179         return w->simple_poll->pollfds[w->idx].revents;
180
181     return 0;
182 }
183
184 static void remove_pollfd(CattaWatch *w) {
185     assert(w);
186
187     if (w->idx == -1)
188         return;
189
190     w->simple_poll->rebuild_pollfds = 1;
191 }
192
193 static void watch_free(CattaWatch *w) {
194     assert(w);
195
196     assert(!w->dead);
197
198     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
199     catta_simple_poll_wakeup(w->simple_poll);
200
201     remove_pollfd(w);
202
203     w->dead = 1;
204     w->simple_poll->n_watches --;
205     w->simple_poll->watch_req_cleanup = 1;
206 }
207
208 static void destroy_watch(CattaWatch *w) {
209     assert(w);
210
211     remove_pollfd(w);
212     CATTA_LLIST_REMOVE(CattaWatch, watches, w->simple_poll->watches, w);
213
214     if (!w->dead)
215         w->simple_poll->n_watches --;
216
217     catta_free(w);
218 }
219
220 static void cleanup_watches(CattaSimplePoll *s, int all) {
221     CattaWatch *w, *next;
222     assert(s);
223
224     for (w = s->watches; w; w = next) {
225         next = w->watches_next;
226
227         if (all || w->dead)
228             destroy_watch(w);
229     }
230
231     s->timeout_req_cleanup = 0;
232 }
233
234 static CattaTimeout* timeout_new(const CattaPoll *api, const struct timeval *tv, CattaTimeoutCallback callback, void *userdata) {
235     CattaTimeout *t;
236     CattaSimplePoll *s;
237
238     assert(api);
239     assert(callback);
240
241     s = api->userdata;
242     assert(s);
243
244     if (!(t = catta_new(CattaTimeout, 1)))
245         return NULL;
246
247     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
248     catta_simple_poll_wakeup(s);
249
250     t->simple_poll = s;
251     t->dead = 0;
252
253     if ((t->enabled = !!tv))
254         t->expiry = *tv;
255
256     t->callback = callback;
257     t->userdata = userdata;
258
259     CATTA_LLIST_PREPEND(CattaTimeout, timeouts, s->timeouts, t);
260     return t;
261 }
262
263 static void timeout_update(CattaTimeout *t, const struct timeval *tv) {
264     assert(t);
265     assert(!t->dead);
266
267     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
268     catta_simple_poll_wakeup(t->simple_poll);
269
270     if ((t->enabled = !!tv))
271         t->expiry = *tv;
272 }
273
274 static void timeout_free(CattaTimeout *t) {
275     assert(t);
276     assert(!t->dead);
277
278     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
279     catta_simple_poll_wakeup(t->simple_poll);
280
281     t->dead = 1;
282     t->simple_poll->timeout_req_cleanup = 1;
283 }
284
285
286 static void destroy_timeout(CattaTimeout *t) {
287     assert(t);
288
289     CATTA_LLIST_REMOVE(CattaTimeout, timeouts, t->simple_poll->timeouts, t);
290
291     catta_free(t);
292 }
293
294 static void cleanup_timeouts(CattaSimplePoll *s, int all) {
295     CattaTimeout *t, *next;
296     assert(s);
297
298     for (t = s->timeouts; t; t = next) {
299         next = t->timeouts_next;
300
301         if (all || t->dead)
302             destroy_timeout(t);
303     }
304
305     s->timeout_req_cleanup = 0;
306 }
307
308 CattaSimplePoll *catta_simple_poll_new(void) {
309     CattaSimplePoll *s;
310
311     if (!(s = catta_new(CattaSimplePoll, 1)))
312         return NULL;
313
314     winsock_init();  // on Windows, pipe uses sockets; no-op on other platforms
315     if (pipe(s->wakeup_pipe) < 0) {
316         catta_log_error(__FILE__": pipe() failed: %s", errnostrsocket());
317         goto fail;
318     }
319
320     if (catta_set_nonblock(s->wakeup_pipe[0]) < 0 ||
321         catta_set_nonblock(s->wakeup_pipe[1]) < 0)
322     {
323         catta_log_error(__FILE__": O_NONBLOCK failed: %s", errnostrsocket());
324         goto fail;
325     }
326
327     s->api.userdata = s;
328
329     s->api.watch_new = watch_new;
330     s->api.watch_free = watch_free;
331     s->api.watch_update = watch_update;
332     s->api.watch_get_events = watch_get_events;
333
334     s->api.timeout_new = timeout_new;
335     s->api.timeout_free = timeout_free;
336     s->api.timeout_update = timeout_update;
337
338     s->pollfds = NULL;
339     s->max_pollfds = s->n_pollfds = 0;
340     s->rebuild_pollfds = 1;
341     s->quit = 0;
342     s->n_watches = 0;
343     s->events_valid = 0;
344
345     s->watch_req_cleanup = 0;
346     s->timeout_req_cleanup = 0;
347
348     s->prepared_timeout = 0;
349
350     s->state = STATE_INIT;
351
352     s->wakeup_issued = 0;
353
354     catta_simple_poll_set_func(s, NULL, NULL);
355
356     CATTA_LLIST_HEAD_INIT(CattaWatch, s->watches);
357     CATTA_LLIST_HEAD_INIT(CattaTimeout, s->timeouts);
358
359     return s;
360
361 fail:
362     catta_free(s);
363     winsock_exit();
364     return NULL;
365 }
366
367 void catta_simple_poll_free(CattaSimplePoll *s) {
368     assert(s);
369
370     cleanup_timeouts(s, 1);
371     cleanup_watches(s, 1);
372     assert(s->n_watches == 0);
373
374     catta_free(s->pollfds);
375
376     if (s->wakeup_pipe[0] >= 0)
377         closepipe(s->wakeup_pipe[0]);
378
379     if (s->wakeup_pipe[1] >= 0)
380         closepipe(s->wakeup_pipe[1]);
381
382     catta_free(s);
383     winsock_exit();  // match the winsock_init in catta_simple_poll_new
384 }
385
386 static int rebuild(CattaSimplePoll *s) {
387     CattaWatch *w;
388     int idx;
389
390     assert(s);
391
392     if (s->n_watches+1 > s->max_pollfds) {
393         struct pollfd *n;
394
395         s->max_pollfds = s->n_watches + 10;
396
397         if (!(n = catta_realloc(s->pollfds, sizeof(struct pollfd) * s->max_pollfds)))
398             return -1;
399
400         s->pollfds = n;
401     }
402
403
404     s->pollfds[0].fd = s->wakeup_pipe[0];
405     s->pollfds[0].events = POLLIN;
406     s->pollfds[0].revents = 0;
407
408     idx = 1;
409
410     for (w = s->watches; w; w = w->watches_next) {
411
412         if(w->dead)
413             continue;
414
415         assert(w->idx < s->max_pollfds);
416         s->pollfds[w->idx = idx++] = w->pollfd;
417     }
418
419     s->n_pollfds = idx;
420     s->events_valid = 0;
421     s->rebuild_pollfds = 0;
422
423     return 0;
424 }
425
426 static CattaTimeout* find_next_timeout(CattaSimplePoll *s) {
427     CattaTimeout *t, *n = NULL;
428     assert(s);
429
430     for (t = s->timeouts; t; t = t->timeouts_next) {
431
432         if (t->dead || !t->enabled)
433             continue;
434
435         if (!n || catta_timeval_compare(&t->expiry, &n->expiry) < 0)
436             n = t;
437     }
438
439     return n;
440 }
441
442 static void timeout_callback(CattaTimeout *t) {
443     assert(t);
444     assert(!t->dead);
445     assert(t->enabled);
446
447     t->enabled = 0;
448     t->callback(t, t->userdata);
449 }
450
451 int catta_simple_poll_prepare(CattaSimplePoll *s, int timeout) {
452     CattaTimeout *next_timeout;
453
454     assert(s);
455     assert(s->state == STATE_INIT || s->state == STATE_DISPATCHED || s->state == STATE_FAILURE);
456     s->state = STATE_PREPARING;
457
458     /* Clear pending wakeup requests */
459     clear_wakeup(s);
460
461     /* Cleanup things first */
462     if (s->watch_req_cleanup)
463         cleanup_watches(s, 0);
464
465     if (s->timeout_req_cleanup)
466         cleanup_timeouts(s, 0);
467
468     /* Check whether a quit was requested */
469     if (s->quit) {
470         s->state = STATE_QUIT;
471         return 1;
472     }
473
474     /* Do we need to rebuild our array of pollfds? */
475     if (s->rebuild_pollfds)
476         if (rebuild(s) < 0) {
477             s->state = STATE_FAILURE;
478             return -1;
479         }
480
481     /* Calculate the wakeup time */
482     if ((next_timeout = find_next_timeout(s))) {
483         struct timeval now;
484         int t;
485         CattaUsec usec;
486
487         if (next_timeout->expiry.tv_sec == 0 &&
488             next_timeout->expiry.tv_usec == 0) {
489
490             /* Just a shortcut so that we don't need to call gettimeofday() */
491             timeout = 0;
492             goto finish;
493         }
494
495         gettimeofday(&now, NULL);
496         usec = catta_timeval_diff(&next_timeout->expiry, &now);
497
498         if (usec <= 0) {
499             /* Timeout elapsed */
500
501             timeout = 0;
502             goto finish;
503         }
504
505         /* Calculate sleep time. We add 1ms because otherwise we'd
506          * wake up too early most of the time */
507         t = (int) (usec / 1000) + 1;
508
509         if (timeout < 0 || timeout > t)
510             timeout = t;
511     }
512
513 finish:
514     s->prepared_timeout = timeout;
515     s->state = STATE_PREPARED;
516     return 0;
517 }
518
519 int catta_simple_poll_run(CattaSimplePoll *s) {
520     assert(s);
521     assert(s->state == STATE_PREPARED || s->state == STATE_FAILURE);
522
523     s->state = STATE_RUNNING;
524
525     for (;;) {
526         errno = 0;
527
528         if (s->poll_func(s->pollfds, s->n_pollfds, s->prepared_timeout, s->poll_func_userdata) < 0) {
529
530             if (errno == EINTR)
531                 continue;
532
533             s->state = STATE_FAILURE;
534             return -1;
535         }
536
537         break;
538     }
539
540     /* The poll events are now valid again */
541     s->events_valid = 1;
542
543     /* Update state */
544     s->state = STATE_RAN;
545     return 0;
546 }
547
548 int catta_simple_poll_dispatch(CattaSimplePoll *s) {
549     CattaTimeout *next_timeout;
550     CattaWatch *w;
551
552     assert(s);
553     assert(s->state == STATE_RAN);
554     s->state = STATE_DISPATCHING;
555
556     /* We execute only one callback in every iteration */
557
558     /* Check whether the wakeup time has been reached now */
559     if ((next_timeout = find_next_timeout(s))) {
560
561         if (next_timeout->expiry.tv_sec == 0 && next_timeout->expiry.tv_usec == 0) {
562
563             /* Just a shortcut so that we don't need to call gettimeofday() */
564             timeout_callback(next_timeout);
565             goto finish;
566         }
567
568         if (catta_age(&next_timeout->expiry) >= 0) {
569
570             /* Timeout elapsed */
571             timeout_callback(next_timeout);
572             goto finish;
573         }
574     }
575
576     /* Look for some kind of I/O event */
577     for (w = s->watches; w; w = w->watches_next) {
578
579         if (w->dead)
580             continue;
581
582         assert(w->idx >= 0);
583         assert(w->idx < s->n_pollfds);
584
585         if (s->pollfds[w->idx].revents != 0) {
586             w->callback(w, w->pollfd.fd, s->pollfds[w->idx].revents, w->userdata);
587             goto finish;
588         }
589     }
590
591 finish:
592
593     s->state = STATE_DISPATCHED;
594     return 0;
595 }
596
597 int catta_simple_poll_iterate(CattaSimplePoll *s, int timeout) {
598     int r;
599
600     if ((r = catta_simple_poll_prepare(s, timeout)) != 0)
601         return r;
602
603     if ((r = catta_simple_poll_run(s)) != 0)
604         return r;
605
606     if ((r = catta_simple_poll_dispatch(s)) != 0)
607         return r;
608
609     return 0;
610 }
611
612 void catta_simple_poll_quit(CattaSimplePoll *s) {
613     assert(s);
614
615     s->quit = 1;
616
617     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
618     catta_simple_poll_wakeup(s);
619 }
620
621 const CattaPoll* catta_simple_poll_get(CattaSimplePoll *s) {
622     assert(s);
623
624     return &s->api;
625 }
626
627 static int system_poll(struct pollfd *ufds, unsigned int nfds, int timeout, CATTA_GCC_UNUSED void *userdata) {
628     return poll(ufds, nfds, timeout);
629 }
630
631 void catta_simple_poll_set_func(CattaSimplePoll *s, CattaPollFunc func, void *userdata) {
632     assert(s);
633
634     s->poll_func = func ? func : system_poll;
635     s->poll_func_userdata = func ? userdata : NULL;
636
637     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
638     catta_simple_poll_wakeup(s);
639 }
640
641 int catta_simple_poll_loop(CattaSimplePoll *s) {
642     int r;
643
644     assert(s);
645
646     for (;;)
647         if ((r = catta_simple_poll_iterate(s, -1)) != 0)
648             if (r >= 0 || errno != EINTR)
649                 return r;
650 }