LCOV - code coverage report
Current view: top level - lib - workqueue.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 78 133 58.6 %
Date: 2015-11-19 Functions: 8 12 66.7 %

          Line data    Source code
       1             : /* 
       2             :  * Quagga Work Queue Support.
       3             :  *
       4             :  * Copyright (C) 2005 Sun Microsystems, Inc.
       5             :  *
       6             :  * This file is part of GNU Zebra.
       7             :  *
       8             :  * Quagga is free software; you can redistribute it and/or modify it
       9             :  * under the terms of the GNU General Public License as published by the
      10             :  * Free Software Foundation; either version 2, or (at your option) any
      11             :  * later version.
      12             :  *
      13             :  * Quagga is distributed in the hope that it will be useful, but
      14             :  * WITHOUT ANY WARRANTY; without even the implied warranty of
      15             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      16             :  * General Public License for more details.
      17             :  *
      18             :  * You should have received a copy of the GNU General Public License
      19             :  * along with Quagga; see the file COPYING.  If not, write to the Free
      20             :  * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
      21             :  * 02111-1307, USA.  
      22             :  */
      23             : 
      24             : #include <lib/zebra.h>
      25             : #include "thread.h"
      26             : #include "memory.h"
      27             : #include "workqueue.h"
      28             : #include "linklist.h"
      29             : #include "command.h"
      30             : #include "log.h"
      31             : 
      32             : /* master list of work_queues */
      33             : static struct list work_queues;
      34             : 
      35             : #define WORK_QUEUE_MIN_GRANULARITY 1
      36             : 
      37             : static struct work_queue_item *
      38         184 : work_queue_item_new (struct work_queue *wq)
      39             : {
      40             :   struct work_queue_item *item;
      41         184 :   assert (wq);
      42             : 
      43         184 :   item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, 
      44             :                   sizeof (struct work_queue_item));
      45             :   
      46         184 :   return item;
      47             : }
      48             : 
      49             : static void
      50         176 : work_queue_item_free (struct work_queue_item *item)
      51             : {
      52         176 :   XFREE (MTYPE_WORK_QUEUE_ITEM, item);
      53         176 :   return;
      54             : }
      55             : 
      56             : /* create new work queue */
      57             : struct work_queue *
      58          45 : work_queue_new (struct thread_master *m, const char *queue_name)
      59             : {
      60             :   struct work_queue *new;
      61             :   
      62          45 :   new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
      63             : 
      64          45 :   if (new == NULL)
      65           0 :     return new;
      66             :   
      67          45 :   new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
      68          45 :   new->master = m;
      69          45 :   SET_FLAG (new->flags, WQ_UNPLUGGED);
      70             :   
      71          45 :   if ( (new->items = list_new ()) == NULL)
      72             :     {
      73           0 :       XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
      74           0 :       XFREE (MTYPE_WORK_QUEUE, new);
      75             :       
      76           0 :       return NULL;
      77             :     }
      78             :   
      79          45 :   new->items->del = (void (*)(void *)) work_queue_item_free;  
      80             :   
      81          45 :   listnode_add (&work_queues, new);
      82             :   
      83          45 :   new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
      84             : 
      85             :   /* Default values, can be overriden by caller */
      86          45 :   new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
      87             :     
      88          45 :   return new;
      89             : }
      90             : 
      91             : void
      92           0 : work_queue_free (struct work_queue *wq)
      93             : {
      94           0 :   if (wq->thread != NULL)
      95           0 :     thread_cancel(wq->thread);
      96             :   
      97             :   /* list_delete frees items via callback */
      98           0 :   list_delete (wq->items);
      99           0 :   listnode_delete (&work_queues, wq);
     100             :   
     101           0 :   XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
     102           0 :   XFREE (MTYPE_WORK_QUEUE, wq);
     103           0 :   return;
     104             : }
     105             : 
     106             : static int
     107         628 : work_queue_schedule (struct work_queue *wq, unsigned int delay)
     108             : {
     109             :   /* if appropriate, schedule work queue thread */
     110         628 :   if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
     111         628 :        && (wq->thread == NULL)
     112         628 :        && (listcount (wq->items) > 0) )
     113             :     {
     114         628 :       wq->thread = thread_add_background (wq->master, work_queue_run, 
     115             :                                           wq, delay);
     116         628 :       return 1;
     117             :     }
     118             :   else
     119           0 :     return 0;
     120             : }
     121             :   
     122             : void
     123         184 : work_queue_add (struct work_queue *wq, void *data)
     124             : {
     125             :   struct work_queue_item *item;
     126             :   
     127         184 :   assert (wq);
     128             : 
     129         184 :   if (!(item = work_queue_item_new (wq)))
     130             :     {
     131           0 :       zlog_err ("%s: unable to get new queue item", __func__);
     132           0 :       return;
     133             :     }
     134             :   
     135         184 :   item->data = data;
     136         184 :   listnode_add (wq->items, item);
     137             :   
     138         184 :   work_queue_schedule (wq, wq->spec.hold);
     139             :   
     140         184 :   return;
     141             : }
     142             : 
     143             : static void
     144         176 : work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
     145             : {
     146         176 :   struct work_queue_item *item = listgetdata (ln);
     147             : 
     148         176 :   assert (item && item->data);
     149             : 
     150             :   /* call private data deletion callback if needed */  
     151         176 :   if (wq->spec.del_item_data)
     152           0 :     wq->spec.del_item_data (wq, item->data);
     153             : 
     154         176 :   list_delete_node (wq->items, ln);
     155         176 :   work_queue_item_free (item);
     156             :   
     157         176 :   return;
     158             : }
     159             : 
     160             : static void
     161         444 : work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
     162             : {
     163         444 :   LISTNODE_DETACH (wq->items, ln);
     164         444 :   LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
     165         444 : }
     166             : 
     167           0 : DEFUN(show_work_queues,
     168             :       show_work_queues_cmd,
     169             :       "show work-queues",
     170             :       SHOW_STR
     171             :       "Work Queue information\n")
     172             : {
     173             :   struct listnode *node;
     174             :   struct work_queue *wq;
     175             :   
     176           0 :   vty_out (vty, 
     177             :            "%c %8s %5s %8s %21s%s",
     178             :            ' ', "List","(ms) ","Q. Runs","Cycle Counts   ",
     179           0 :            VTY_NEWLINE);
     180           0 :   vty_out (vty,
     181             :            "%c %8s %5s %8s %7s %6s %6s %s%s",
     182             :            'P',
     183             :            "Items",
     184             :            "Hold",
     185             :            "Total",
     186             :            "Best","Gran.","Avg.", 
     187             :            "Name", 
     188           0 :            VTY_NEWLINE);
     189             :  
     190           0 :   for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
     191             :     {
     192           0 :       vty_out (vty,"%c %8d %5d %8ld %7d %6d %6u %s%s",
     193           0 :                (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
     194           0 :                listcount (wq->items),
     195             :                wq->spec.hold,
     196             :                wq->runs,
     197             :                wq->cycles.best, wq->cycles.granularity,
     198           0 :                  (wq->runs) ? 
     199           0 :                    (unsigned int) (wq->cycles.total / wq->runs) : 0,
     200             :                wq->name,
     201           0 :                VTY_NEWLINE);
     202             :     }
     203             :     
     204           0 :   return CMD_SUCCESS;
     205             : }
     206             : 
     207             : /* 'plug' a queue: Stop it from being scheduled,
     208             :  * ie: prevent the queue from draining.
     209             :  */
     210             : void
     211           0 : work_queue_plug (struct work_queue *wq)
     212             : {
     213           0 :   if (wq->thread)
     214           0 :     thread_cancel (wq->thread);
     215             :   
     216           0 :   wq->thread = NULL;
     217             :   
     218           0 :   UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
     219           0 : }
     220             : 
     221             : /* unplug queue, schedule it again, if appropriate
     222             :  * Ie: Allow the queue to be drained again
     223             :  */
     224             : void
     225           0 : work_queue_unplug (struct work_queue *wq)
     226             : {
     227           0 :   SET_FLAG (wq->flags, WQ_UNPLUGGED);
     228             : 
     229             :   /* if thread isnt already waiting, add one */
     230           0 :   work_queue_schedule (wq, wq->spec.hold);
     231           0 : }
     232             : 
     233             : /* timer thread to process a work queue
     234             :  * will reschedule itself if required,
     235             :  * otherwise work_queue_item_add 
     236             :  */
     237             : int
     238         620 : work_queue_run (struct thread *thread)
     239             : {
     240             :   struct work_queue *wq;
     241             :   struct work_queue_item *item;
     242             :   wq_item_status ret;
     243         620 :   unsigned int cycles = 0;
     244             :   struct listnode *node, *nnode;
     245         620 :   char yielded = 0;
     246             : 
     247         620 :   wq = THREAD_ARG (thread);
     248         620 :   wq->thread = NULL;
     249             : 
     250         620 :   assert (wq && wq->items);
     251             : 
     252             :   /* calculate cycle granularity:
     253             :    * list iteration == 1 cycle
     254             :    * granularity == # cycles between checks whether we should yield.
     255             :    *
     256             :    * granularity should be > 0, and can increase slowly after each run to
     257             :    * provide some hysteris, but not past cycles.best or 2*cycles.
     258             :    *
     259             :    * Best: starts low, can only increase
     260             :    *
     261             :    * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased 
     262             :    *              if we run to end of time slot, can increase otherwise 
     263             :    *              by a small factor.
     264             :    *
     265             :    * We could use just the average and save some work, however we want to be
     266             :    * able to adjust quickly to CPU pressure. Average wont shift much if
     267             :    * daemon has been running a long time.
     268             :    */
     269         620 :    if (wq->cycles.granularity == 0)
     270           0 :      wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
     271             : 
     272        1240 :   for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
     273             :   {
     274         620 :     assert (item && item->data);
     275             :     
     276             :     /* dont run items which are past their allowed retries */
     277         620 :     if (item->ran > wq->spec.max_retries)
     278             :       {
     279             :         /* run error handler, if any */
     280           0 :         if (wq->spec.errorfunc)
     281           0 :           wq->spec.errorfunc (wq, item->data);
     282           0 :         work_queue_item_remove (wq, node);
     283           0 :         continue;
     284             :       }
     285             : 
     286             :     /* run and take care of items that want to be retried immediately */
     287             :     do
     288             :       {
     289         620 :         ret = wq->spec.workfunc (wq, item->data);
     290         620 :         item->ran++;
     291             :       }
     292             :     while ((ret == WQ_RETRY_NOW) 
     293         620 :            && (item->ran < wq->spec.max_retries));
     294             : 
     295         620 :     switch (ret)
     296             :       {
     297             :       case WQ_QUEUE_BLOCKED:
     298             :         {
     299             :           /* decrement item->ran again, cause this isn't an item
     300             :            * specific error, and fall through to WQ_RETRY_LATER
     301             :            */
     302           0 :           item->ran--;
     303             :         }
     304             :       case WQ_RETRY_LATER:
     305             :         {
     306           0 :           goto stats;
     307             :         }
     308             :       case WQ_REQUEUE:
     309             :         {
     310         444 :           item->ran--;
     311         444 :           work_queue_item_requeue (wq, node);
     312         444 :           break;
     313             :         }
     314             :       case WQ_RETRY_NOW:
     315             :         /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
     316             :       case WQ_ERROR:
     317             :         {
     318           0 :           if (wq->spec.errorfunc)
     319           0 :             wq->spec.errorfunc (wq, item);
     320             :         }
     321             :         /* fall through here is deliberate */
     322             :       case WQ_SUCCESS:
     323             :       default:
     324             :         {
     325         176 :           work_queue_item_remove (wq, node);
     326         176 :           break;
     327             :         }
     328             :       }
     329             : 
     330             :     /* completed cycle */
     331         620 :     cycles++;
     332             : 
     333             :     /* test if we should yield */
     334         620 :     if ( !(cycles % wq->cycles.granularity) 
     335         620 :         && thread_should_yield (thread))
     336             :       {
     337           0 :         yielded = 1;
     338           0 :         goto stats;
     339             :       }
     340             :   }
     341             : 
     342             : stats:
     343             : 
     344             : #define WQ_HYSTERESIS_FACTOR 4
     345             : 
     346             :   /* we yielded, check whether granularity should be reduced */
     347         620 :   if (yielded && (cycles < wq->cycles.granularity))
     348             :     {
     349           0 :       wq->cycles.granularity = ((cycles > 0) ? cycles 
     350           0 :                                              : WORK_QUEUE_MIN_GRANULARITY);
     351             :     }
     352             :   /* otherwise, should granularity increase? */
     353         620 :   else if (cycles >= (wq->cycles.granularity))
     354             :     {
     355         620 :       if (cycles > wq->cycles.best)
     356          45 :         wq->cycles.best = cycles;
     357             :       
     358             :       /* along with yielded check, provides hysteresis for granularity */
     359        1240 :       if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
     360         620 :                                            * WQ_HYSTERESIS_FACTOR))
     361           0 :         wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
     362         620 :       else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
     363           0 :         wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
     364             :     }
     365             : #undef WQ_HYSTERIS_FACTOR
     366             :   
     367         620 :   wq->runs++;
     368         620 :   wq->cycles.total += cycles;
     369             : 
     370             : #if 0
     371             :   printf ("%s: cycles %d, new: best %d, worst %d\n",
     372             :             __func__, cycles, wq->cycles.best, wq->cycles.granularity);
     373             : #endif
     374             :   
     375             :   /* Is the queue done yet? If it is, call the completion callback. */
     376         620 :   if (listcount (wq->items) > 0)
     377         444 :     work_queue_schedule (wq, 0);
     378         176 :   else if (wq->spec.completion_func)
     379           0 :     wq->spec.completion_func (wq);
     380             :   
     381         620 :   return 0;
     382             : }

Generated by: LCOV version 1.10