#define DEFAULT_MAX_DELAY 10000
#define INFINITE_DELAY -1
#define DEFAULT_TASK_RINGBUF_SIZE2 8
#define DEFAULT_MAX_THREADS 8
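The *_SIZE2 names suggest that ring-buffer sizes are specified as base-2 exponents rather than raw slot counts. Below is a small sketch of how such a value would expand into an actual queue size; treating task_ringbuf_size2 this way is an assumption based on the naming, not something the fragments state outright.

#include <stdint.h>
#include <stdio.h>

int main(void) {
    uint8_t task_ringbuf_size2 = 8;                  /* the default above: an exponent, not a count */
    size_t queue_size = (size_t)1 << task_ringbuf_size2;   /* 1 << 8 == 256 slots */
    size_t mask = queue_size - 1;                    /* usable with '& mask' because the size is 2^n */
    printf("ring buffer: %zu slots, index mask 0x%zx\n", queue_size, mask);
    return 0;
}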
/* Threadpool_Init: allocate the pool, its task ring, and its thread table. */
t = malloc(sizeof(*t));
if (t == NULL) { goto cleanup; }

tasks = malloc(tasks_sz);
if (tasks == NULL) { goto cleanup; }

threads = malloc(threads_sz);
if (threads == NULL) { goto cleanup; }

memset(t, 0, sizeof(*t));
memset(threads, 0, threads_sz);
/* ... */
memset(tasks, 0xFF, tasks_sz);   /* fill the task ring with 0xFF bytes, so every mark starts out all-ones */
/* ... */
/* cleanup path: free whatever was allocated before the failure */
if (tasks) { free(tasks); }
if (threads) { free(threads); }
/* Threadpool_Schedule: argument checks, then task ring bookkeeping. */
if (t == NULL) { return false; }
if (task == NULL || task->task == NULL) { return false; }
/* ... */
size_t mask = queue_size - 1;   /* size is a power of two, so '& mask' wraps indices */
/* ... */
if (wh - rh >= queue_size - 1) {            /* ring is full */
    if (pushback) { *pushback = wh - rh; }  /* report the backlog to the caller */
/* ... */
if (pushback) { *pushback = wh - rh; }
/* ... */
task = &t->tasks[ch & mask];
if (ch != task->mark) { break; }
assert(ch < t->task_reserve_head);
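The reserve and release heads in these fragments appear to be free-running unsigned counters that are only reduced modulo the ring size at indexing time ("ch & mask"). The following self-contained sketch, independent of the library's code, shows why the difference of the two heads stays meaningful even when the counters wrap around.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
    const size_t queue_size = 8;             /* must be a power of two */
    const size_t mask = queue_size - 1;
    size_t rh = SIZE_MAX - 5;                /* release ("read") head */
    size_t wh = rh + 3;                      /* reserve ("write") head: 3 entries in flight */

    for (int i = 0; i < 4; i++) {
        assert(wh - rh < queue_size - 1);    /* not full: room to reserve one more */
        printf("entry %zu -> slot %zu\n", wh, wh & mask);
        wh++;                                /* keeps counting up; wraps past SIZE_MAX harmlessly */
    }
    printf("entries in flight: %zu\n", wh - rh);
    return 0;
}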
int pcres = pthread_cancel(ti->t);
/* ... */
assert(pcres == ESRCH);   /* if cancellation failed, the thread must already be gone */
} else if (res == -1) {
int joinres = pthread_join(ti->t, &v);
/* ... */
fprintf(stderr, "pthread_join: %d\n", joinres);
assert(joinres == ESRCH);
/* spawn: create a notification pipe, then start a worker thread. */
if (tc == NULL) { return false; }
/* ... */
if (0 != pipe(pipe_fds)) {
    printf("pipe(2) failure\n");
/* ... */
int res = pthread_create(&ti->t, NULL, thread_task, tc);
/* ... */
} else if (res == EAGAIN) {
/* Worker loop: block on the per-thread wakeup pipe, then pull work from the ring. */
struct pollfd pfd[1] = { { .fd = ti->child_fd, .events = POLLIN }, };
/* ... */
int res = poll(pfd, 1, -1);
/* ... */
if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL)) {
    /* ... */
} else if (pfd[0].revents & POLLIN) {
    /* ... */
    ssize_t rres = read(ti->child_fd, read_buf, sizeof(read_buf));
/* ... */
assert(ptask->mark == rh);
/* ... */
.udata = ptask->udata,
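The worker above blocks indefinitely in poll(2) on the read end of a per-thread pipe, and the scheduler presumably wakes it by writing a byte to the other end. Here is a minimal, self-contained sketch of that wakeup pattern; the variable names are illustrative and not taken from the library.

#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
    int fds[2];                              /* fds[0]: read end, fds[1]: write end */
    if (pipe(fds) != 0) { return 1; }

    /* A scheduler would do this to announce new work: */
    (void)write(fds[1], "!", 1);

    /* A worker blocks here (timeout -1) until it is woken up: */
    struct pollfd pfd = { .fd = fds[0], .events = POLLIN };
    int res = poll(&pfd, 1, -1);
    if (res > 0 && (pfd.revents & POLLIN)) {
        char buf[64];
        ssize_t n = read(fds[0], buf, sizeof(buf));   /* drain the notification byte(s) */
        printf("woke up, drained %zd byte(s); now check the task ring\n", n);
    }
    close(fds[0]);
    close(fds[1]);
    return 0;
}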
task = &t->tasks[relh & mask];
if (task->mark != ~relh) { break; }
/* ... */
assert(relh < t->task_commit_head);
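Reading the fragments together, a slot's mark appears to double as its state: it equals the entry number once a task is committed (the "ch != task->mark" and "ptask->mark == rh" checks), and equals the bitwise complement of the entry number once the slot has been released ("task->mark != ~relh"), with Init's 0xFF fill making every mark start out all-ones. The toy below illustrates that convention in isolation; it is a reading of these excerpts, not the library's actual code.

#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Toy slot: 'mark' doubles as a state flag.
 *   mark == entry number   -> slot holds a committed task for that entry
 *   mark == ~entry number  -> slot has been consumed and released          */
struct toy_slot { size_t mark; int payload; };

int main(void) {
    struct toy_slot ring[8];
    memset(ring, 0xFF, sizeof(ring));        /* all-ones marks, as in the Init fragment */

    size_t entry = 42;                       /* monotonically increasing entry number */
    struct toy_slot *slot = &ring[entry & 7];

    slot->payload = 123;
    slot->mark = entry;                      /* commit: consumers can now see the task */
    assert(slot->mark == entry);

    /* ... a consumer runs the task ... */

    slot->mark = ~entry;                     /* release: the slot can be reused */
    assert(slot->mark != entry);
    printf("slot %zu released for entry %zu\n", entry & 7, entry);
    return 0;
}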
Macros:

#define DEFAULT_TASK_RINGBUF_SIZE2
#define THREADPOOL_MAX_RINGBUF_SIZE2
#define DEFAULT_MAX_THREADS
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)

Types and members:

struct threadpool_config
    Configuration for thread pool.
struct threadpool
    Internal threadpool state.
struct marked_task
    A task, with an additional mark.
struct threadpool_info
    Statistics about the current state of the threadpool.
Thread info, plus a pointer back to the main threadpool manager.
Info retained by a thread while working.
threadpool_task_cb * task
struct marked_task * tasks
uint8_t task_ringbuf_size2
uint8_t task_ringbuf_size2
threadpool_task_cb * task
struct thread_info * threads
threadpool_task_cleanup_cb * cleanup
threadpool_task_cleanup_cb * cleanup
static uint8_t read_buf[(2 * 1024L * 1024)]

Internal helpers:

static void set_defaults(struct threadpool_config *cfg)
static bool spawn(struct threadpool *t)
static void * thread_task(void *thread_info)
static void notify_new_task(struct threadpool *t)
static bool notify_shutdown(struct threadpool *t)
static void commit_current_task(struct threadpool *t, struct marked_task *task, size_t wh)
static void release_current_task(struct threadpool *t, struct marked_task *task, size_t rh)

Public API:

struct threadpool * Threadpool_Init(struct threadpool_config *cfg)
    Initialize a threadpool, according to a config.
bool Threadpool_Schedule(struct threadpool *t, struct threadpool_task *task, size_t *pushback)
    Schedule a task in the threadpool.
void Threadpool_Stats(struct threadpool *t, struct threadpool_info *info)
    If TI is non-NULL, fill out some statistics about the operating state of the thread pool...
bool Threadpool_Shutdown(struct threadpool *t, bool kill_all)
    Notify the threadpool's threads that the system is going to shut down soon.
void Threadpool_Free(struct threadpool *t)
    Free a threadpool.
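Taken together, the public calls above suggest a simple lifecycle: configure, init, schedule, inspect, shut down, free. The sketch below puts them together under stated assumptions: the header name, the max_threads config field, and the exact callback signature are guesses not confirmed by this index; the function names, task_ringbuf_size2, and the task/udata fields come from the material above.

#include <stdio.h>
#include <unistd.h>
#include "threadpool.h"                     /* assumed header name */

static void print_task_cb(void *udata) {    /* assumed shape of threadpool_task_cb */
    printf("running task with udata %p\n", udata);
}

int main(void) {
    struct threadpool_config cfg = {
        .task_ringbuf_size2 = 8,            /* 2^8 == 256 task slots */
        .max_threads = 4,                   /* assumed field, mirroring DEFAULT_MAX_THREADS */
    };
    struct threadpool *t = Threadpool_Init(&cfg);
    if (t == NULL) { return 1; }

    struct threadpool_task task = {
        .task = print_task_cb,
        .udata = NULL,
    };
    size_t pushback = 0;
    if (!Threadpool_Schedule(t, &task, &pushback)) {
        fprintf(stderr, "queue full, backlog: %zu\n", pushback);
    }

    sleep(1);                               /* crude: give a worker a chance to run */
    Threadpool_Shutdown(t, false);          /* ask workers to wind down rather than killing them */
    Threadpool_Free(t);
    return 0;
}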