]> git.meshlink.io Git - catta/blob - psched.c
add packet scheduler
[catta] / psched.c
1 #include "util.h"
2 #include "psched.h"
3
4 flxPacketScheduler *flx_packet_scheduler_new(flxServer *server, flxInterface *i, guchar protocol) {
5     flxPacketScheduler *s;
6
7     g_assert(server);
8     g_assert(i);
9
10     s = g_new(flxPacketScheduler, 1);
11     s->server = server;
12     s->interface = i;
13     s->protocol = protocol;
14
15     FLX_LLIST_HEAD_INIT(flxQueryJob, s->query_jobs);
16     FLX_LLIST_HEAD_INIT(flxResponseJob, s->response_jobs);
17     
18     return s;
19 }
20
21 static void query_job_free(flxPacketScheduler *s, flxQueryJob *qj) {
22     g_assert(qj);
23
24     if (qj->time_event)
25         flx_time_event_queue_remove(qj->scheduler->server->time_event_queue, qj->time_event);
26
27     FLX_LLIST_REMOVE(flxQueryJob, jobs, s->query_jobs, qj);
28     
29     flx_key_unref(qj->key);
30     g_free(qj);
31 }
32
33 static void response_job_free(flxPacketScheduler *s, flxResponseJob *rj) {
34     g_assert(rj);
35
36     if (rj->time_event)
37         flx_time_event_queue_remove(rj->scheduler->server->time_event_queue, rj->time_event);
38
39     FLX_LLIST_REMOVE(flxResponseJob, jobs, s->response_jobs, rj);
40
41     flx_record_unref(rj->record);
42     g_free(rj);
43 }
44
45 void flx_packet_scheduler_free(flxPacketScheduler *s) {
46     flxQueryJob *qj;
47     flxResponseJob *rj;
48     flxTimeEvent *e;
49     
50     g_assert(s);
51
52     while ((qj = s->query_jobs))
53         query_job_free(s, qj);
54     while ((rj = s->response_jobs))
55         response_job_free(s, rj);
56
57     g_free(s);
58 }
59
60 static guint8* packet_add_query_job(flxPacketScheduler *s, flxDnsPacket *p, flxQueryJob *qj) {
61     guint8 *d;
62
63     g_assert(s);
64     g_assert(p);
65     g_assert(qj);
66
67     if ((d = flx_dns_packet_append_key(p, qj->key))) {
68         GTimeVal tv;
69
70         qj->done = 1;
71
72         /* Drop query after 100ms from history */
73         flx_elapse_time(&tv, 100, 0);
74         flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
75     }
76
77     return d;
78 }
79                                  
80 static void query_elapse(flxTimeEvent *e, gpointer data) {
81     flxQueryJob *qj = data;
82     flxPacketScheduler *s;
83     flxDnsPacket *p;
84     guint n;
85     guint8 *d;
86
87     g_assert(qj);
88     s = qj->scheduler;
89
90     if (qj->done) {
91         /* Lets remove it  from the history */
92         query_job_free(s, qj);
93         return;
94     }
95
96     p = flx_dns_packet_new_query(s->interface->mtu - 200);
97     d = packet_add_query_job(s, p, qj);
98     g_assert(d);
99     n = 1;
100
101     /* Try to fill up packet with more queries, if available */
102     for (qj = s->query_jobs; qj; qj = qj->jobs_next) {
103
104         if (qj->done)
105             continue;
106
107         if (!packet_add_query_job(s, p, qj))
108             break;
109
110         n++;
111     }
112
113     flx_dns_packet_set_field(p, DNS_FIELD_QDCOUNT, n);
114     flx_interface_send_packet(s->interface, s->protocol, p);
115     flx_dns_packet_free(p);
116 }
117
118 static flxQueryJob* look_for_query(flxPacketScheduler *s, flxKey *key) {
119     flxQueryJob *qj;
120     
121     g_assert(s);
122     g_assert(key);
123
124     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
125         if (flx_key_equal(qj->key, key))
126             return qj;
127
128     return NULL;
129 }
130
131 void flx_packet_scheduler_post_query(flxPacketScheduler *s, flxKey *key) {
132     flxQueryJob *qj;
133     GTimeVal tv;
134     
135     g_assert(s);
136     g_assert(key);
137
138     if (look_for_query(s, key))
139         return;
140
141     qj = g_new(flxQueryJob, 1);
142     qj->key = flx_key_ref(key);
143     qj->done = FALSE;
144
145     flx_elapse_time(&tv, 100, 0);
146     qj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, query_elapse, qj);
147     qj->scheduler = s;
148
149     FLX_LLIST_PREPEND(flxQueryJob, jobs, s->query_jobs, qj);
150 }
151
152 static guint8* packet_add_response_job(flxPacketScheduler *s, flxDnsPacket *p, flxResponseJob *rj) {
153     guint8 *d;
154
155     g_assert(s);
156     g_assert(p);
157     g_assert(rj);
158
159     if ((d = flx_dns_packet_append_record(p, rj->record, FALSE))) {
160         GTimeVal tv;
161
162         rj->done = 1;
163
164         /* Drop response after 1s from history */
165         flx_elapse_time(&tv, 1000, 0);
166         flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
167     }
168
169     return d;
170 }
171                                  
172
173 static void response_elapse(flxTimeEvent *e, gpointer data) {
174     flxResponseJob *rj = data;
175     flxPacketScheduler *s;
176     flxDnsPacket *p;
177     guint n;
178     guint8 *d;
179
180     g_assert(rj);
181     s = rj->scheduler;
182
183     if (rj->done) {
184         /* Lets remove it  from the history */
185         response_job_free(s, rj);
186         return;
187     }
188
189     p = flx_dns_packet_new_response(s->interface->mtu - 200);
190     d = packet_add_response_job(s, p, rj);
191     g_assert(d);
192     n = 1;
193
194     /* Try to fill up packet with more responses, if available */
195     for (rj = s->response_jobs; rj; rj = rj->jobs_next) {
196
197         if (rj->done)
198             continue;
199
200         if (!packet_add_response_job(s, p, rj))
201             break;
202
203         n++;
204     }
205
206     flx_dns_packet_set_field(p, DNS_FIELD_ANCOUNT, n);
207     flx_interface_send_packet(s->interface, s->protocol, p);
208     flx_dns_packet_free(p);
209 }
210
211 static flxResponseJob* look_for_response(flxPacketScheduler *s, flxRecord *record) {
212     flxResponseJob *rj;
213
214     g_assert(s);
215     g_assert(record);
216
217     for (rj = s->response_jobs; rj; rj = rj->jobs_next)
218         if (flx_record_equal(rj->record, record))
219             return rj;
220
221     return NULL;
222 }
223
224 void flx_packet_scheduler_post_response(flxPacketScheduler *s, flxRecord *record) {
225     flxResponseJob *rj;
226     GTimeVal tv;
227     
228     g_assert(s);
229     g_assert(record);
230
231     if (look_for_response(s, record))
232         return;
233
234     rj = g_new(flxResponseJob, 1);
235     rj->record = flx_record_ref(record);
236     rj->done = FALSE;
237
238     flx_elapse_time(&tv, 20, 100);
239     rj->time_event = flx_time_event_queue_add(s->server->time_event_queue, &tv, response_elapse, rj);
240     rj->scheduler = s;
241
242     FLX_LLIST_PREPEND(flxResponseJob, jobs, s->response_jobs, rj);
243 }
244
245 void flx_packet_scheduler_drop_query(flxPacketScheduler *s, flxKey *key) {
246     flxQueryJob *qj;
247     
248     g_assert(s);
249     g_assert(key);
250
251     for (qj = s->query_jobs; qj; qj = qj->jobs_next)
252         if (flx_key_equal(qj->key, key)) {
253
254             if (!qj->done) {
255                 GTimeVal tv;
256                 qj->done = TRUE;
257                 
258                 /* Drop query after 100ms from history */
259                 flx_elapse_time(&tv, 100, 0);
260                 flx_time_event_queue_update(s->server->time_event_queue, qj->time_event, &tv);
261             }
262
263             break;
264         }
265 }
266
267 void flx_packet_scheduler_drop_response(flxPacketScheduler *s, flxRecord *record) {
268     flxResponseJob *rj;
269     
270     g_assert(s);
271     g_assert(record);
272
273     for  (rj = s->response_jobs; rj; rj = rj->jobs_next)
274         if (flx_record_equal(rj->record, record)) {
275
276             if (!rj->done) {
277                 GTimeVal tv;
278                 rj->done = TRUE;
279                 
280                 /* Drop response after 100ms from history */
281                 flx_elapse_time(&tv, 100, 0);
282                 flx_time_event_queue_update(s->server->time_event_queue, rj->time_event, &tv);
283             }
284
285             break;
286         }
287 }