]> git.meshlink.io Git - catta/blob - src/simple-watch.c
2992d87108075144468a4110f469b90770515129
[catta] / src / simple-watch.c
1 /***
2   This file is part of catta.
3
4   catta is free software; you can redistribute it and/or modify it
5   under the terms of the GNU Lesser General Public License as
6   published by the Free Software Foundation; either version 2.1 of the
7   License, or (at your option) any later version.
8
9   catta is distributed in the hope that it will be useful, but WITHOUT
10   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11   or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
12   Public License for more details.
13
14   You should have received a copy of the GNU Lesser General Public
15   License along with catta; if not, write to the Free Software
16   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17   USA.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include <sys/poll.h>
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <unistd.h>
29 #include <fcntl.h>
30 #include <stdio.h>
31
32 #include <catta/llist.h>
33 #include <catta/malloc.h>
34 #include <catta/timeval.h>
35 #include <catta/simple-watch.h>
36 #include "fdutil.h"                 // catta_set_nonblock
37 #include "internal.h"               // closesocket
38
39 struct CattaWatch {
40     CattaSimplePoll *simple_poll;
41     int dead;
42
43     int idx;
44     struct pollfd pollfd;
45
46     CattaWatchCallback callback;
47     void *userdata;
48
49     CATTA_LLIST_FIELDS(CattaWatch, watches);
50 };
51
52 struct CattaTimeout {
53     CattaSimplePoll *simple_poll;
54     int dead;
55
56     int enabled;
57     struct timeval expiry;
58
59     CattaTimeoutCallback callback;
60     void  *userdata;
61
62     CATTA_LLIST_FIELDS(CattaTimeout, timeouts);
63 };
64
65 struct CattaSimplePoll {
66     CattaPoll api;
67     CattaPollFunc poll_func;
68     void *poll_func_userdata;
69
70     struct pollfd* pollfds;
71     int n_pollfds, max_pollfds, rebuild_pollfds;
72
73     int watch_req_cleanup, timeout_req_cleanup;
74     int quit;
75     int events_valid;
76
77     int n_watches;
78     CATTA_LLIST_HEAD(CattaWatch, watches);
79     CATTA_LLIST_HEAD(CattaTimeout, timeouts);
80
81     int wakeup_pipe[2];
82     int wakeup_issued;
83
84     int prepared_timeout;
85
86     enum {
87         STATE_INIT,
88         STATE_PREPARING,
89         STATE_PREPARED,
90         STATE_RUNNING,
91         STATE_RAN,
92         STATE_DISPATCHING,
93         STATE_DISPATCHED,
94         STATE_QUIT,
95         STATE_FAILURE
96     } state;
97 };
98
99 void catta_simple_poll_wakeup(CattaSimplePoll *s) {
100     char c = 'W';
101     assert(s);
102
103     (void)writepipe(s->wakeup_pipe[1], &c, sizeof(c));
104     s->wakeup_issued = 1;
105 }
106
107 static void clear_wakeup(CattaSimplePoll *s) {
108     char c[10]; /* Read ten at a time */
109
110     if (!s->wakeup_issued)
111         return;
112
113     s->wakeup_issued = 0;
114
115     printf("(clear wake-up"); fflush(stdout); // XXX
116     for(;;) {
117         int n;
118         ioctl(s->wakeup_pipe[0], FIONREAD, &n);
119         printf(" %d", n); fflush(stdout); // XXX
120         if (readpipe(s->wakeup_pipe[0], c, sizeof(c)) != sizeof(c))
121             break;
122     }
123     printf(")\n"); // XXX
124 }
125
126 static CattaWatch* watch_new(const CattaPoll *api, int fd, CattaWatchEvent event, CattaWatchCallback callback, void *userdata) {
127     CattaWatch *w;
128     CattaSimplePoll *s;
129
130     assert(api);
131     assert(fd >= 0);
132     assert(callback);
133
134     s = api->userdata;
135     assert(s);
136
137     if (!(w = catta_new(CattaWatch, 1)))
138         return NULL;
139
140     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
141     catta_simple_poll_wakeup(s);
142
143     w->simple_poll = s;
144     w->dead = 0;
145
146     w->pollfd.fd = fd;
147     w->pollfd.events = event;
148     w->pollfd.revents = 0;
149
150     w->callback = callback;
151     w->userdata = userdata;
152
153     w->idx = -1;
154     s->rebuild_pollfds = 1;
155
156     CATTA_LLIST_PREPEND(CattaWatch, watches, s->watches, w);
157     s->n_watches++;
158
159     return w;
160 }
161
162 static void watch_update(CattaWatch *w, CattaWatchEvent events) {
163     assert(w);
164     assert(!w->dead);
165
166     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
167     catta_simple_poll_wakeup(w->simple_poll);
168
169     w->pollfd.events = events;
170
171     if (w->idx != -1) {
172         assert(w->simple_poll);
173         w->simple_poll->pollfds[w->idx] = w->pollfd;
174     } else
175         w->simple_poll->rebuild_pollfds = 1;
176 }
177
178 static CattaWatchEvent watch_get_events(CattaWatch *w) {
179     assert(w);
180     assert(!w->dead);
181
182     if (w->idx != -1 && w->simple_poll->events_valid)
183         return w->simple_poll->pollfds[w->idx].revents;
184
185     return 0;
186 }
187
188 static void remove_pollfd(CattaWatch *w) {
189     assert(w);
190
191     if (w->idx == -1)
192         return;
193
194     w->simple_poll->rebuild_pollfds = 1;
195 }
196
197 static void watch_free(CattaWatch *w) {
198     assert(w);
199
200     assert(!w->dead);
201
202     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
203     catta_simple_poll_wakeup(w->simple_poll);
204
205     remove_pollfd(w);
206
207     w->dead = 1;
208     w->simple_poll->n_watches --;
209     w->simple_poll->watch_req_cleanup = 1;
210 }
211
212 static void destroy_watch(CattaWatch *w) {
213     assert(w);
214
215     remove_pollfd(w);
216     CATTA_LLIST_REMOVE(CattaWatch, watches, w->simple_poll->watches, w);
217
218     if (!w->dead)
219         w->simple_poll->n_watches --;
220
221     catta_free(w);
222 }
223
224 static void cleanup_watches(CattaSimplePoll *s, int all) {
225     CattaWatch *w, *next;
226     assert(s);
227
228     for (w = s->watches; w; w = next) {
229         next = w->watches_next;
230
231         if (all || w->dead)
232             destroy_watch(w);
233     }
234
235     s->timeout_req_cleanup = 0;
236 }
237
238 static CattaTimeout* timeout_new(const CattaPoll *api, const struct timeval *tv, CattaTimeoutCallback callback, void *userdata) {
239     CattaTimeout *t;
240     CattaSimplePoll *s;
241
242     assert(api);
243     assert(callback);
244
245     s = api->userdata;
246     assert(s);
247
248     if (!(t = catta_new(CattaTimeout, 1)))
249         return NULL;
250
251     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
252     catta_simple_poll_wakeup(s);
253
254     t->simple_poll = s;
255     t->dead = 0;
256
257     if ((t->enabled = !!tv))
258         t->expiry = *tv;
259
260     t->callback = callback;
261     t->userdata = userdata;
262
263     CATTA_LLIST_PREPEND(CattaTimeout, timeouts, s->timeouts, t);
264     return t;
265 }
266
267 static void timeout_update(CattaTimeout *t, const struct timeval *tv) {
268     assert(t);
269     assert(!t->dead);
270
271     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
272     catta_simple_poll_wakeup(t->simple_poll);
273
274     if ((t->enabled = !!tv))
275         t->expiry = *tv;
276 }
277
278 static void timeout_free(CattaTimeout *t) {
279     assert(t);
280     assert(!t->dead);
281
282     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
283     catta_simple_poll_wakeup(t->simple_poll);
284
285     t->dead = 1;
286     t->simple_poll->timeout_req_cleanup = 1;
287 }
288
289
290 static void destroy_timeout(CattaTimeout *t) {
291     assert(t);
292
293     CATTA_LLIST_REMOVE(CattaTimeout, timeouts, t->simple_poll->timeouts, t);
294
295     catta_free(t);
296 }
297
298 static void cleanup_timeouts(CattaSimplePoll *s, int all) {
299     CattaTimeout *t, *next;
300     assert(s);
301
302     for (t = s->timeouts; t; t = next) {
303         next = t->timeouts_next;
304
305         if (all || t->dead)
306             destroy_timeout(t);
307     }
308
309     s->timeout_req_cleanup = 0;
310 }
311
312 CattaSimplePoll *catta_simple_poll_new(void) {
313     CattaSimplePoll *s;
314
315     if (!(s = catta_new(CattaSimplePoll, 1)))
316         return NULL;
317
318     winsock_init();  // on Windows, pipe uses sockets; no-op on other platforms
319     if (pipe(s->wakeup_pipe) < 0) {
320         catta_free(s);
321         winsock_exit();
322         return NULL;
323     }
324
325     catta_set_nonblock(s->wakeup_pipe[0]);
326     catta_set_nonblock(s->wakeup_pipe[1]);
327
328     s->api.userdata = s;
329
330     s->api.watch_new = watch_new;
331     s->api.watch_free = watch_free;
332     s->api.watch_update = watch_update;
333     s->api.watch_get_events = watch_get_events;
334
335     s->api.timeout_new = timeout_new;
336     s->api.timeout_free = timeout_free;
337     s->api.timeout_update = timeout_update;
338
339     s->pollfds = NULL;
340     s->max_pollfds = s->n_pollfds = 0;
341     s->rebuild_pollfds = 1;
342     s->quit = 0;
343     s->n_watches = 0;
344     s->events_valid = 0;
345
346     s->watch_req_cleanup = 0;
347     s->timeout_req_cleanup = 0;
348
349     s->prepared_timeout = 0;
350
351     s->state = STATE_INIT;
352
353     s->wakeup_issued = 0;
354
355     catta_simple_poll_set_func(s, NULL, NULL);
356
357     CATTA_LLIST_HEAD_INIT(CattaWatch, s->watches);
358     CATTA_LLIST_HEAD_INIT(CattaTimeout, s->timeouts);
359
360     return s;
361 }
362
363 void catta_simple_poll_free(CattaSimplePoll *s) {
364     assert(s);
365
366     cleanup_timeouts(s, 1);
367     cleanup_watches(s, 1);
368     assert(s->n_watches == 0);
369
370     catta_free(s->pollfds);
371
372     if (s->wakeup_pipe[0] >= 0)
373         closepipe(s->wakeup_pipe[0]);
374
375     if (s->wakeup_pipe[1] >= 0)
376         closepipe(s->wakeup_pipe[1]);
377
378     catta_free(s);
379     winsock_exit();  // match the winsock_init in catta_simple_poll_new
380 }
381
382 static int rebuild(CattaSimplePoll *s) {
383     CattaWatch *w;
384     int idx;
385
386     assert(s);
387
388     if (s->n_watches+1 > s->max_pollfds) {
389         struct pollfd *n;
390
391         s->max_pollfds = s->n_watches + 10;
392
393         if (!(n = catta_realloc(s->pollfds, sizeof(struct pollfd) * s->max_pollfds)))
394             return -1;
395
396         s->pollfds = n;
397     }
398
399
400     s->pollfds[0].fd = s->wakeup_pipe[0];
401     s->pollfds[0].events = POLLIN;
402     s->pollfds[0].revents = 0;
403
404     idx = 1;
405
406     for (w = s->watches; w; w = w->watches_next) {
407
408         if(w->dead)
409             continue;
410
411         assert(w->idx < s->max_pollfds);
412         s->pollfds[w->idx = idx++] = w->pollfd;
413     }
414
415     s->n_pollfds = idx;
416     s->events_valid = 0;
417     s->rebuild_pollfds = 0;
418
419     return 0;
420 }
421
422 static CattaTimeout* find_next_timeout(CattaSimplePoll *s) {
423     CattaTimeout *t, *n = NULL;
424     assert(s);
425
426     for (t = s->timeouts; t; t = t->timeouts_next) {
427
428         if (t->dead || !t->enabled)
429             continue;
430
431         if (!n || catta_timeval_compare(&t->expiry, &n->expiry) < 0)
432             n = t;
433     }
434
435     return n;
436 }
437
438 static void timeout_callback(CattaTimeout *t) {
439     assert(t);
440     assert(!t->dead);
441     assert(t->enabled);
442
443     t->enabled = 0;
444     t->callback(t, t->userdata);
445 }
446
447 int catta_simple_poll_prepare(CattaSimplePoll *s, int timeout) {
448     CattaTimeout *next_timeout;
449
450     assert(s);
451     assert(s->state == STATE_INIT || s->state == STATE_DISPATCHED || s->state == STATE_FAILURE);
452     s->state = STATE_PREPARING;
453
454     /* Clear pending wakeup requests */
455     clear_wakeup(s);
456
457     /* Cleanup things first */
458     if (s->watch_req_cleanup)
459         cleanup_watches(s, 0);
460
461     if (s->timeout_req_cleanup)
462         cleanup_timeouts(s, 0);
463
464     /* Check whether a quit was requested */
465     if (s->quit) {
466         s->state = STATE_QUIT;
467         return 1;
468     }
469
470     /* Do we need to rebuild our array of pollfds? */
471     if (s->rebuild_pollfds)
472         if (rebuild(s) < 0) {
473             s->state = STATE_FAILURE;
474             return -1;
475         }
476
477     /* Calculate the wakeup time */
478     if ((next_timeout = find_next_timeout(s))) {
479         struct timeval now;
480         int t;
481         CattaUsec usec;
482
483         if (next_timeout->expiry.tv_sec == 0 &&
484             next_timeout->expiry.tv_usec == 0) {
485
486             /* Just a shortcut so that we don't need to call gettimeofday() */
487             timeout = 0;
488             goto finish;
489         }
490
491         gettimeofday(&now, NULL);
492         usec = catta_timeval_diff(&next_timeout->expiry, &now);
493
494         if (usec <= 0) {
495             /* Timeout elapsed */
496
497             timeout = 0;
498             goto finish;
499         }
500
501         /* Calculate sleep time. We add 1ms because otherwise we'd
502          * wake up too early most of the time */
503         t = (int) (usec / 1000) + 1;
504
505         if (timeout < 0 || timeout > t)
506             timeout = t;
507     }
508
509 finish:
510     s->prepared_timeout = timeout;
511     s->state = STATE_PREPARED;
512     return 0;
513 }
514
515 int catta_simple_poll_run(CattaSimplePoll *s) {
516     assert(s);
517     assert(s->state == STATE_PREPARED || s->state == STATE_FAILURE);
518
519     s->state = STATE_RUNNING;
520
521     for (;;) {
522         errno = 0;
523
524         // XXX debug
525         {
526             printf("(poll %d...", s->n_pollfds);
527             fflush(stdout);
528         }
529         if (s->poll_func(s->pollfds, s->n_pollfds, s->prepared_timeout, s->poll_func_userdata) < 0) {
530
531             if (errno == EINTR) {
532                 printf(" interrupted)\n"); // XXX
533                 continue;
534             }
535
536             s->state = STATE_FAILURE;
537             printf(" FAIL)\n"); // XXX
538             return -1;
539         }
540
541         break;
542     }
543
544     /* The poll events are now valid again */
545     s->events_valid = 1;
546
547     /* Update state */
548     s->state = STATE_RAN;
549     return 0;
550 }
551
552 int catta_simple_poll_dispatch(CattaSimplePoll *s) {
553     CattaTimeout *next_timeout;
554     CattaWatch *w;
555
556     assert(s);
557     assert(s->state == STATE_RAN);
558     s->state = STATE_DISPATCHING;
559
560     // XXX debug
561     {
562         int i, nready=0, nwatches=0;
563         for(w=s->watches; w; w=w->watches_next)
564             nwatches++;
565         for(i=0; i<s->n_pollfds; i++)
566             if(s->pollfds[i].revents)
567                 nready++;
568         if(nready > 0 && s->pollfds[i].revents)
569             printf(" wake-up,");
570         printf(" %d ready, %d watches)\n", nready, nwatches);
571     }
572
573     /* We execute only one callback in every iteration */
574
575     /* Check whether the wakeup time has been reached now */
576     if ((next_timeout = find_next_timeout(s))) {
577
578         if (next_timeout->expiry.tv_sec == 0 && next_timeout->expiry.tv_usec == 0) {
579
580             /* Just a shortcut so that we don't need to call gettimeofday() */
581             timeout_callback(next_timeout);
582             goto finish;
583         }
584
585         if (catta_age(&next_timeout->expiry) >= 0) {
586
587             /* Timeout elapsed */
588             timeout_callback(next_timeout);
589             goto finish;
590         }
591     }
592
593     /* Look for some kind of I/O event */
594     for (w = s->watches; w; w = w->watches_next) {
595
596         if (w->dead)
597             continue;
598
599         assert(w->idx >= 0);
600         assert(w->idx < s->n_pollfds);
601
602         if (s->pollfds[w->idx].revents != 0) {
603             w->callback(w, w->pollfd.fd, s->pollfds[w->idx].revents, w->userdata);
604             goto finish;
605         }
606     }
607
608 finish:
609
610     s->state = STATE_DISPATCHED;
611     return 0;
612 }
613
614 int catta_simple_poll_iterate(CattaSimplePoll *s, int timeout) {
615     int r;
616
617     if ((r = catta_simple_poll_prepare(s, timeout)) != 0)
618         return r;
619
620     if ((r = catta_simple_poll_run(s)) != 0)
621         return r;
622
623     if ((r = catta_simple_poll_dispatch(s)) != 0)
624         return r;
625
626     return 0;
627 }
628
629 void catta_simple_poll_quit(CattaSimplePoll *s) {
630     assert(s);
631
632     s->quit = 1;
633
634     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
635     catta_simple_poll_wakeup(s);
636 }
637
638 const CattaPoll* catta_simple_poll_get(CattaSimplePoll *s) {
639     assert(s);
640
641     return &s->api;
642 }
643
644 static int system_poll(struct pollfd *ufds, unsigned int nfds, int timeout, CATTA_GCC_UNUSED void *userdata) {
645     return poll(ufds, nfds, timeout);
646 }
647
648 void catta_simple_poll_set_func(CattaSimplePoll *s, CattaPollFunc func, void *userdata) {
649     assert(s);
650
651     s->poll_func = func ? func : system_poll;
652     s->poll_func_userdata = func ? userdata : NULL;
653
654     /* If there is a background thread running the poll() for us, tell it to exit the poll() */
655     catta_simple_poll_wakeup(s);
656 }
657
658 int catta_simple_poll_loop(CattaSimplePoll *s) {
659     int r;
660
661     assert(s);
662
663     for (;;)
664         if ((r = catta_simple_poll_iterate(s, -1)) != 0)
665             if (r >= 0 || errno != EINTR)
666                 return r;
667 }