From: Lennart Poettering Date: Sat, 26 Mar 2005 13:58:11 +0000 (+0000) Subject: * add subscription feature - with reissuing X-Git-Url: https://git.meshlink.io/?a=commitdiff_plain;h=8e7f83aa5b6d910e80c56b31f4eb79b02e7ca67b;p=catta * add subscription feature - with reissuing * interpret goodbye responses git-svn-id: file:///home/lennart/svn/public/avahi/trunk@17 941a03a8-eaeb-0310-b9a0-b1bbd8fe43fe --- diff --git a/Makefile b/Makefile index 2731696..6798fe7 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ LIBS=$(shell pkg-config --libs glib-2.0) all: flexmdns prioq-test -flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o psched.o announce.o +flexmdns: timeeventq.o main.o iface.o netlink.o server.o address.o util.o prioq.o cache.o rr.o dns.o socket.o psched.o announce.o subscribe.o $(CC) -o $@ $^ $(LIBS) #test-llist: test-llist.o diff --git a/cache.c b/cache.c index b668d33..9d9590b 100644 --- a/cache.c +++ b/cache.c @@ -6,7 +6,7 @@ static void remove_entry(flxCache *c, flxCacheEntry *e, gboolean remove_from_has g_assert(c); g_assert(e); - g_message("remvoin from cache: %p %p", c, e); + g_message("removing from cache: %p %p", c, e); if (remove_from_hash_table) { flxCacheEntry *t; @@ -18,10 +18,12 @@ static void remove_entry(flxCache *c, flxCacheEntry *e, gboolean remove_from_has g_hash_table_remove(c->hash_table, e->record->key); } - flx_record_unref(e->record); - if (e->time_event) flx_time_event_queue_remove(c->server->time_event_queue, e->time_event); + + flx_subscription_notify(c->server, c->interface, e->record, FLX_SUBSCRIPTION_REMOVE); + + flx_record_unref(e->record); g_free(e); } @@ -71,7 +73,7 @@ flxCacheEntry *flx_cache_lookup_record(flxCache *c, flxRecord *r) { g_assert(r); for (e = flx_cache_lookup_key(c, r->key); e; e = e->by_name_next) - if (e->record->size == r->size && !memcmp(e->record->data, r->data, r->size)) + if (flx_record_equal_no_ttl(e->record, r)) return e; return NULL; @@ -127,6 +129,16 @@ static void elapse_func(flxTimeEvent *t, void *userdata) { } } +static void update_time_event(flxCache *c, flxCacheEntry *e) { + g_assert(c); + g_assert(e); + + if (e->time_event) + flx_time_event_queue_update(c->server->time_event_queue, e->time_event, &e->expiry); + else + e->time_event = flx_time_event_queue_add(c->server->time_event_queue, &e->expiry, elapse_func, e); +} + static void next_expiry(flxCache *c, flxCacheEntry *e, guint percent) { gulong usec; @@ -142,14 +154,10 @@ static void next_expiry(flxCache *c, flxCacheEntry *e, guint percent) { usec = g_random_int_range(usec*percent, usec*(percent+2)); g_time_val_add(&e->expiry, usec); - - if (e->time_event) - flx_time_event_queue_update(c->server->time_event_queue, e->time_event, &e->expiry); - else - e->time_event = flx_time_event_queue_add(c->server->time_event_queue, &e->expiry, elapse_func, e); + update_time_event(c, e); } -flxCacheEntry *flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a) { +void flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a) { flxCacheEntry *e, *t; gchar *txt; @@ -159,55 +167,77 @@ flxCacheEntry *flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, cons g_message("cache update: %s", (txt = flx_record_to_string(r))); g_free(txt); - if ((t = e = flx_cache_lookup_key(c, r->key))) { + if (r->ttl == 0) { -/* g_message("found prev cache entry"); */ + /* This is a goodbye request */ - if (unique) { - /* Drop all entries but the first which we replace */ - while (e->by_name_next) - remove_entry(c, e->by_name_next, TRUE); + if ((e = flx_cache_lookup_record(c, r))) { - } else { - /* Look for exactly the same entry */ - for (; e; e = e->by_name_next) - if (flx_record_equal(e->record, r)) - break; + e->state = FLX_CACHE_FINAL; + g_get_current_time(&e->timestamp); + e->expiry = e->timestamp; + g_time_val_add(&e->expiry, 1000000); /* 1s */ + update_time_event(c, e); } - } - - if (e) { -/* g_message("found matching cache entry"); */ + } else { - /* We are the first in the linked list so let's replace the hash table key with the new one */ - if (e->by_name_prev == NULL) - g_hash_table_replace(c->hash_table, r->key, e); - - /* Update the record */ - flx_record_unref(e->record); - e->record = flx_record_ref(r); + /* This is an update request */ + if ((t = e = flx_cache_lookup_key(c, r->key))) { - } else { - /* No entry found, therefore we create a new one */ - + if (unique) { + + /* For unique records, remove all entries but one */ + while (e->by_name_next) + remove_entry(c, e->by_name_next, TRUE); + + } else { + + /* For non-unique record, look for exactly the same entry */ + for (; e; e = e->by_name_next) + if (flx_record_equal_no_ttl(e->record, r)) + break; + } + } + + if (e) { + +/* g_message("found matching cache entry"); */ + + /* We are the first in the linked list so let's replace the hash table key with the new one */ + if (e->by_name_prev == NULL) + g_hash_table_replace(c->hash_table, r->key, e); + + /* Notify subscribers */ + if (!flx_record_equal_no_ttl(e->record, r)) + flx_subscription_notify(c->server, c->interface, r, FLX_SUBSCRIPTION_CHANGE); + + /* Update the record */ + flx_record_unref(e->record); + e->record = flx_record_ref(r); + + } else { + /* No entry found, therefore we create a new one */ + /* g_message("couldn't find matching cache entry"); */ + + e = g_new(flxCacheEntry, 1); + e->cache = c; + e->time_event = NULL; + e->record = flx_record_ref(r); + FLX_LLIST_PREPEND(flxCacheEntry, by_name, t, e); + g_hash_table_replace(c->hash_table, e->record->key, t); + + /* Notify subscribers */ + flx_subscription_notify(c->server, c->interface, e->record, FLX_SUBSCRIPTION_NEW); + } - e = g_new(flxCacheEntry, 1); - e->cache = c; - e->time_event = NULL; - e->record = flx_record_ref(r); - FLX_LLIST_PREPEND(flxCacheEntry, by_name, t, e); - g_hash_table_replace(c->hash_table, e->record->key, t); - } - - e->origin = *a; - g_get_current_time(&e->timestamp); - next_expiry(c, e, 80); - e->state = FLX_CACHE_VALID; - - return e; + e->origin = *a; + g_get_current_time(&e->timestamp); + next_expiry(c, e, 80); + e->state = FLX_CACHE_VALID; + } } void flx_cache_drop_key(flxCache *c, flxKey *k) { diff --git a/cache.h b/cache.h index f51e18e..e7c8e7e 100644 --- a/cache.h +++ b/cache.h @@ -33,7 +33,6 @@ struct flxCacheEntry { flxTimeEvent *time_event; FLX_LLIST_FIELDS(flxCacheEntry, by_name); - }; struct _flxCache { @@ -50,7 +49,7 @@ void flx_cache_free(flxCache *c); flxCacheEntry *flx_cache_lookup_key(flxCache *c, flxKey *k); flxCacheEntry *flx_cache_lookup_record(flxCache *c, flxRecord *r); -flxCacheEntry *flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a); +void flx_cache_update(flxCache *c, flxRecord *r, gboolean unique, const flxAddress *a); void flx_cache_drop_key(flxCache *c, flxKey *k); void flx_cache_drop_record(flxCache *c, flxRecord *r); diff --git a/iface.c b/iface.c index 5e6c94a..a7d210f 100644 --- a/iface.c +++ b/iface.c @@ -489,3 +489,33 @@ gboolean flx_interface_match(flxInterface *i, gint index, guchar protocol) { return TRUE; } + +void flx_interface_monitor_walk(flxInterfaceMonitor *m, gint interface, guchar protocol, flxInterfaceMonitorWalkCallback callback, gpointer userdata) { + g_assert(m); + g_assert(callback); + + if (interface > 0) { + if (protocol != AF_UNSPEC) { + flxInterface *i; + + if ((i = flx_interface_monitor_get_interface(m, interface, protocol))) + callback(m, i, userdata); + + } else { + flxHwInterface *hw; + flxInterface *i; + + if ((hw = flx_interface_monitor_get_hw_interface(m, interface))) + for (i = hw->interfaces; i; i = i->by_hardware_next) + if (flx_interface_match(i, interface, protocol)) + callback(m, i, userdata); + } + + } else { + flxInterface *i; + + for (i = m->interfaces; i; i = i->interface_next) + if (flx_interface_match(i, interface, protocol)) + callback(m, i, userdata); + } +} diff --git a/iface.h b/iface.h index 85e535c..b5c2708 100644 --- a/iface.h +++ b/iface.h @@ -92,4 +92,8 @@ gboolean flx_interface_address_relevant(flxInterfaceAddress *a); gboolean flx_interface_match(flxInterface *i, gint index, guchar protocol); +typedef void (*flxInterfaceMonitorWalkCallback)(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata); + +void flx_interface_monitor_walk(flxInterfaceMonitor *m, gint index, guchar protocol, flxInterfaceMonitorWalkCallback callback, gpointer userdata); + #endif diff --git a/main.c b/main.c index 47f3cd7..367dcd3 100644 --- a/main.c +++ b/main.c @@ -4,6 +4,7 @@ #include "flx.h" #include "server.h" +#include "subscribe.h" static gboolean quit_timeout(gpointer data) { g_main_loop_quit(data); @@ -31,15 +32,36 @@ static gboolean dump_timeout(gpointer data) { return TRUE; } +static void subscription(flxSubscription *s, flxRecord *r, gint interface, guchar protocol, flxSubscriptionEvent event, gpointer userdata) { + gchar *t; + + g_assert(s); + g_assert(r); + g_assert(interface > 0); + g_assert(protocol != AF_UNSPEC); + + g_message("SUBSCRIPTION: record [%s] on %i.%i is %s", t = flx_record_to_string(r), interface, protocol, + event == FLX_SUBSCRIPTION_NEW ? "new" : (event == FLX_SUBSCRIPTION_CHANGE ? "changed" : "removed")); + + g_free(t); +} + + int main(int argc, char *argv[]) { flxServer *flx; gchar *r; GMainLoop *loop = NULL; + flxSubscription *s; + flxKey *k; flx = flx_server_new(NULL); flx_server_add_text(flx, 0, 0, AF_UNSPEC, FALSE, NULL, "hallo"); + k = flx_key_new("_http._tcp.local.", FLX_DNS_CLASS_IN, FLX_DNS_TYPE_PTR); + s = flx_subscription_new(flx, k, 0, AF_UNSPEC, subscription, NULL); + flx_key_unref(k); + loop = g_main_loop_new(NULL, FALSE); g_timeout_add(1000*60, quit_timeout, loop); @@ -50,7 +72,7 @@ int main(int argc, char *argv[]) { g_main_loop_unref(loop); - + flx_subscription_free(s); flx_server_free(flx); return 0; diff --git a/psched.c b/psched.c index ebae9d3..db72ec3 100644 --- a/psched.c +++ b/psched.c @@ -214,7 +214,7 @@ static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *recor g_assert(record); for (rj = s->response_jobs; rj; rj = rj->jobs_next) - if (flx_record_equal(rj->record, record)) + if (flx_record_equal_no_ttl(rj->record, record)) return rj; return NULL; @@ -270,7 +270,7 @@ void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record g_assert(record); for (rj = s->response_jobs; rj; rj = rj->jobs_next) - if (flx_record_equal(rj->record, record)) { + if (flx_record_equal_no_ttl(rj->record, record)) { if (!rj->done) { GTimeVal tv; diff --git a/rr.c b/rr.c index c984934..4108c54 100644 --- a/rr.c +++ b/rr.c @@ -50,12 +50,11 @@ flxRecord *flx_record_new(flxKey *k, gconstpointer data, guint16 size, guint32 t g_assert(k); g_assert(data); - g_assert(size > 0); r = g_new(flxRecord, 1); r->ref = 1; r->key = flx_key_ref(k); - r->data = g_memdup(data, size); + r->data = size > 0 ? g_memdup(data, size) : NULL; r->size = size; r->ttl = ttl; @@ -154,10 +153,17 @@ gchar *flx_record_to_string(flxRecord *r) { } case FLX_DNS_TYPE_TXT: { - g_assert(((guchar*) r->data)[0] == r->size-1); - memcpy(t, r->data+1, ((guchar*) r->data)[0]); - t[((guchar*) r->data)[0]] = 0; + if (r->size == 0) + t[0] = 0; + else { + guchar l = ((guchar*) r->data)[0]; + + if ((size_t) l+1 <= r->size) { + memcpy(t, r->data+1, ((guchar*) r->data)[0]); + t[((guchar*) r->data)[0]] = 0; + } + } break; } @@ -203,7 +209,7 @@ gchar *flx_record_to_string(flxRecord *r) { } p = flx_key_to_string(r->key); - s = g_strdup_printf("[%s %s ; ttl=%u]", p, t, r->ttl); + s = g_strdup_printf("%s %s ; ttl=%u", p, t, r->ttl); g_free(p); return s; @@ -224,12 +230,12 @@ guint flx_key_hash(const flxKey *k) { return g_str_hash(k->name) + k->type + k->class; } -gboolean flx_record_equal(const flxRecord *a, const flxRecord *b) { +gboolean flx_record_equal_no_ttl(const flxRecord *a, const flxRecord *b) { g_assert(a); g_assert(b); return flx_key_equal(a->key, b->key) && - a->ttl == b->ttl && +/* a->ttl == b->ttl && */ a->size == b->size && - memcmp(a->data, b->data, a->size) == 0; + (a->size == 0 || memcmp(a->data, b->data, a->size) == 0); } diff --git a/rr.h b/rr.h index a31b632..f17c36f 100644 --- a/rr.h +++ b/rr.h @@ -56,6 +56,6 @@ const gchar *flx_dns_type_to_string(guint16 type); gchar *flx_key_to_string(flxKey *k); /* g_free() the result! */ gchar *flx_record_to_string(flxRecord *r); /* g_free() the result! */ -gboolean flx_record_equal(const flxRecord *a, const flxRecord *b); +gboolean flx_record_equal_no_ttl(const flxRecord *a, const flxRecord *b); #endif diff --git a/server.c b/server.c index 0aaaed1..94c7b1d 100644 --- a/server.c +++ b/server.c @@ -8,7 +8,7 @@ #include "util.h" #include "iface.h" #include "socket.h" - +#include "subscribe.h" static void handle_query_key(flxServer *s, flxKey *k, flxInterface *i, const flxAddress *a) { flxServerEntry *e; @@ -71,7 +71,9 @@ static void handle_response(flxServer *s, flxDnsPacket *p, flxInterface *i, cons g_free(txt); flx_cache_update(i->cache, record, cache_flush, a); - flx_packet_scheduler_drop_response(i->scheduler, record); + + if (record->ttl != 0) + flx_packet_scheduler_drop_response(i->scheduler, record); flx_record_unref(record); } } @@ -258,10 +260,13 @@ flxServer *flx_server_new(GMainContext *c) { s->context = g_main_context_default(); s->current_id = 1; + + FLX_LLIST_HEAD_INIT(flxServerEntry, s->entries); s->rrset_by_id = g_hash_table_new(g_int_hash, g_int_equal); s->rrset_by_key = g_hash_table_new((GHashFunc) flx_key_hash, (GEqualFunc) flx_key_equal); - FLX_LLIST_HEAD_INIT(flxServerEntry, s->entries); + FLX_LLIST_HEAD_INIT(flxSubscription, s->subscriptions); + s->subscription_hashtable = g_hash_table_new((GHashFunc) flx_key_hash, (GEqualFunc) flx_key_equal); s->monitor = flx_interface_monitor_new(s); s->time_event_queue = flx_time_event_queue_new(s->context); @@ -300,6 +305,10 @@ void flx_server_free(flxServer* s) { flx_interface_monitor_free(s->monitor); flx_server_remove(s, 0); + + while (s->subscriptions) + flx_subscription_free(s->subscriptions); + g_hash_table_destroy(s->subscription_hashtable); g_hash_table_destroy(s->rrset_by_id); g_hash_table_destroy(s->rrset_by_key); @@ -532,60 +541,36 @@ void flx_server_add_text( flx_server_add_full(s, id, interface, protocol, unique, name, FLX_DNS_CLASS_IN, FLX_DNS_TYPE_TXT, buf, l+1, FLX_DEFAULT_TTL); } +static void post_query_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) { + flxKey *k = userdata; + + g_assert(m); + g_assert(i); + g_assert(k); + + flx_interface_post_query(i, k); +} + void flx_server_post_query(flxServer *s, gint interface, guchar protocol, flxKey *key) { g_assert(s); g_assert(key); - - if (interface > 0) { - if (protocol != AF_UNSPEC) { - flxInterface *i; - - if ((i = flx_interface_monitor_get_interface(s->monitor, interface, protocol))) - flx_interface_post_query(i, key); - } else { - flxHwInterface *hw; - flxInterface *i; - - if ((hw = flx_interface_monitor_get_hw_interface(s->monitor, interface))) - for (i = hw->interfaces; i; i = i->by_hardware_next) - if (flx_interface_match(i, interface, protocol)) - flx_interface_post_query(i, key); - } - - } else { - flxInterface *i; - - for (i = s->monitor->interfaces; i; i = i->interface_next) - if (flx_interface_match(i, interface, protocol)) - flx_interface_post_query(i, key); - } + + flx_interface_monitor_walk(s->monitor, interface, protocol, post_query_callback, key); +} + +static void post_response_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) { + flxRecord *r = userdata; + + g_assert(m); + g_assert(i); + g_assert(r); + + flx_interface_post_response(i, r); } void flx_server_post_response(flxServer *s, gint interface, guchar protocol, flxRecord *record) { g_assert(s); g_assert(record); - - if (interface > 0) { - if (protocol != AF_UNSPEC) { - flxInterface *i; - - if ((i = flx_interface_monitor_get_interface(s->monitor, interface, protocol))) - flx_interface_post_response(i, record); - } else { - flxHwInterface *hw; - flxInterface *i; - - if ((hw = flx_interface_monitor_get_hw_interface(s->monitor, interface))) - for (i = hw->interfaces; i; i = i->by_hardware_next) - if (flx_interface_match(i, interface, protocol)) - flx_interface_post_response(i, record); - } - - } else { - flxInterface *i; - - for (i = s->monitor->interfaces; i; i = i->interface_next) - if (flx_interface_match(i, interface, protocol)) - flx_interface_post_response(i, record); - } + + flx_interface_monitor_walk(s->monitor, interface, protocol, post_response_callback, record); } diff --git a/server.h b/server.h index b7addf6..d050bcc 100644 --- a/server.h +++ b/server.h @@ -9,6 +9,7 @@ typedef struct _flxServerEntry flxServerEntry; #include "llist.h" #include "timeeventq.h" #include "announce.h" +#include "subscribe.h" struct _flxServerEntry { flxRecord *record; @@ -31,10 +32,12 @@ struct _flxServer { gint current_id; + FLX_LLIST_HEAD(flxServerEntry, entries); GHashTable *rrset_by_id; GHashTable *rrset_by_key; - FLX_LLIST_HEAD(flxServerEntry, entries); + FLX_LLIST_HEAD(flxSubscription, subscriptions); + GHashTable *subscription_hashtable; flxTimeEventQueue *time_event_queue; diff --git a/subscribe.c b/subscribe.c new file mode 100644 index 0000000..3abe464 --- /dev/null +++ b/subscribe.c @@ -0,0 +1,103 @@ +#include "subscribe.h" +#include "util.h" + +static void elapse(flxTimeEvent *e, void *userdata) { + flxSubscription *s = userdata; + GTimeVal tv; + gchar *t; + + g_assert(s); + + flx_server_post_query(s->server, s->interface, s->protocol, s->key); + + if (s->n_query++ <= 8) + s->sec_delay *= 2; + + g_message("%i. Continuous querying for %s", s->n_query, t = flx_key_to_string(s->key)); + g_free(t); + + + flx_elapse_time(&tv, s->sec_delay*1000, 0); + flx_time_event_queue_update(s->server->time_event_queue, s->time_event, &tv); +} + +static void scan_cache_callback(flxInterfaceMonitor *m, flxInterface *i, gpointer userdata) { + flxSubscription *s = userdata; + flxCacheEntry *e; + + g_assert(m); + g_assert(i); + g_assert(s); + + for (e = flx_cache_lookup_key(i->cache, s->key); e; e = e->by_name_next) + s->callback(s, e->record, i->hardware->index, i->protocol, FLX_SUBSCRIPTION_NEW, s->userdata); +} + +flxSubscription *flx_subscription_new(flxServer *server, flxKey *key, gint interface, guchar protocol, flxSubscriptionCallback callback, gpointer userdata) { + flxSubscription *s, *t; + GTimeVal tv; + + g_assert(server); + g_assert(key); + g_assert(callback); + + s = g_new(flxSubscription, 1); + s->server = server; + s->key = flx_key_ref(key); + s->interface = interface; + s->protocol = protocol; + s->callback = callback; + s->userdata = userdata; + s->n_query = 1; + s->sec_delay = 1; + + flx_server_post_query(s->server, s->interface, s->protocol, s->key); + + flx_elapse_time(&tv, s->sec_delay*1000, 0); + s->time_event = flx_time_event_queue_add(server->time_event_queue, &tv, elapse, s); + + FLX_LLIST_PREPEND(flxSubscription, subscriptions, server->subscriptions, s); + + /* Add the new entry to the subscription hash table */ + t = g_hash_table_lookup(server->subscription_hashtable, key); + FLX_LLIST_PREPEND(flxSubscription, by_key, t, s); + g_hash_table_replace(server->subscription_hashtable, key, t); + + /* Scan the caches */ + flx_interface_monitor_walk(s->server->monitor, s->interface, s->protocol, scan_cache_callback, s); + + return s; +} + +void flx_subscription_free(flxSubscription *s) { + flxSubscription *t; + + g_assert(s); + + FLX_LLIST_REMOVE(flxSubscription, subscriptions, s->server->subscriptions, s); + + t = g_hash_table_lookup(s->server->subscription_hashtable, s->key); + FLX_LLIST_REMOVE(flxSubscription, by_key, t, s); + if (t) + g_hash_table_replace(s->server->subscription_hashtable, t->key, t); + else + g_hash_table_remove(s->server->subscription_hashtable, s->key); + + flx_time_event_queue_remove(s->server->time_event_queue, s->time_event); + flx_key_unref(s->key); + + + g_free(s); +} + +void flx_subscription_notify(flxServer *server, flxInterface *i, flxRecord *record, flxSubscriptionEvent event) { + flxSubscription *s; + + g_assert(server); + g_assert(record); + + for (s = g_hash_table_lookup(server->subscription_hashtable, record->key); s; s = s->by_key_next) + if (flx_interface_match(i, s->interface, s->protocol)) + s->callback(s, record, i->hardware->index, i->protocol, event, s->userdata); + +} diff --git a/subscribe.h b/subscribe.h new file mode 100644 index 0000000..75818c7 --- /dev/null +++ b/subscribe.h @@ -0,0 +1,39 @@ +#ifndef foosubscribehfoo +#define foosubscribehfoo + +typedef struct _flxSubscription flxSubscription; + +#include "llist.h" +#include "server.h" + +typedef enum { + FLX_SUBSCRIPTION_NEW, + FLX_SUBSCRIPTION_REMOVE, + FLX_SUBSCRIPTION_CHANGE +} flxSubscriptionEvent; + +typedef void (*flxSubscriptionCallback)(flxSubscription *s, flxRecord *record, gint interface, guchar protocol, flxSubscriptionEvent event, gpointer userdata); + +struct _flxSubscription { + flxServer *server; + flxKey *key; + gint interface; + guchar protocol; + gint n_query; + guint sec_delay; + + flxTimeEvent *time_event; + + flxSubscriptionCallback callback; + gpointer userdata; + + FLX_LLIST_FIELDS(flxSubscription, subscriptions); + FLX_LLIST_FIELDS(flxSubscription, by_key); +}; + +flxSubscription *flx_subscription_new(flxServer *s, flxKey *key, gint interface, guchar protocol, flxSubscriptionCallback callback, gpointer userdata); +void flx_subscription_free(flxSubscription *s); + +void flx_subscription_notify(flxServer *s, flxInterface *i, flxRecord *record, flxSubscriptionEvent event); + +#endif