X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=subscribe.c;h=abd16206ba086bbddff4f7ee6b60184a103b4a76;hb=b8c78f5c0da93d92aa28d3ef3757e78d03141f41;hp=3abe464ea911a1a1dd70c9df64514efaeab9e586;hpb=8e7f83aa5b6d910e80c56b31f4eb79b02e7ca67b;p=catta diff --git a/subscribe.c b/subscribe.c index 3abe464..abd1620 100644 --- a/subscribe.c +++ b/subscribe.c @@ -15,22 +15,44 @@ static void elapse(flxTimeEvent *e, void *userdata) { g_message("%i. Continuous querying for %s", s->n_query, t = flx_key_to_string(s->key)); g_free(t); - flx_elapse_time(&tv, s->sec_delay*1000, 0); flx_time_event_queue_update(s->server->time_event_queue, s->time_event, &tv); } -static void scan_cache_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) { +struct cbdata { + flxSubscription *subscription; + flxInterface *interface; +}; + +static gpointer scan_cache_callback(flxCache *c, flxKey *pattern, flxCacheEntry *e, gpointer userdata) { + struct cbdata *cbdata = userdata; + + g_assert(c); + g_assert(pattern); + g_assert(e); + g_assert(cbdata); + + cbdata->subscription->callback( + cbdata->subscription, + e->record, + cbdata->interface->hardware->index, + cbdata->interface->protocol, + FLX_SUBSCRIPTION_NEW, + cbdata->subscription->userdata); + + return NULL; +} + +static void scan_interface_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) { flxSubscription *s = userdata; - flxCacheEntry *e; + struct cbdata cbdata = { s, i }; g_assert(m); g_assert(i); g_assert(s); - for (e = flx_cache_lookup_key(i->cache, s->key); e; e = e->by_name_next) - s->callback(s, e->record, i->hardware->index, i->protocol, FLX_SUBSCRIPTION_NEW, s->userdata); + flx_cache_walk(i->cache, s->key, scan_cache_callback, &cbdata); } flxSubscription *flx_subscription_new(flxServer *server, flxKey *key, gint interface, guchar protocol, flxSubscriptionCallback callback, gpointer userdata) { @@ -41,6 +63,8 @@ flxSubscription *flx_subscription_new(flxServer *server, flxKey *key, gint inter g_assert(key); g_assert(callback); + g_assert(!flx_key_is_pattern(key)); + s = g_new(flxSubscription, 1); s->server = server; s->key = flx_key_ref(key); @@ -64,7 +88,7 @@ flxSubscription *flx_subscription_new(flxServer *server, flxKey *key, gint inter g_hash_table_replace(server->subscription_hashtable, key, t); /* Scan the caches */ - flx_interface_monitor_walk(s->server->monitor, s->interface, s->protocol, scan_cache_callback, s); + flx_interface_monitor_walk(s->server->monitor, s->interface, s->protocol, scan_interface_callback, s); return s; } @@ -92,6 +116,7 @@ void flx_subscription_free(flxSubscription *s) { void flx_subscription_notify(flxServer *server, flxInterface *i, flxRecord *record, flxSubscriptionEvent event) { flxSubscription *s; + flxKey *pattern; g_assert(server); g_assert(record); @@ -99,5 +124,11 @@ void flx_subscription_notify(flxServer *server, flxInterface *i, flxRecord *reco for (s = g_hash_table_lookup(server->subscription_hashtable, record->key); s; s = s->by_key_next) if (flx_interface_match(i, s->interface, s->protocol)) s->callback(s, record, i->hardware->index, i->protocol, event, s->userdata); - +} + +gboolean flx_is_subscribed(flxServer *server, flxKey *k) { + g_assert(server); + g_assert(k); + + return !!g_hash_table_lookup(server->subscription_hashtable, k); }