kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
test_threadpool.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include <unistd.h>
23 #include <assert.h>
24 #include <err.h>
25 #include <poll.h>
26 
27 #include "threadpool.h"
28 
29 static size_t task_count = 0;
30 
/*
 * Return term `arg` of the Fibonacci-style sequence whose terms 0 and 1
 * are both 1 (1, 1, 2, 3, 5, 8, ...).
 *
 * Implemented as plain double recursion, so the running time grows
 * exponentially with `arg`; presumably that cost is wanted here as
 * CPU-bound work for the pool's tasks.
 */
static size_t fibs(size_t arg) {
    return (arg >= 2) ? fibs(arg - 1) + fibs(arg - 2) : 1;
}
35 
36 static void dump_stats(const char *prefix, struct threadpool_info *stats) {
37  printf("%s(at %d, dt %d, bl %zd)\n",
38  prefix, stats->active_threads, stats->dormant_threads,
39  stats->backlog_size);
40 }
41 
/* Atomically store NEW into *PTR if *PTR currently equals OLD; evaluates
 * to true when the swap took place. Wraps the GCC/Clang __sync builtin. */
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW) (__sync_bool_compare_and_swap((PTR), (OLD), (NEW)))
44 
45 static void task_cb(void *udata) {
46  struct threadpool *t = (struct threadpool *)udata;
47 
48  for (;;) {
50  break;
51  }
52  }
53 
54  size_t arg = task_count;
55  arg = 30 + (random() % 10);
56 
57  size_t tc = task_count;
58 
59  size_t res = fibs(arg);
60 
61  printf("%zd -- fibs(%zd) => %zd", tc, arg, res);
62 
63  struct threadpool_info stats;
64  Threadpool_Stats(t, &stats);
65  dump_stats("", &stats);
66 }
67 
/*
 * Thread pool task that never returns: it sleeps one second at a time,
 * forever. main() schedules it right before shutting the pool down.
 *
 * env: unused.
 */
static void inf_loop_cb(void *env) {
    (void)env;
    while (1) {
        sleep(1);
    }
}
74 
75 int main(int argc, char **argv) {
76  uint8_t sz2 = 8;
77  uint8_t max_threads = 8;
78 
79  char *sz2_env = getenv("SZ2");
80  char *max_threads_env = getenv("MAX_THREADS");
81  if (sz2_env) { sz2 = atoi(sz2_env); }
82  if (max_threads_env) { max_threads = atoi(max_threads_env); }
83 
84  struct threadpool_config cfg = {
85  .task_ringbuf_size2 = sz2,
86  .max_threads = max_threads,
87  };
88  struct threadpool *t = Threadpool_Init(&cfg);
89  assert(t);
90 
91  struct threadpool_task task = {
92  .task = task_cb, .udata = t,
93  };
94 
95  struct threadpool_info stats;
96 
97  for (int i = 0; i < 4; i++) {
98  printf("scheduling...\n");
99  for (int j = 0; j < 40; j++) {
100  for (;;) {
101  size_t counterpressure = 0;
102  if (Threadpool_Schedule(t, &task, &counterpressure)) {
103  break;
104  } else {
105  size_t msec = 10 * 1000 * counterpressure;
106  usleep(msec);
107  }
108  }
109  }
110 
111  Threadpool_Stats(t, &stats);
112  dump_stats("sleeping...", &stats);
113  sleep(1);
114  Threadpool_Stats(t, &stats);
115  dump_stats("waking...", &stats);
116  }
117 
118  while (task_count < 4 * 40) {
119  sleep(1);
120  }
121 
122  task.task = inf_loop_cb;
123  size_t counterpressure = 0;
124  while (!Threadpool_Schedule(t, &task, &counterpressure)) {
125  usleep(10 * 1000);
126  }
127 
128  sleep(1);
129 
130  const int THREAD_SHUTDOWN_SECONDS = 5;
131  printf("shutting down...\n");
132 
133  int limit = (1000 * THREAD_SHUTDOWN_SECONDS)/10;
134  for (int i = 0; i < limit; i++) {
135  if (Threadpool_Shutdown(t, false)) { break; }
136  (void)poll(NULL, 0, 10);
137 
138  if (i == limit - 1) {
139  printf("cancelling thread in intentional infinite loop\n");
140  Threadpool_Shutdown(t, true);
141  }
142  }
143  Threadpool_Free(t);
144 
145  return 0;
146 }
void Threadpool_Free(struct threadpool *t)
Free a threadpool.
Definition: threadpool.c:200
static size_t limit
size_t backlog_size
Definition: threadpool.h:58
threadpool_task_cb * task
Definition: threadpool.h:49
Configuration for thread pool.
Definition: threadpool.h:34
static void dump_stats(const char *prefix, struct threadpool_info *stats)
bool Threadpool_Shutdown(struct threadpool *t, bool kill_all)
Notify the threadpool's threads that the system is going to shut down soon.
Definition: threadpool.c:163
Internal threadpool state.
uint8_t active_threads
Definition: threadpool.h:56
static size_t fibs(size_t arg)
uint8_t task_ringbuf_size2
Definition: threadpool.h:35
#define THREAD_SHUTDOWN_SECONDS
Definition: bus.c:597
static void inf_loop_cb(void *env)
struct threadpool * Threadpool_Init(struct threadpool_config *cfg)
Initialize a threadpool, according to a config.
Definition: threadpool.c:52
Statistics about the current state of the threadpool.
Definition: threadpool.h:55
A task.
Definition: threadpool.h:48
int main(int argc, char **argv)
static void task_cb(void *udata)
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)
static size_t task_count
void Threadpool_Stats(struct threadpool *t, struct threadpool_info *info)
If info is non-NULL, fill out some statistics about the operating state of the thread pool...
Definition: threadpool.c:149
uint8_t dormant_threads
Definition: threadpool.h:57
uint8_t max_threads
Definition: threadpool.h:37
bool Threadpool_Schedule(struct threadpool *t, struct threadpool_task *task, size_t *pushback)
Schedule a task in the threadpool.
Definition: threadpool.c:99