]> git.meshlink.io Git - catta/blob - src/simple-watch.c
check for failure of catta_set_nonblock in simple-watch and iface-windows
[catta] / src / simple-watch.c
1 /***
2   This file is part of catta.
3
4   catta is free software; you can redistribute it and/or modify it
5   under the terms of the GNU Lesser General Public License as
6   published by the Free Software Foundation; either version 2.1 of the
7   License, or (at your option) any later version.
8
9   catta is distributed in the hope that it will be useful, but WITHOUT
10   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11   or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
12   Public License for more details.
13
14   You should have received a copy of the GNU Lesser General Public
15   License along with catta; if not, write to the Free Software
16   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17   USA.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include <sys/poll.h>
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <unistd.h>
29 #include <fcntl.h>
30 #include <stdio.h>
31
32 #include <catta/llist.h>
33 #include <catta/malloc.h>
34 #include <catta/timeval.h>
35 #include <catta/simple-watch.h>
36 #include <catta/log.h>
37 #include "fdutil.h"                 // catta_set_nonblock
38 #include "internal.h"               // closesocket
39
40 struct CattaWatch {
41     CattaSimplePoll *simple_poll;
42     int dead;
43
44     int idx;
45     struct pollfd pollfd;
46
47     CattaWatchCallback callback;
48     void *userdata;
49
50     CATTA_LLIST_FIELDS(CattaWatch, watches);
51 };
52
53 struct CattaTimeout {
54     CattaSimplePoll *simple_poll;
55     int dead;
56
57     int enabled;
58     struct timeval expiry;
59
60     CattaTimeoutCallback callback;
61     void  *userdata;
62
63     CATTA_LLIST_FIELDS(CattaTimeout, timeouts);
64 };
65
66 struct CattaSimplePoll {
67     CattaPoll api;
68     CattaPollFunc poll_func;
69     void *poll_func_userdata;
70
71     struct pollfd* pollfds;
72     int n_pollfds, max_pollfds, rebuild_pollfds;
73
74     int watch_req_cleanup, timeout_req_cleanup;
75     int quit;
76     int events_valid;
77
78     int n_watches;
79     CATTA_LLIST_HEAD(CattaWatch, watches);
80     CATTA_LLIST_HEAD(CattaTimeout, timeouts);
81
82     int wakeup_pipe[2];
83     int wakeup_issued;
84
85     int prepared_timeout;
86
87     enum {
88         STATE_INIT,
89         STATE_PREPARING,
90         STATE_PREPARED,
91         STATE_RUNNING,
92         STATE_RAN,
93         STATE_DISPATCHING,
94         STATE_DISPATCHED,
95         STATE_QUIT,
96         STATE_FAILURE
97     } state;
98 };
99
100 void catta_simple_poll_wakeup(CattaSimplePoll *s) {
101     char c = 'W';
102     assert(s);
103
104     (void)writepipe(s->wakeup_pipe[1], &c, sizeof(c));
105     s->wakeup_issued = 1;
106 }
107
108 static void clear_wakeup(CattaSimplePoll *s) {
109     char c[10]; /* Read ten at a time */
110
111     if (!s->wakeup_issued)
112         return;
113
114     s->wakeup_issued = 0;
115
116     printf("(clear wake-up"); fflush(stdout); // XXX
117     for(;;) {
118         int n;
119         ioctl(s->wakeup_pipe[0], FIONREAD, &n);
120         printf(" %d", n); fflush(stdout); // XXX
121         if (readpipe(s->wakeup_pipe[0], c, sizeof(c)) != sizeof(c))
122             break;
123     }
124     printf(")\n"); // XXX
125 }
126
127 static CattaWatch* watch_new(const CattaPoll *api, int fd, CattaWatchEvent event, CattaWatchCallback callback, void *userdata) {
128     CattaWatch *w;
129     CattaSimplePoll *s;
130
131     assert(api);
132     assert(fd >= 0);
133     assert(callback);
134
135     s = api->userdata;
136     assert(s);
137
138     if (!(w = catta_new(CattaWatch, 1)))
139         return NULL;
140
141     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
142     catta_simple_poll_wakeup(s);
143
144     w->simple_poll = s;
145     w->dead = 0;
146
147     w->pollfd.fd = fd;
148     w->pollfd.events = event;
149     w->pollfd.revents = 0;
150
151     w->callback = callback;
152     w->userdata = userdata;
153
154     w->idx = -1;
155     s->rebuild_pollfds = 1;
156
157     CATTA_LLIST_PREPEND(CattaWatch, watches, s->watches, w);
158     s->n_watches++;
159
160     return w;
161 }
162
163 static void watch_update(CattaWatch *w, CattaWatchEvent events) {
164     assert(w);
165     assert(!w->dead);
166
167     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
168     catta_simple_poll_wakeup(w->simple_poll);
169
170     w->pollfd.events = events;
171
172     if (w->idx != -1) {
173         assert(w->simple_poll);
174         w->simple_poll->pollfds[w->idx] = w->pollfd;
175     } else
176         w->simple_poll->rebuild_pollfds = 1;
177 }
178
179 static CattaWatchEvent watch_get_events(CattaWatch *w) {
180     assert(w);
181     assert(!w->dead);
182
183     if (w->idx != -1 && w->simple_poll->events_valid)
184         return w->simple_poll->pollfds[w->idx].revents;
185
186     return 0;
187 }
188
189 static void remove_pollfd(CattaWatch *w) {
190     assert(w);
191
192     if (w->idx == -1)
193         return;
194
195     w->simple_poll->rebuild_pollfds = 1;
196 }
197
198 static void watch_free(CattaWatch *w) {
199     assert(w);
200
201     assert(!w->dead);
202
203     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
204     catta_simple_poll_wakeup(w->simple_poll);
205
206     remove_pollfd(w);
207
208     w->dead = 1;
209     w->simple_poll->n_watches --;
210     w->simple_poll->watch_req_cleanup = 1;
211 }
212
213 static void destroy_watch(CattaWatch *w) {
214     assert(w);
215
216     remove_pollfd(w);
217     CATTA_LLIST_REMOVE(CattaWatch, watches, w->simple_poll->watches, w);
218
219     if (!w->dead)
220         w->simple_poll->n_watches --;
221
222     catta_free(w);
223 }
224
225 static void cleanup_watches(CattaSimplePoll *s, int all) {
226     CattaWatch *w, *next;
227     assert(s);
228
229     for (w = s->watches; w; w = next) {
230         next = w->watches_next;
231
232         if (all || w->dead)
233             destroy_watch(w);
234     }
235
236     s->timeout_req_cleanup = 0;
237 }
238
239 static CattaTimeout* timeout_new(const CattaPoll *api, const struct timeval *tv, CattaTimeoutCallback callback, void *userdata) {
240     CattaTimeout *t;
241     CattaSimplePoll *s;
242
243     assert(api);
244     assert(callback);
245
246     s = api->userdata;
247     assert(s);
248
249     if (!(t = catta_new(CattaTimeout, 1)))
250         return NULL;
251
252     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
253     catta_simple_poll_wakeup(s);
254
255     t->simple_poll = s;
256     t->dead = 0;
257
258     if ((t->enabled = !!tv))
259         t->expiry = *tv;
260
261     t->callback = callback;
262     t->userdata = userdata;
263
264     CATTA_LLIST_PREPEND(CattaTimeout, timeouts, s->timeouts, t);
265     return t;
266 }
267
268 static void timeout_update(CattaTimeout *t, const struct timeval *tv) {
269     assert(t);
270     assert(!t->dead);
271
272     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
273     catta_simple_poll_wakeup(t->simple_poll);
274
275     if ((t->enabled = !!tv))
276         t->expiry = *tv;
277 }
278
279 static void timeout_free(CattaTimeout *t) {
280     assert(t);
281     assert(!t->dead);
282
283     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
284     catta_simple_poll_wakeup(t->simple_poll);
285
286     t->dead = 1;
287     t->simple_poll->timeout_req_cleanup = 1;
288 }
289
290
291 static void destroy_timeout(CattaTimeout *t) {
292     assert(t);
293
294     CATTA_LLIST_REMOVE(CattaTimeout, timeouts, t->simple_poll->timeouts, t);
295
296     catta_free(t);
297 }
298
299 static void cleanup_timeouts(CattaSimplePoll *s, int all) {
300     CattaTimeout *t, *next;
301     assert(s);
302
303     for (t = s->timeouts; t; t = next) {
304         next = t->timeouts_next;
305
306         if (all || t->dead)
307             destroy_timeout(t);
308     }
309
310     s->timeout_req_cleanup = 0;
311 }
312
313 CattaSimplePoll *catta_simple_poll_new(void) {
314     CattaSimplePoll *s;
315
316     if (!(s = catta_new(CattaSimplePoll, 1)))
317         return NULL;
318
319     winsock_init();  // on Windows, pipe uses sockets; no-op on other platforms
320     if (pipe(s->wakeup_pipe) < 0) {
321         catta_log_error(__FILE__": pipe() failed: %s", errnostrsocket());
322         goto fail;
323     }
324
325     if (catta_set_nonblock(s->wakeup_pipe[0]) < 0 ||
326         catta_set_nonblock(s->wakeup_pipe[1]) < 0)
327     {
328         catta_log_error(__FILE__": O_NONBLOCK failed: %s", errnostrsocket());
329         goto fail;
330     }
331
332     s->api.userdata = s;
333
334     s->api.watch_new = watch_new;
335     s->api.watch_free = watch_free;
336     s->api.watch_update = watch_update;
337     s->api.watch_get_events = watch_get_events;
338
339     s->api.timeout_new = timeout_new;
340     s->api.timeout_free = timeout_free;
341     s->api.timeout_update = timeout_update;
342
343     s->pollfds = NULL;
344     s->max_pollfds = s->n_pollfds = 0;
345     s->rebuild_pollfds = 1;
346     s->quit = 0;
347     s->n_watches = 0;
348     s->events_valid = 0;
349
350     s->watch_req_cleanup = 0;
351     s->timeout_req_cleanup = 0;
352
353     s->prepared_timeout = 0;
354
355     s->state = STATE_INIT;
356
357     s->wakeup_issued = 0;
358
359     catta_simple_poll_set_func(s, NULL, NULL);
360
361     CATTA_LLIST_HEAD_INIT(CattaWatch, s->watches);
362     CATTA_LLIST_HEAD_INIT(CattaTimeout, s->timeouts);
363
364     return s;
365
366 fail:
367     catta_free(s);
368     winsock_exit();
369     return NULL;
370 }
371
372 void catta_simple_poll_free(CattaSimplePoll *s) {
373     assert(s);
374
375     cleanup_timeouts(s, 1);
376     cleanup_watches(s, 1);
377     assert(s->n_watches == 0);
378
379     catta_free(s->pollfds);
380
381     if (s->wakeup_pipe[0] >= 0)
382         closepipe(s->wakeup_pipe[0]);
383
384     if (s->wakeup_pipe[1] >= 0)
385         closepipe(s->wakeup_pipe[1]);
386
387     catta_free(s);
388     winsock_exit();  // match the winsock_init in catta_simple_poll_new
389 }
390
391 static int rebuild(CattaSimplePoll *s) {
392     CattaWatch *w;
393     int idx;
394
395     assert(s);
396
397     if (s->n_watches+1 > s->max_pollfds) {
398         struct pollfd *n;
399
400         s->max_pollfds = s->n_watches + 10;
401
402         if (!(n = catta_realloc(s->pollfds, sizeof(struct pollfd) * s->max_pollfds)))
403             return -1;
404
405         s->pollfds = n;
406     }
407
408
409     s->pollfds[0].fd = s->wakeup_pipe[0];
410     s->pollfds[0].events = POLLIN;
411     s->pollfds[0].revents = 0;
412
413     idx = 1;
414
415     for (w = s->watches; w; w = w->watches_next) {
416
417         if(w->dead)
418             continue;
419
420         assert(w->idx < s->max_pollfds);
421         s->pollfds[w->idx = idx++] = w->pollfd;
422     }
423
424     s->n_pollfds = idx;
425     s->events_valid = 0;
426     s->rebuild_pollfds = 0;
427
428     return 0;
429 }
430
431 static CattaTimeout* find_next_timeout(CattaSimplePoll *s) {
432     CattaTimeout *t, *n = NULL;
433     assert(s);
434
435     for (t = s->timeouts; t; t = t->timeouts_next) {
436
437         if (t->dead || !t->enabled)
438             continue;
439
440         if (!n || catta_timeval_compare(&t->expiry, &n->expiry) < 0)
441             n = t;
442     }
443
444     return n;
445 }
446
447 static void timeout_callback(CattaTimeout *t) {
448     assert(t);
449     assert(!t->dead);
450     assert(t->enabled);
451
452     t->enabled = 0;
453     t->callback(t, t->userdata);
454 }
455
456 int catta_simple_poll_prepare(CattaSimplePoll *s, int timeout) {
457     CattaTimeout *next_timeout;
458
459     assert(s);
460     assert(s->state == STATE_INIT || s->state == STATE_DISPATCHED || s->state == STATE_FAILURE);
461     s->state = STATE_PREPARING;
462
463     /* Clear pending wakeup requests */
464     clear_wakeup(s);
465
466     /* Cleanup things first */
467     if (s->watch_req_cleanup)
468         cleanup_watches(s, 0);
469
470     if (s->timeout_req_cleanup)
471         cleanup_timeouts(s, 0);
472
473     /* Check whether a quit was requested */
474     if (s->quit) {
475         s->state = STATE_QUIT;
476         return 1;
477     }
478
479     /* Do we need to rebuild our array of pollfds? */
480     if (s->rebuild_pollfds)
481         if (rebuild(s) < 0) {
482             s->state = STATE_FAILURE;
483             return -1;
484         }
485
486     /* Calculate the wakeup time */
487     if ((next_timeout = find_next_timeout(s))) {
488         struct timeval now;
489         int t;
490         CattaUsec usec;
491
492         if (next_timeout->expiry.tv_sec == 0 &&
493             next_timeout->expiry.tv_usec == 0) {
494
495             /* Just a shortcut so that we don't need to call gettimeofday() */
496             timeout = 0;
497             goto finish;
498         }
499
500         gettimeofday(&now, NULL);
501         usec = catta_timeval_diff(&next_timeout->expiry, &now);
502
503         if (usec <= 0) {
504             /* Timeout elapsed */
505
506             timeout = 0;
507             goto finish;
508         }
509
510         /* Calculate sleep time. We add 1ms because otherwise we'd
511          * wake up too early most of the time */
512         t = (int) (usec / 1000) + 1;
513
514         if (timeout < 0 || timeout > t)
515             timeout = t;
516     }
517
518 finish:
519     s->prepared_timeout = timeout;
520     s->state = STATE_PREPARED;
521     return 0;
522 }
523
524 int catta_simple_poll_run(CattaSimplePoll *s) {
525     assert(s);
526     assert(s->state == STATE_PREPARED || s->state == STATE_FAILURE);
527
528     s->state = STATE_RUNNING;
529
530     for (;;) {
531         errno = 0;
532
533         // XXX debug
534         {
535             printf("(poll %d...", s->n_pollfds);
536             fflush(stdout);
537         }
538         if (s->poll_func(s->pollfds, s->n_pollfds, s->prepared_timeout, s->poll_func_userdata) < 0) {
539
540             if (errno == EINTR) {
541                 printf(" interrupted)\n"); // XXX
542                 continue;
543             }
544
545             s->state = STATE_FAILURE;
546             printf(" FAIL)\n"); // XXX
547             return -1;
548         }
549
550         break;
551     }
552
553     /* The poll events are now valid again */
554     s->events_valid = 1;
555
556     /* Update state */
557     s->state = STATE_RAN;
558     return 0;
559 }
560
561 int catta_simple_poll_dispatch(CattaSimplePoll *s) {
562     CattaTimeout *next_timeout;
563     CattaWatch *w;
564
565     assert(s);
566     assert(s->state == STATE_RAN);
567     s->state = STATE_DISPATCHING;
568
569     // XXX debug
570     {
571         int i, nready=0, nwatches=0;
572         for(w=s->watches; w; w=w->watches_next)
573             nwatches++;
574         for(i=0; i<s->n_pollfds; i++)
575             if(s->pollfds[i].revents)
576                 nready++;
577         if(nready > 0 && s->pollfds[i].revents)
578             printf(" wake-up,");
579         printf(" %d ready, %d watches)\n", nready, nwatches);
580     }
581
582     /* We execute only one callback in every iteration */
583
584     /* Check whether the wakeup time has been reached now */
585     if ((next_timeout = find_next_timeout(s))) {
586
587         if (next_timeout->expiry.tv_sec == 0 && next_timeout->expiry.tv_usec == 0) {
588
589             /* Just a shortcut so that we don't need to call gettimeofday() */
590             timeout_callback(next_timeout);
591             goto finish;
592         }
593
594         if (catta_age(&next_timeout->expiry) >= 0) {
595
596             /* Timeout elapsed */
597             timeout_callback(next_timeout);
598             goto finish;
599         }
600     }
601
602     /* Look for some kind of I/O event */
603     for (w = s->watches; w; w = w->watches_next) {
604
605         if (w->dead)
606             continue;
607
608         assert(w->idx >= 0);
609         assert(w->idx < s->n_pollfds);
610
611         if (s->pollfds[w->idx].revents != 0) {
612             w->callback(w, w->pollfd.fd, s->pollfds[w->idx].revents, w->userdata);
613             goto finish;
614         }
615     }
616
617 finish:
618
619     s->state = STATE_DISPATCHED;
620     return 0;
621 }
622
623 int catta_simple_poll_iterate(CattaSimplePoll *s, int timeout) {
624     int r;
625
626     if ((r = catta_simple_poll_prepare(s, timeout)) != 0)
627         return r;
628
629     if ((r = catta_simple_poll_run(s)) != 0)
630         return r;
631
632     if ((r = catta_simple_poll_dispatch(s)) != 0)
633         return r;
634
635     return 0;
636 }
637
638 void catta_simple_poll_quit(CattaSimplePoll *s) {
639     assert(s);
640
641     s->quit = 1;
642
643     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
644     catta_simple_poll_wakeup(s);
645 }
646
647 const CattaPoll* catta_simple_poll_get(CattaSimplePoll *s) {
648     assert(s);
649
650     return &s->api;
651 }
652
653 static int system_poll(struct pollfd *ufds, unsigned int nfds, int timeout, CATTA_GCC_UNUSED void *userdata) {
654     return poll(ufds, nfds, timeout);
655 }
656
657 void catta_simple_poll_set_func(CattaSimplePoll *s, CattaPollFunc func, void *userdata) {
658     assert(s);
659
660     s->poll_func = func ? func : system_poll;
661     s->poll_func_userdata = func ? userdata : NULL;
662
663     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
664     catta_simple_poll_wakeup(s);
665 }
666
667 int catta_simple_poll_loop(CattaSimplePoll *s) {
668     int r;
669
670     assert(s);
671
672     for (;;)
673         if ((r = catta_simple_poll_iterate(s, -1)) != 0)
674             if (r >= 0 || errno != EINTR)
675                 return r;
676 }