kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
threadpool_internals.h
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #ifndef THREADPOOL_INTERNALS_H
21 #define THREADPOOL_INTERNALS_H
22 
23 #include <pthread.h>
24 #include "threadpool.h"
25 
27 typedef enum {
28  STATUS_NONE, //> undefined status
29  STATUS_ASLEEP, //> thread is poll-sleeping to reduce CPU
30  STATUS_AWAKE, //> thread is active
31  STATUS_SHUTDOWN, //> thread has been notified about shutdown
32  STATUS_JOINED, //> thread has been pthread_join'd
34 
36 struct thread_info {
37  pthread_t t; //> thread
38  int parent_fd; //> alert pipe parent writes into
39  int child_fd; //> alert pipe child reads from
40  thread_status_t status; //> current worker thread status
41 };
42 
45  struct threadpool *t;
46  struct thread_info *ti;
47 };
48 
50 struct marked_task {
53  void *udata;
54  /* This mark is used to indicate which tasks can have the commit_head
55  * and release_head counters advanced past them.
56  *
57  * mark == (task_commit_head that points at tast): commit
58  * mark == ~(task_release_head that points at tast): release
59  *
60  * Any other value means it is in an intermediate state and should be
61  * retried later. */
62  size_t mark;
63 };
64 
66 struct threadpool {
67  /* reserve -> commit -> request -> release */
68  size_t task_reserve_head; //> reserved for write
69  size_t task_commit_head; //> done with write
70  size_t task_request_head; //> requested for read
71  size_t task_release_head; //> done processing task, can be overwritten
72 
73  struct marked_task *tasks; //> ring buffer for tasks
74 
75  /* Size and mask. These can be derived from task_ringbuf_size2,
76  * but are cached to reduce CPU. */
77  size_t task_ringbuf_size; //> size of ring buffer
78  size_t task_ringbuf_mask; //> mask to fit counter within ring buffer
79  uint8_t task_ringbuf_size2; //> log2 of size of ring buffer
80 
81  bool shutting_down; //> shutdown has been called
82  uint8_t live_threads; //> currently live threads
83  uint8_t max_threads; //> max number of threads to start
85 };
86 
87 /* Do an atomic compare-and-swap, changing *PTR from OLD to NEW. Returns
88  * true if the swap succeeded, false if it failed (generally because
89  * another thread updated the memory first). */
90 #define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW) \
91  (__sync_bool_compare_and_swap(PTR, OLD, NEW))
92 
93 /* Message sent to wake up a thread. The message contents are currently unimportant. */
94 #define NOTIFY_MSG "!"
95 #define NOTIFY_MSG_LEN 1
96 
97 /* Spin attempting to atomically adjust F by ADJ until successful. */
98 #define SPIN_ADJ(F, ADJ) \
99  do { \
100  for (;;) { \
101  size_t v = F; \
102  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&F, v, v + ADJ)) { \
103  break; \
104  } \
105  } \
106  } while (0)
107 
108 #endif
size_t task_request_head
size_t task_release_head
thread_status_t status
size_t task_ringbuf_size
void( threadpool_task_cleanup_cb)(void *udata)
Callback to clean up a cancelled task's data, with the arbitrary user-supplied pointer that would hav...
Definition: threadpool.h:45
Thread_info, plus pointer back to main threadpool manager.
A task, with an additional mark.
struct thread_info * ti
struct marked_task * tasks
void( threadpool_task_cb)(void *udata)
Callback for a task, with an arbitrary user-supplied pointer.
Definition: threadpool.h:41
size_t task_reserve_head
uint8_t task_ringbuf_size2
Internal threadpool state.
Info retained by a thread while working.
threadpool_task_cb * task
struct thread_info * threads
struct threadpool * t
size_t task_ringbuf_mask
thread_status_t
Current status of a worker thread.
threadpool_task_cleanup_cb * cleanup