]> git.meshlink.io Git - catta/blob - psched.c
* add subscription feature - with reissuing
[catta] / psched.c
1 #include "util.h"
2 #include "psched.h"
3
4 flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i) {
5     flxPacketScheduler *s;
6
7     g_assert(server);
8     g_assert(i);
9
10     s = g_new(flxPacketScheduler, 1);
11     s->server = server;
12     s->interface = i;
13
14     FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
15     FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
16     
17     return s;
18 }
19
20 static void query_job_free(flxPacketScheduler *s, flxQueryJob *qj) {
21     g_assert(qj);
22
23     if (qj->time_event)
24         flx_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
25
26     FLX_LLIST_REMOVE(flxQueryJob, jobs, s->query_jobs, qj);
27     
28     flx_key_unref(qj->key);
29     g_free(qj);
30 }
31
32 static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) {
33     g_assert(rj);
34
35     if (rj->time_event)
36         flx_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
37
38     FLX_LLIST_REMOVE(flxResponseJob, jobs, s->response_jobs, rj);
39
40     flx_record_unref(rj->record);
41     g_free(rj);
42 }
43
44 void flx_packet_scheduler_free(flxPacketScheduler *s) {
45     flxQueryJob *qj;
46     flxResponseJob *rj;
47     flxTimeEvent *e;
48     
49     g_assert(s);
50
51     while ((qj = s->query_jobs))
52         query_job_free(s, qj);
53     while ((rj = s->response_jobs))
54         response_job_free(s, rj);
55
56     g_free(s);
57 }
58
59 static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) {
60     guint8 *d;
61
62     g_assert(s);
63     g_assert(p);
64     g_assert(qj);
65
66     if ((d = flx_dns_packet_append_key(p, qj->key))) {
67         GTimeVal tv;
68
69         qj->done = 1;
70
71         /* Drop query after 100ms from history */
72         flx_elapse_time(&tv, 100, 0);
73         flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
74     }
75
76     return d;
77 }
78                                  
79 static void query_elapse(flxTimeEvent *e, gpointer data) {
80     flxQueryJob *qj = data;
81     flxPacketScheduler *s;
82     flxDnsPacket *p;
83     guint n;
84     guint8 *d;
85
86     g_assert(qj);
87     s = qj->scheduler;
88
89     if (qj->done) {
90         /* Lets remove it  from the history */
91         query_job_free(s, qj);
92         return;
93     }
94
95     p = flx_dns_packet_new_query(s->interface->hardware->mtu - 48);
96     d = packet_add_query_job(s, p, qj);
97     g_assert(d);
98     n = 1;
99
100     /* Try to fill up packet with more queries, if available */
101     for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
102
103         if (qj->done)
104             continue;
105
106         if (!packet_add_query_job(s, p, qj))
107             break;
108
109         n++;
110     }
111
112     flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n);
113     flx_interface_send_packet(s->interface, p);
114     flx_dns_packet_free(p);
115 }
116
117 static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) {
118     flxQueryJob *qj;
119     
120     g_assert(s);
121     g_assert(key);
122
123     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
124         if (flx_key_equal(qj->key, key))
125             return qj;
126
127     return NULL;
128 }
129
130 void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key) {
131     flxQueryJob *qj;
132     GTimeVal tv;
133     
134     g_assert(s);
135     g_assert(key);
136
137     if (look_for_query(s, key))
138         return;
139
140     qj = g_new(flxQueryJob, 1);
141     qj->key = flx_key_ref(key);
142     qj->done = FALSE;
143
144     flx_elapse_time(&tv, 100, 0);
145     qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
146     qj->scheduler = s;
147
148     FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
149 }
150
151 static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) {
152     guint8 *d;
153
154     g_assert(s);
155     g_assert(p);
156     g_assert(rj);
157
158     if ((d = flx_dns_packet_append_record(p, rj->record, FALSE))) {
159         GTimeVal tv;
160
161         rj->done = 1;
162
163         /* Drop response after 1s from history */
164         flx_elapse_time(&tv, 1000, 0);
165         flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
166     }
167
168     return d;
169 }
170                                  
171
172 static void response_elapse(flxTimeEvent *e, gpointer data) {
173     flxResponseJob *rj = data;
174     flxPacketScheduler *s;
175     flxDnsPacket *p;
176     guint n;
177     guint8 *d;
178
179     g_assert(rj);
180     s = rj->scheduler;
181
182     if (rj->done) {
183         /* Lets remove it  from the history */
184         response_job_free(s, rj);
185         return;
186     }
187
188     p = flx_dns_packet_new_response(s->interface->hardware->mtu - 200);
189     d = packet_add_response_job(s, p, rj);
190     g_assert(d);
191     n = 1;
192
193     /* Try to fill up packet with more responses, if available */
194     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
195
196         if (rj->done)
197             continue;
198
199         if (!packet_add_response_job(s, p, rj))
200             break;
201
202         n++;
203     }
204
205     flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
206     flx_interface_send_packet(s->interface, p);
207     flx_dns_packet_free(p);
208 }
209
210 static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) {
211     flxResponseJob *rj;
212
213     g_assert(s);
214     g_assert(record);
215
216     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
217         if (flx_record_equal_no_ttl(rj->record, record))
218             return rj;
219
220     return NULL;
221 }
222
223 void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record) {
224     flxResponseJob *rj;
225     GTimeVal tv;
226     
227     g_assert(s);
228     g_assert(record);
229
230     if (look_for_response(s, record))
231         return;
232
233     rj = g_new(flxResponseJob, 1);
234     rj->record = flx_record_ref(record);
235     rj->done = FALSE;
236
237     flx_elapse_time(&tv, 20, 100);
238     rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
239     rj->scheduler = s;
240
241     FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
242 }
243
244 void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key) {
245     flxQueryJob *qj;
246     
247     g_assert(s);
248     g_assert(key);
249
250     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
251         if (flx_key_equal(qj->key, key)) {
252
253             if (!qj->done) {
254                 GTimeVal tv;
255                 qj->done = TRUE;
256                 
257                 /* Drop query after 100ms from history */
258                 flx_elapse_time(&tv, 100, 0);
259                 flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
260             }
261
262             break;
263         }
264 }
265
266 void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record) {
267     flxResponseJob *rj;
268     
269     g_assert(s);
270     g_assert(record);
271
272     for  (rj = s->response_jobs; rj; rj = rj->jobs_next)
273         if (flx_record_equal_no_ttl(rj->record, record)) {
274
275             if (!rj->done) {
276                 GTimeVal tv;
277                 rj->done = TRUE;
278                 
279                 /* Drop response after 100ms from history */
280                 flx_elapse_time(&tv, 100, 0);
281                 flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
282             }
283
284             break;
285         }
286 }