2 This file is part of catta.
4 catta is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Lesser General Public License as
6 published by the Free Software Foundation; either version 2.1 of the
7 License, or (at your option) any later version.
9 catta is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
12 Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with catta; if not, write to the Free Software
16 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
28 #include <catta/timeval.h>
29 #include <catta/malloc.h>
30 #include <catta/log.h>
/* Unlink cache entry e from cache c and release what it holds.
 *
 * Visible steps: e is removed from the per-key chain whose head is looked up
 * in c->hashmap, the hashmap slot is then rewritten (replace) or dropped
 * (remove), e is removed from the c->entries list, its time event is freed,
 * the multicast lookup engine is notified with CATTA_BROWSER_REMOVE and the
 * entry's reference on its record is dropped.
 *
 * NOTE(review): several lines of this function are not visible in this view
 * (the declaration of t, the condition selecting between the hashmap replace
 * and remove calls, any guard around catta_time_event_free(), and whatever
 * follows the final assert) -- confirm against the complete source. */
35 static void remove_entry(CattaCache *c, CattaCacheEntry *e) {
41 /* catta_log_debug("removing from cache: %p %p", c, e); */
43 /* Remove from hash table */
44 t = catta_hashmap_lookup(c->hashmap, e->record->key);
45 CATTA_LLIST_REMOVE(CattaCacheEntry, by_key, t, e);
/* NOTE(review): presumably the path for a chain that still has a head t
 * after the removal; the guarding condition is not shown here. */
47 catta_hashmap_replace(c->hashmap, t->record->key, t);
/* NOTE(review): presumably the path for a chain that became empty. */
49 catta_hashmap_remove(c->hashmap, e->record->key);
51 /* Remove from linked list */
52 CATTA_LLIST_REMOVE(CattaCacheEntry, entry, c->entries, e);
55 catta_time_event_free(e->time_event);
/* Notify before the unref below, while e->record is still referenced. */
57 catta_multicast_lookup_engine_notify(c->server->multicast_lookup_engine, c->iface, e->record, CATTA_BROWSER_REMOVE);
59 catta_record_unref(e->record);
/* The cache must still be counting at least this entry. */
63 assert(c->n_entries >= 1);
/* Allocate a new cache for the given server and interface.
 *
 * Visible behavior: a CattaCache is allocated with catta_new(); a hashmap
 * using catta_key_hash / catta_key_equal is created for it; the entries list
 * head is initialised and last_rand_timestamp is set to 0. Either allocation
 * failing is logged and NULL is returned.
 *
 * NOTE(review): the lines that store server/iface, that clean up c when the
 * hashmap allocation fails, and the success return are not visible in this
 * view -- confirm against the complete source. */
67 CattaCache *catta_cache_new(CattaServer *server, CattaInterface *iface) {
71 if (!(c = catta_new(CattaCache, 1))) {
72 catta_log_error(__FILE__": Out of memory.");
73 return NULL; /* OOM */
/* No key/value destructors are passed to the hashmap (last two arguments
 * are NULL). */
79 if (!(c->hashmap = catta_hashmap_new((CattaHashFunc) catta_key_hash, (CattaEqualFunc) catta_key_equal, NULL, NULL))) {
80 catta_log_error(__FILE__": Out of memory.");
82 return NULL; /* OOM */
85 CATTA_LLIST_HEAD_INIT(CattaCacheEntry, c->entries);
88 c->last_rand_timestamp = 0;
/* Destroy cache c: entries are removed via remove_entry() starting at the
 * head of c->entries, the entry counter is asserted to be zero, and the
 * hashmap is freed.
 *
 * NOTE(review): the loop around remove_entry() and the release of c itself
 * are not visible in this view -- confirm against the complete source. */
93 void catta_cache_free(CattaCache *c) {
97 remove_entry(c, c->entries);
98 assert(c->n_entries == 0);
100 catta_hashmap_free(c->hashmap);
/* Return what c->hashmap stores for key k. Other code in this file treats
 * that value as the head of a by_key chain of entries sharing the key.
 * k must not be a pattern key (asserted). */
105 static CattaCacheEntry *lookup_key(CattaCache *c, CattaKey *k) {
109 assert(!catta_key_is_pattern(k));
111 return catta_hashmap_lookup(c->hashmap, k);
/* Invoke cb on the cache entries selected by pattern.
 *
 * If pattern is a pattern key, all of c->entries is scanned and cb is called
 * for each entry whose record key matches. Otherwise only the chain returned
 * by lookup_key() is visited. The value returned by cb is captured in ret.
 *
 * NOTE(review): the loop bodies are incomplete in this view (the assignment
 * of n, what happens once cb returns non-NULL, and the final return are not
 * shown). Presumably n holds the successor so that cb may remove e, and the
 * walk stops at the first non-NULL result -- confirm against the complete
 * source. */
114 void* catta_cache_walk(CattaCache *c, CattaKey *pattern, CattaCacheWalkCallback cb, void* userdata) {
121 if (catta_key_is_pattern(pattern)) {
122 CattaCacheEntry *e, *n;
124 for (e = c->entries; e; e = n) {
127 if (catta_key_pattern_match(pattern, e->record->key))
128 if ((ret = cb(c, pattern, e, userdata)))
133 CattaCacheEntry *e, *n;
135 for (e = lookup_key(c, pattern); e; e = n) {
138 if ((ret = cb(c, pattern, e, userdata)))
/* Walk callback used by lookup_record(): tests entry e against the record
 * passed as userdata with catta_record_equal_no_ttl().
 *
 * NOTE(review): the returned values are not visible in this view; presumably
 * e on a match and NULL otherwise -- confirm against the complete source. */
146 static void* lookup_record_callback(CattaCache *c, CattaKey *pattern, CattaCacheEntry *e, void *userdata) {
151 if (catta_record_equal_no_ttl(e->record, userdata))
/* Search cache c for an entry matching record r: walks the cache with r->key
 * and lookup_record_callback (r is passed as the callback's userdata) and
 * returns whatever catta_cache_walk() returns. */
157 static CattaCacheEntry *lookup_record(CattaCache *c, CattaRecord *r) {
161 return catta_cache_walk(c, r->key, lookup_record_callback, r);
/* Forward declaration: elapse_func() below reschedules through next_expiry(),
 * which is defined later in the file. */
164 static void next_expiry(CattaCache *c, CattaCacheEntry *e, unsigned percent);
/* Time event callback for a cache entry (userdata is the CattaCacheEntry).
 *
 * Visible state handling:
 *   - the *_FINAL states (EXPIRY, POOF, GOODBYE, REPLACE) lead to
 *     remove_entry();
 *   - VALID and POOF advance to EXPIRY1, EXPIRY1 to EXPIRY2, EXPIRY2 to
 *     EXPIRY3 and EXPIRY3 to EXPIRY_FINAL.
 * Afterwards a query for the entry's key is posted when
 * catta_querier_shall_refresh_cache() says so, and the next wake-up is
 * scheduled with next_expiry().
 *
 * NOTE(review): the switch expression, the break/return statements and the
 * assignments to percent are not visible in this view. next_expiry() asserts
 * percent > 0, so every path reaching that call must assign a non-zero
 * percent -- confirm against the complete source. */
166 static void elapse_func(CattaTimeEvent *t, void *userdata) {
167 CattaCacheEntry *e = userdata;
169 unsigned percent = 0;
174 /* txt = catta_record_to_string(e->record); */
178 case CATTA_CACHE_EXPIRY_FINAL:
179 case CATTA_CACHE_POOF_FINAL:
180 case CATTA_CACHE_GOODBYE_FINAL:
181 case CATTA_CACHE_REPLACE_FINAL:
183 remove_entry(e->cache, e);
186 /* catta_log_debug("Removing entry from cache due to expiration (%s)", txt); */
189 case CATTA_CACHE_VALID:
190 case CATTA_CACHE_POOF:
191 e->state = CATTA_CACHE_EXPIRY1;
195 case CATTA_CACHE_EXPIRY1:
196 e->state = CATTA_CACHE_EXPIRY2;
199 case CATTA_CACHE_EXPIRY2:
200 e->state = CATTA_CACHE_EXPIRY3;
204 case CATTA_CACHE_EXPIRY3:
205 e->state = CATTA_CACHE_EXPIRY_FINAL;
214 /* Request a cache update if we are subscribed to this entry */
215 if (catta_querier_shall_refresh_cache(e->cache->iface, e->record->key))
216 catta_interface_post_query(e->cache->iface, e->record->key, 0, NULL);
218 /* Check again later */
219 next_expiry(e->cache, e, percent);
223 /* catta_free(txt); */
/* Arm the entry's timer for e->expiry: either the existing e->time_event is
 * updated, or a new time event is created on the server's time event queue
 * with elapse_func as callback and e as its userdata.
 *
 * NOTE(review): the condition choosing between the two calls is not visible
 * in this view; presumably it tests whether e->time_event already exists --
 * confirm against the complete source. */
226 static void update_time_event(CattaCache *c, CattaCacheEntry *e) {
231 catta_time_event_update(e->time_event, &e->expiry);
233 e->time_event = catta_time_event_new(c->server->time_event_queue, &e->expiry, elapse_func, e);
/* Schedule the next wake-up of entry e at roughly percent % of its record's
 * TTL after e->timestamp, plus up to 2 percentage points of random jitter.
 *
 * percent must be in 1..100 (asserted). The jitter is derived from
 * c->last_rand, which is refreshed with rand() only when now has advanced by
 * at least 10 past c->last_rand_timestamp, so calls close together reuse the
 * same random value.
 *
 * NOTE(review): the declaration and assignment of now are not visible in
 * this view, so its unit cannot be confirmed here. */
236 static void next_expiry(CattaCache *c, CattaCacheEntry *e, unsigned percent) {
237 CattaUsec usec, left, right;
242 assert(percent > 0 && percent <= 100);
/* ttl * 10000 is (ttl * 1000000) / 100, i.e. one percent of the TTL in
 * microseconds if ttl counts seconds (TODO confirm unit). */
244 usec = (CattaUsec) e->record->ttl * 10000;
246 left = usec * percent;
247 right = usec * (percent+2); /* 2% jitter */
251 if (now >= c->last_rand_timestamp + 10) {
252 c->last_rand = rand();
253 c->last_rand_timestamp = now;
/* Scale last_rand into the interval [left, right). */
256 usec = left + (CattaUsec) ((double) (right-left) * c->last_rand / (RAND_MAX+1.0));
/* The expiry is relative to the entry's timestamp, not to the current time. */
258 e->expiry = e->timestamp;
259 catta_timeval_add(&e->expiry, usec);
261 /* g_message("wake up in +%lu seconds", e->expiry.tv_sec - e->timestamp.tv_sec); */
263 update_time_event(c, e);
/* Set the entry's expiry to one second from the current time and re-arm its
 * time event.
 *
 * NOTE(review): the use of the state parameter is not visible in this view;
 * presumably it is stored in e->state before the timer is armed -- confirm
 * against the complete source. */
266 static void expire_in_one_second(CattaCache *c, CattaCacheEntry *e, CattaCacheEntryState state) {
271 gettimeofday(&e->expiry, NULL);
272 catta_timeval_add(&e->expiry, 1000000); /* 1s */
273 update_time_event(c, e);
/* Feed record r into cache c.
 *
 * Visible behavior:
 *   - Goodbye path: a matching entry found by lookup_record() is scheduled
 *     for removal via expire_in_one_second() with CATTA_CACHE_GOODBYE_FINAL.
 *   - Update path: the chain for r->key is fetched with lookup_key(); entries
 *     of that chain may be scheduled for replacement
 *     (CATTA_CACHE_REPLACE_FINAL); the chain is searched for a record equal
 *     to r ignoring the TTL. A matching entry gets its record swapped for r;
 *     otherwise a new entry is allocated, prepended to the per-key chain and
 *     to c->entries, and announced with CATTA_BROWSER_NEW.
 *   - Finally the entry is scheduled with next_expiry(c, e, 80), marked
 *     CATTA_CACHE_VALID and cache_flush is stored on it.
 *
 * Parameters: r must be non-NULL with at least one reference (asserted).
 * a is not used in the lines visible here.
 *
 * NOTE(review): many control lines are missing from this view (the test that
 * distinguishes goodbye from update, the conditions inside the "unique
 * entries" loop, the handling when n_cache_entries_max is reached, the error
 * return after the failed allocation, and all closing braces). In addition,
 * the block comment starting "We need to update the hash table key" is not
 * terminated in this view, so as shown the two statements after it are
 * lexically inside that comment. Confirm all of this against the complete
 * source. */
276 void catta_cache_update(CattaCache *c, CattaRecord *r, int cache_flush, const CattaAddress *a) {
280 assert(r && r->ref >= 1);
282 /* txt = catta_record_to_string(r); */
285 /* This is a goodbye request */
289 if ((e = lookup_record(c, r)))
290 expire_in_one_second(c, e, CATTA_CACHE_GOODBYE_FINAL);
293 CattaCacheEntry *e = NULL, *first;
296 gettimeofday(&now, NULL);
298 /* This is an update request */
300 if ((first = lookup_key(c, r->key))) {
304 /* For unique entries drop all entries older than one second */
305 for (e = first; e; e = e->by_key_next) {
308 t = catta_timeval_diff(&now, &e->timestamp);
311 expire_in_one_second(c, e, CATTA_CACHE_REPLACE_FINAL);
315 /* Look for exactly the same entry */
316 for (e = first; e; e = e->by_key_next)
317 if (catta_record_equal_no_ttl(e->record, r))
323 /* catta_log_debug("found matching cache entry"); */
325 /* We need to update the hash table key if we replace the
327 if (e->by_key_prev == NULL)
328 catta_hashmap_replace(c->hashmap, r->key, e);
330 /* Update the record */
331 catta_record_unref(e->record);
332 e->record = catta_record_ref(r);
334 /* catta_log_debug("cache: updating %s", txt); */
337 /* No entry found, therefore we create a new one */
339 /* catta_log_debug("cache: couldn't find matching cache entry for %s", txt); */
341 if (c->n_entries >= c->server->config.n_cache_entries_max)
344 if (!(e = catta_new(CattaCacheEntry, 1))) {
345 catta_log_error(__FILE__": Out of memory");
350 e->time_event = NULL;
351 e->record = catta_record_ref(r);
353 /* Prepend to the per-key chain and store the new chain head in the hash table */
354 CATTA_LLIST_PREPEND(CattaCacheEntry, by_key, first, e);
355 catta_hashmap_replace(c->hashmap, e->record->key, first);
357 /* Prepend to the list of all entries */
358 CATTA_LLIST_PREPEND(CattaCacheEntry, entry, c->entries, e);
362 /* Notify subscribers */
363 catta_multicast_lookup_engine_notify(c->server->multicast_lookup_engine, c->iface, e->record, CATTA_BROWSER_NEW);
/* Schedule the first wake-up at about 80% of the TTL (see next_expiry()). */
368 next_expiry(c, e, 80);
369 e->state = CATTA_CACHE_VALID;
370 e->cache_flush = cache_flush;
373 /* catta_free(txt); */
/* Callback member of the dump context; dump_callback() invokes it once per
 * record string.
 * NOTE(review): the enclosing struct declaration (used below as
 * "struct dump_data", which also has a userdata member) is not visible in
 * this view. */
377 CattaDumpCallback callback;
/* Hashmap iteration callback used by catta_cache_dump(): data is the head of
 * a by_key chain and userdata is a struct dump_data. Each entry's record is
 * converted to a string with catta_record_to_string() and handed to the
 * user's dump callback together with the user's userdata.
 *
 * NOTE(review): what happens when the string conversion fails, and the
 * release of the string t, are not visible in this view -- confirm against
 * the complete source. */
381 static void dump_callback(void* key, void* data, void* userdata) {
382 CattaCacheEntry *e = data;
384 struct dump_data *dump_data = userdata;
390 for (; e; e = e->by_key_next) {
393 if (!(t = catta_record_to_string(e->record)))
396 dump_data->callback(t, dump_data->userdata);
/* Dump the cache through callback: a fixed header line is emitted first,
 * then dump_callback() is run over every slot of c->hashmap with the
 * caller's callback and userdata packed into a struct dump_data.
 *
 * NOTE(review): the return statement is not visible in this view, so the
 * meaning of the int result cannot be stated here. */
401 int catta_cache_dump(CattaCache *c, CattaDumpCallback callback, void* userdata) {
402 struct dump_data data;
407 callback(";;; CACHE DUMP FOLLOWS ;;;", userdata);
409 data.callback = callback;
410 data.userdata = userdata;
412 catta_hashmap_foreach(c->hashmap, dump_callback, &data);
/* Return non-zero when entry e is at least half its TTL old.
 *
 * The age is the time elapsed since e->timestamp, divided by 1000000 and
 * truncated to unsigned (whole seconds, assuming catta_timeval_diff() yields
 * microseconds -- TODO confirm), and is compared against
 * e->record->ttl / 2 using integer division. */
417 int catta_cache_entry_half_ttl(CattaCache *c, CattaCacheEntry *e) {
424 gettimeofday(&now, NULL);
426 age = (unsigned) (catta_timeval_diff(&now, &e->timestamp)/1000000);
428 /* catta_log_debug("age: %lli, ttl/2: %u", age, e->record->ttl); */
430 return age >= e->record->ttl/2;
/* Empty the cache by removing entries from the head of c->entries with
 * remove_entry().
 *
 * NOTE(review): the loop around the call is not visible in this view;
 * presumably it repeats until c->entries is empty -- confirm against the
 * complete source. */
433 void catta_cache_flush(CattaCache *c) {
437 remove_entry(c, c->entries);
440 /*** Passive observation of failure ***/
/* Walk callback for catta_cache_start_poof(); userdata is the CattaAddress
 * the query came from.
 *
 * Visible state handling:
 *   - CATTA_CACHE_VALID: the entry enters CATTA_CACHE_POOF and records the
 *     address and the current time.
 *   - CATTA_CACHE_POOF: the time since the last recorded poof_timestamp is
 *     compared against one second (1000000), the timestamp and address are
 *     refreshed, and the entry can be scheduled for removal via
 *     expire_in_one_second() with CATTA_CACHE_POOF_FINAL.
 *
 * NOTE(review): the switch expression, the action taken when less than one
 * second has passed, the counter behind "4th time", and the return value are
 * not visible in this view. Also, the block comment opened in the
 * CATTA_CACHE_VALID case is not terminated in this view, so as shown the
 * statements up to the next comment terminator are lexically inside it.
 * Confirm against the complete source. */
442 static void* start_poof_callback(CattaCache *c, CattaKey *pattern, CattaCacheEntry *e, void *userdata) {
443 CattaAddress *a = userdata;
451 gettimeofday(&now, NULL);
454 case CATTA_CACHE_VALID:
456 /* The entry was perfectly valid until now, so let's enter
459 e->state = CATTA_CACHE_POOF;
460 e->poof_address = *a;
461 e->poof_timestamp = now;
466 case CATTA_CACHE_POOF:
467 if (catta_timeval_diff(&now, &e->poof_timestamp) < 1000000)
470 e->poof_timestamp = now;
471 e->poof_address = *a;
474 /* This is the 4th time we got no response, so let's
475 * remove this entry. */
477 expire_in_one_second(c, e, CATTA_CACHE_POOF_FINAL);
/* Run start_poof_callback() on every cache entry selected by key (via
 * catta_cache_walk()), passing the address a as the callback's userdata.
 * The const qualifier on a is cast away only to fit the void* userdata
 * parameter. */
487 void catta_cache_start_poof(CattaCache *c, CattaKey *key, const CattaAddress *a) {
491 catta_cache_walk(c, key, start_poof_callback, (void*) a);
/* Cancel POOF handling for the entry matching record.
 *
 * The entry is looked up with lookup_record(). If it is in CATTA_CACHE_POOF
 * or CATTA_CACHE_POOF_FINAL state and a equals the stored poof_address
 * (catta_address_cmp() returns 0), the entry is put back into
 * CATTA_CACHE_VALID and rescheduled with next_expiry(c, e, 80).
 *
 * NOTE(review): the statement executed when no entry is found is not visible
 * in this view; presumably an early return -- confirm against the complete
 * source. */
494 void catta_cache_stop_poof(CattaCache *c, CattaRecord *record, const CattaAddress *a) {
501 if (!(e = lookup_record(c, record)))
504 /* This function is called for each response suppression
505 record. If the matching cache entry is in POOF state and the
506 query address is the same, we put it back into valid mode */
508 if (e->state == CATTA_CACHE_POOF || e->state == CATTA_CACHE_POOF_FINAL)
509 if (catta_address_cmp(a, &e->poof_address) == 0) {
510 e->state = CATTA_CACHE_VALID;
511 next_expiry(c, e, 80);