]> git.meshlink.io Git - catta/blob - subscribe.c
* add subscription feature - with reissuing
[catta] / subscribe.c
1 #include "subscribe.h"
2 #include "util.h"
3
4 static void elapse(flxTimeEvent *e, void *userdata) {
5     flxSubscription *s = userdata;
6     GTimeVal tv;
7     gchar *t;
8     
9     g_assert(s);
10
11     flx_server_post_query(s->server, s->interface, s->protocol, s->key);
12
13     if (s->n_query++ <= 8)
14         s->sec_delay *= 2;
15
16     g_message("%i. Continuous querying for %s", s->n_query, t = flx_key_to_string(s->key));
17     g_free(t);
18
19     
20     flx_elapse_time(&tv, s->sec_delay*1000, 0);
21     flx_time_event_queue_update(s->server->time_event_queue, s->time_event, &tv);
22 }
23
24 static void scan_cache_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) {
25     flxSubscription *s = userdata;
26     flxCacheEntry *e;
27
28     g_assert(m);
29     g_assert(i);
30     g_assert(s);
31
32     for (e = flx_cache_lookup_key(i->cache, s->key); e; e = e->by_name_next)
33         s->callback(s, e->record, i->hardware->index, i->protocol, FLX_SUBSCRIPTION_NEW, s->userdata);
34 }
35
36 flxSubscription *flx_subscription_new(flxServer *server, flxKey *key, gint interface, guchar protocol, flxSubscriptionCallback callback, gpointer userdata) {
37     flxSubscription *s, *t;
38     GTimeVal tv;
39
40     g_assert(server);
41     g_assert(key);
42     g_assert(callback);
43
44     s = g_new(flxSubscription, 1);
45     s->server = server;
46     s->key = flx_key_ref(key);
47     s->interface = interface;
48     s->protocol = protocol;
49     s->callback = callback;
50     s->userdata = userdata;
51     s->n_query = 1;
52     s->sec_delay = 1;
53
54     flx_server_post_query(s->server, s->interface, s->protocol, s->key);
55     
56     flx_elapse_time(&tv, s->sec_delay*1000, 0);
57     s->time_event = flx_time_event_queue_add(server->time_event_queue, &tv, elapse, s);
58
59     FLX_LLIST_PREPEND(flxSubscription, subscriptions, server->subscriptions, s);
60
61     /* Add the new entry to the subscription hash table */
62     t = g_hash_table_lookup(server->subscription_hashtable, key);
63     FLX_LLIST_PREPEND(flxSubscription, by_key, t, s);
64     g_hash_table_replace(server->subscription_hashtable, key, t);
65
66     /* Scan the caches */
67     flx_interface_monitor_walk(s->server->monitor, s->interface, s->protocol, scan_cache_callback, s);
68     
69     return s;
70 }
71
72 void flx_subscription_free(flxSubscription *s) {
73     flxSubscription *t;
74     
75     g_assert(s);
76
77     FLX_LLIST_REMOVE(flxSubscription, subscriptions, s->server->subscriptions, s);
78
79     t = g_hash_table_lookup(s->server->subscription_hashtable, s->key);
80     FLX_LLIST_REMOVE(flxSubscription, by_key, t, s);
81     if (t)
82         g_hash_table_replace(s->server->subscription_hashtable, t->key, t);
83     else
84         g_hash_table_remove(s->server->subscription_hashtable, s->key);
85     
86     flx_time_event_queue_remove(s->server->time_event_queue, s->time_event);
87     flx_key_unref(s->key);
88
89     
90     g_free(s);
91 }
92
93 void flx_subscription_notify(flxServer *server, flxInterface *i, flxRecord *record, flxSubscriptionEvent event) {
94     flxSubscription *s;
95     
96     g_assert(server);
97     g_assert(record);
98
99     for (s = g_hash_table_lookup(server->subscription_hashtable, record->key); s; s = s->by_key_next)
100         if (flx_interface_match(i, s->interface, s->protocol))
101             s->callback(s, record, i->hardware->index, i->protocol, event, s->userdata);
102     
103 }