/*
 * Quagga Work Queue Support.
 *
 * Copyright (C) 2005 Sun Microsystems, Inc.
 *
 * This file is part of GNU Zebra.
 *
 * Quagga is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * Quagga is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Quagga; see the file COPYING.  If not, write to the Free
 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 */

#include <lib/zebra.h>
#include "thread.h"
#include "memory.h"
#include "workqueue.h"
#include "linklist.h"
#include "command.h"
#include "log.h"

/* master list of work_queues */
static struct list work_queues;

#define WORK_QUEUE_MIN_GRANULARITY 1

static struct work_queue_item *
work_queue_item_new (struct work_queue *wq)
{
  struct work_queue_item *item;
  assert (wq);

  item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,
                  sizeof (struct work_queue_item));

  return item;
}

static void
work_queue_item_free (struct work_queue_item *item)
{
  XFREE (MTYPE_WORK_QUEUE_ITEM, item);
  return;
}

/* create new work queue */
struct work_queue *
work_queue_new (struct thread_master *m, const char *queue_name)
{
  struct work_queue *new;

  new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));

  if (new == NULL)
    return new;

  new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
  new->master = m;
  SET_FLAG (new->flags, WQ_UNPLUGGED);

  if ( (new->items = list_new ()) == NULL)
    {
      XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
      XFREE (MTYPE_WORK_QUEUE, new);

      return NULL;
    }

  new->items->del = (void (*)(void *)) work_queue_item_free;

  listnode_add (&work_queues, new);

  new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;

  /* Default values, can be overridden by caller */
  new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;

  return new;
}
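
/* Example of typical queue setup by a caller. This is an illustrative
 * sketch only, kept out of the build like the debug block further below;
 * the names my_process_item, my_free_item_data and my_queue_init are
 * hypothetical, not part of this library.
 */
#if 0
static wq_item_status
my_process_item (struct work_queue *wq, void *data)
{
  /* ... perform one unit of work on data ... */
  return WQ_SUCCESS;
}

static void
my_free_item_data (struct work_queue *wq, void *data)
{
  XFREE (MTYPE_TMP, data);
}

static struct work_queue *
my_queue_init (struct thread_master *master)
{
  struct work_queue *wq = work_queue_new (master, "example queue");

  wq->spec.workfunc = my_process_item;
  wq->spec.del_item_data = my_free_item_data;
  wq->spec.max_retries = 3;
  wq->spec.hold = 500; /* override WORK_QUEUE_DEFAULT_HOLD if desired */

  return wq;
}
#endif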

void
work_queue_free (struct work_queue *wq)
{
  if (wq->thread != NULL)
    thread_cancel (wq->thread);

  /* list_delete frees items via callback */
  list_delete (wq->items);
  listnode_delete (&work_queues, wq);

  XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
  XFREE (MTYPE_WORK_QUEUE, wq);
  return;
}

static int
work_queue_schedule (struct work_queue *wq, unsigned int delay)
{
  /* if appropriate, schedule work queue thread */
  if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
       && (wq->thread == NULL)
       && (listcount (wq->items) > 0) )
    {
      wq->thread = thread_add_background (wq->master, work_queue_run,
                                          wq, delay);
      return 1;
    }
  else
    return 0;
}

void
work_queue_add (struct work_queue *wq, void *data)
{
  struct work_queue_item *item;

  assert (wq);

  if (!(item = work_queue_item_new (wq)))
    {
      zlog_err ("%s: unable to get new queue item", __func__);
      return;
    }

  item->data = data;
  listnode_add (wq->items, item);

  work_queue_schedule (wq, wq->spec.hold);

  return;
}
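
/* The wq_item_status a workfunc returns drives the dispatch logic in
 * work_queue_run () below. A hypothetical sketch of that contract; the
 * predicates resource_blocked, try_process and work_remains are made up
 * for illustration:
 */
#if 0
static wq_item_status
example_workfunc (struct work_queue *wq, void *data)
{
  if (resource_blocked ())   /* whole queue must back off; this run  */
    return WQ_QUEUE_BLOCKED; /* ends and the retry count isn't charged */
  if (!try_process (data))   /* transient, item-specific failure:    */
    return WQ_RETRY_LATER;   /* keep the item, end this run          */
  if (work_remains (data))   /* partial progress was made:           */
    return WQ_REQUEUE;       /* move item to the end of the queue    */
  return WQ_SUCCESS;         /* item is removed and freed            */
}
#endif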

static void
work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
{
  struct work_queue_item *item = listgetdata (ln);

  assert (item && item->data);

  /* call private data deletion callback if needed */
  if (wq->spec.del_item_data)
    wq->spec.del_item_data (wq, item->data);

  list_delete_node (wq->items, ln);
  work_queue_item_free (item);

  return;
}

static void
work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
{
  LISTNODE_DETACH (wq->items, ln);
  LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
}

DEFUN(show_work_queues,
      show_work_queues_cmd,
      "show work-queues",
      SHOW_STR
      "Work Queue information\n")
{
  struct listnode *node;
  struct work_queue *wq;

  vty_out (vty,
           "%c %8s %5s %8s %21s%s",
           ' ', "List","(ms) ","Q. Runs","Cycle Counts ",
           VTY_NEWLINE);
  vty_out (vty,
           "%c %8s %5s %8s %7s %6s %6s %s%s",
           'P',
           "Items",
           "Hold",
           "Total",
           "Best","Gran.","Avg.",
           "Name",
           VTY_NEWLINE);

  for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
    {
      vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
               (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
               listcount (wq->items),
               wq->spec.hold,
               wq->runs,
               wq->cycles.best, wq->cycles.granularity,
               (wq->runs) ?
                 (unsigned int) (wq->cycles.total / wq->runs) : 0,
               wq->name,
               VTY_NEWLINE);
    }

  return CMD_SUCCESS;
}

/* 'plug' a queue: stop it from being scheduled,
 * i.e. prevent the queue from draining.
 */
void
work_queue_plug (struct work_queue *wq)
{
  if (wq->thread)
    thread_cancel (wq->thread);

  wq->thread = NULL;

  UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
}

/* unplug queue, scheduling it again if appropriate,
 * i.e. allow the queue to be drained again
 */
void
work_queue_unplug (struct work_queue *wq)
{
  SET_FLAG (wq->flags, WQ_UNPLUGGED);

  /* if a thread isn't already waiting, add one */
  work_queue_schedule (wq, wq->spec.hold);
}
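
/* Plugging is useful for batching: plug the queue, add a burst of items
 * without the queue draining in between, then unplug so a single run can
 * process them together. A hypothetical, self-contained sketch:
 */
#if 0
static void
example_batch_add (struct work_queue *wq, void **items, unsigned int n)
{
  unsigned int i;

  work_queue_plug (wq);
  for (i = 0; i < n; i++)
    work_queue_add (wq, items[i]);
  work_queue_unplug (wq); /* reschedules the queue, as it now has items */
}
#endif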

/* timer thread to process a work queue
 * will reschedule itself if required,
 * otherwise a new thread will be scheduled from work_queue_add ()
 */
int
work_queue_run (struct thread *thread)
{
  struct work_queue *wq;
  struct work_queue_item *item;
  wq_item_status ret;
  unsigned int cycles = 0;
  struct listnode *node, *nnode;
  char yielded = 0;

  wq = THREAD_ARG (thread);
  assert (wq && wq->items);

  wq->thread = NULL;

  /* calculate cycle granularity:
   * list iteration == 1 cycle
   * granularity == # cycles between checks whether we should yield.
   *
   * granularity should be > 0, and can increase slowly after each run to
   * provide some hysteresis, but not past cycles.best or 2*cycles.
   *
   * Best: starts low, can only increase
   *
   * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
   *              if we run to end of time slot, can increase otherwise
   *              by a small factor.
   *
   * We could use just the average and save some work, however we want to be
   * able to adjust quickly to CPU pressure. Average won't shift much if
   * the daemon has been running a long time.
   */
  if (wq->cycles.granularity == 0)
    wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;

  for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
    {
      assert (item && item->data);

      /* don't run items which are past their allowed retries */
      if (item->ran > wq->spec.max_retries)
        {
          /* run error handler, if any */
          if (wq->spec.errorfunc)
            wq->spec.errorfunc (wq, item->data);
          work_queue_item_remove (wq, node);
          continue;
        }

      /* run and take care of items that want to be retried immediately */
      do
        {
          ret = wq->spec.workfunc (wq, item->data);
          item->ran++;
        }
      while ((ret == WQ_RETRY_NOW)
             && (item->ran < wq->spec.max_retries));

      switch (ret)
        {
        case WQ_QUEUE_BLOCKED:
          {
            /* decrement item->ran again, because this isn't an item
             * specific error, and fall through to WQ_RETRY_LATER
             */
            item->ran--;
          }
        case WQ_RETRY_LATER:
          {
            goto stats;
          }
        case WQ_REQUEUE:
          {
            item->ran--;
            work_queue_item_requeue (wq, node);
            break;
          }
        case WQ_RETRY_NOW:
          /* a RETRY_NOW that gets here has exceeded max_retries, same as ERROR */
        case WQ_ERROR:
          {
            if (wq->spec.errorfunc)
              wq->spec.errorfunc (wq, item->data);
          }
          /* fall through here is deliberate */
        case WQ_SUCCESS:
        default:
          {
            work_queue_item_remove (wq, node);
            break;
          }
        }

      /* completed cycle */
      cycles++;

      /* test if we should yield */
      if ( !(cycles % wq->cycles.granularity)
          && thread_should_yield (thread))
        {
          yielded = 1;
          goto stats;
        }
    }

stats:

#define WQ_HYSTERESIS_FACTOR 4

  /* we yielded, check whether granularity should be reduced */
  if (yielded && (cycles < wq->cycles.granularity))
    {
      wq->cycles.granularity = ((cycles > 0) ? cycles
                                             : WORK_QUEUE_MIN_GRANULARITY);
    }
  /* otherwise, should granularity increase? */
  else if (cycles >= (wq->cycles.granularity))
    {
      if (cycles > wq->cycles.best)
        wq->cycles.best = cycles;

      /* along with the yielded check, provides hysteresis for granularity */
      if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
                    * WQ_HYSTERESIS_FACTOR))
        wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
      else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
        wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
    }
#undef WQ_HYSTERESIS_FACTOR

  wq->runs++;
  wq->cycles.total += cycles;

#if 0
  printf ("%s: cycles %d, new: best %d, worst %d\n",
          __func__, cycles, wq->cycles.best, wq->cycles.granularity);
#endif

  /* Is the queue done yet? If it is, call the completion callback. */
  if (listcount (wq->items) > 0)
    work_queue_schedule (wq, 0);
  else if (wq->spec.completion_func)
    wq->spec.completion_func (wq);

  return 0;
}
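
/* Worked example of the granularity feedback above, with
 * WQ_HYSTERESIS_FACTOR == 4 and granularity starting at 1: a run of
 * 20 cycles exceeds granularity * 16, so granularity ramps straight
 * to 4; a following run of 17 cycles only exceeds granularity * 4,
 * so granularity steps up to 8. If a later run yields after just
 * 2 cycles, granularity is cut back down to 2.
 */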