28 #include <sys/resource.h>
47 int log_level,
const char *msg,
void *udata);
58 void *old_value = NULL;
60 int completion_pipe = -1;
65 if (res == NULL) {
return false; }
79 if (config->
log_cb == NULL) {
89 uint8_t locks_initialized = 0;
94 struct yacht *fd_set = NULL;
96 bus *b = calloc(1,
sizeof(*b));
97 if (b == NULL) {
goto cleanup; }
108 if (0 != pthread_mutex_init(&b->
fd_set_lock, NULL)) {
117 "Initialized bus at %p", (
void*)b);
131 "Initialized listener %d at %p", i, (
void*)ls[i]);
142 joined = calloc(thread_count,
sizeof(
bool));
143 threads = calloc(thread_count,
sizeof(pthread_t));
144 if (joined == NULL || threads == NULL) {
149 if (fd_set == NULL) {
160 int pcres = pthread_create(&b->
threads[i], NULL,
181 if (joined) { free(joined); }
183 if (locks_initialized > 1) {
189 if (threads) { free(threads); }
190 if (fd_set) {
Yacht_Free(fd_set, NULL, NULL); }
197 if (-1 == getrlimit(RLIMIT_NOFILE, &info)) {
198 fprintf(stderr,
"getrlimit: %s", strerror(errno));
203 const unsigned int nval = 1024;
205 if (info.rlim_cur < nval && info.rlim_max > nval) {
206 info.rlim_cur = nval;
208 "Current FD resource limits, [%lu, %lu], changing to %u",
209 (
unsigned long)info.rlim_cur, (
unsigned long)info.rlim_max, nval);
210 if (-1 == setrlimit(RLIMIT_NOFILE, &info)) {
212 "Failed to increase FD resource limit to %u, %s",
213 nval, strerror(errno));
214 fprintf(stderr,
"getrlimit: %s", strerror(errno));
219 "Successfully increased FD resource limit to %u", nval);
223 "Current FD resource limits [%lu, %lu] are acceptable",
224 (
unsigned long)info.rlim_cur, (
unsigned long)info.rlim_max);
238 box = calloc(1,
sizeof(*box));
240 if (box == NULL) {
return NULL; }
243 "Allocated boxed message -- %p", (
void*)box);
246 assert(msg->
fd != 0);
249 if (0 != pthread_mutex_lock(&b->
fd_set_lock)) { assert(
false); }
257 if (0 != pthread_mutex_unlock(&b->
fd_set_lock)) { assert(
false); }
262 "socket isn't registered, failing -- %p", (
void*)box);
272 "rejecting request <fd:%d, seq_id:%lld> due to non-monotonic sequence ID, largest seen is %lld",
299 if (b == NULL || msg == NULL || msg->
fd == -1) {
309 "Sending request <fd:%d, seq_id:%lld>", msg->
fd, (
long long)msg->
seq_id);
312 "...request sent, result %d", res);
318 "Freeing box since request was rejected: %p", (
void *)box);
355 "registering socket %d", fd);
368 if (ci == NULL) {
goto cleanup; }
373 if (ssl == NULL) {
goto cleanup; }
378 *(
int *)&ci->
fd = fd;
385 void *old_value = NULL;
388 if (0 != pthread_mutex_lock(&b->
fd_set_lock)) { assert(
false); }
390 if (0 != pthread_mutex_unlock(&b->
fd_set_lock)) { assert(
false); }
393 assert(old_value == NULL);
400 int completion_pipe = -1;
403 if (!res) {
goto cleanup; }
407 if (!completed) {
goto cleanup; }
424 "forgetting socket %d", fd);
429 int completion_pipe = -1;
435 assert(completion_pipe != -1);
443 void *old_value = NULL;
445 if (0 != pthread_mutex_lock(&b->
fd_set_lock)) { assert(
false); }
447 if (0 != pthread_mutex_unlock(&b->
fd_set_lock)) { assert(
false); }
455 if (socket_udata_out) { *socket_udata_out = ci->
udata; }
473 struct bus *b = (
struct bus *)udata;
480 int completion_pipe = -1;
512 int completion_pipe = -1;
519 "Listener_Shutdown -- %d", i);
531 "Listener_Shutdown -- joining %d", i);
537 "Listener_Shutdown -- joined %d", i);
553 backpressure >>= shift;
555 if (backpressure > 0) {
557 "backpressure %zd", backpressure);
565 void *out_udata = box->
udata;
581 struct boxed_msg *box,
size_t *backpressure) {
592 "Scheduling boxed message -- %p -- where it will be freed", (
void*)box);
597 #define THREAD_SHUTDOWN_SECONDS 5
600 if (b == NULL) {
return; }
608 "Listener_Free -- %d", i);
614 for (
int i = 0; i <
limit; i++) {
616 "Threadpool_Shutdown -- %d", i);
620 if (i == limit - 1) {
622 "Threadpool_Shutdown -- %d (forced)", i);
637 int log_level,
const char *msg,
void *
udata) {
time_t timeout_sec
Message send timeout.
void Threadpool_Free(struct threadpool *t)
Free a threadpool.
static void free_connection_cb(void *value, void *udata)
bool Bus_Shutdown(bus *b)
Begin shutting the system down.
static int listener_id_of_socket(struct bus *b, int fd)
bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure)
Deliver a boxed message to the thread pool to execute.
int64_t largest_wr_seq_id_seen
Set by client thread.
bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out)
Free metadata about a socket that has been disconnected.
bool Listener_RemoveSocket(struct listener *l, int fd, int *notify_fd)
bool BusSSL_Init(struct bus *b)
Initialize the SSL library internals for use by the messaging bus.
void BusSSL_CtxFree(struct bus *b)
Free all internal data for using SSL (the SSL_CTX).
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
bus_unpack_cb * unpack_cb
threadpool_task_cb * task
struct yacht * Yacht_Init(uint8_t sz2)
Init a hash table with approximately 2^sz2 buckets.
pthread_mutex_t fd_set_lock
static void noop_log_cb(log_event_t event, int log_level, const char *msg, void *udata)
static void box_execute_cb(void *udata)
bus_unexpected_msg_cb * unexpected_msg_cb
bool Bus_SendRequest(struct bus *b, bus_user_msg *msg)
Send a request.
static void box_cleanup_cb(void *udata)
struct listener * Listener_Init(struct bus *b, struct bus_config *cfg)
Initialize the listener.
void Bus_Free(bus *b)
Free internal data structures for the bus.
bool * joined
Which threads have joined.
SSL * ssl
SSL handle. Must be valid or BUS_NO_SSL.
bool Threadpool_Shutdown(struct threadpool *t, bool kill_all)
Notify the threadpool's threads that the system is going to shut down soon.
bus_unpack_cb * unpack_cb
Message unpacking callback.
struct listener ** listeners
Listener array.
void * ListenerTask_MainLoop(void *arg)
Listener's main loop – function pointer for pthread start function.
shutdown_state_t shutdown_state
Current shutdown state.
Internal threadpool state.
struct threadpool * threadpool
Thread pool.
bool BusSSL_Disconnect(struct bus *b, SSL *ssl)
Disconnect and free an individual SSL handle.
static bool attempt_to_increase_resource_limits(struct bus *b)
const char * Bus_LogEventStr(log_event_t event)
Get the string key for a log event ID.
bool Listener_AddSocket(struct listener *l, connection_info *ci, int *notify_fd)
Add/remove sockets' metadata from internal info.
void * udata
User data for callbacks.
uint8_t listener_count
Number of listeners.
bus_unexpected_msg_cb * unexpected_msg_cb
struct threadpool_config threadpool_cfg
bus_error_cb * error_cb
Error handling callback.
static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata)
bool Yacht_Set(struct yacht *y, int key, void *value, void **old_value)
Set KEY to VALUE in the table.
void * udata
user connection data
void( bus_msg_cb)(bus_msg_result_t *res, void *udata)
int syscall_pthread_join(pthread_t thread, void **value_ptr)
Wrapper for pthread calls.
SSL * ssl
valid pointer or BUS_BOXED_MSG_NO_SSL
#define THREAD_SHUTDOWN_SECONDS
void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift)
Provide backpressure by sleeping for (backpressure >> shift) msec, if the value is greater than 0...
bus_msg_cb * cb
Callback and userdata to which the bus_msg_result_t above will be sunk.
#define DEF_FD_SET_SIZE2
Starting size^2 for file descriptor hash table.
pthread_t * threads
Threads.
Per-socket connection context.
bool BusPoll_OnCompletion(struct bus *b, int fd)
Poll on fd until complete, return true on success or false on IO error.
struct threadpool * Threadpool_Init(struct threadpool_config *cfg)
Initialize a threadpool, according to a config.
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
static boxed_msg * box_msg(struct bus *b, bus_user_msg *msg)
#define BUS_DEFAULT_TIMEOUT_SEC
bool Listener_Shutdown(struct listener *l, int *notify_fd)
Shut down the listener.
bool Yacht_Get(struct yacht *y, int key, void **value)
Get KEY from the table, setting *value if found.
struct thread_info * threads
bool Bus_Init(bus_config *config, struct bus_result *res)
Initialize a bus, based on configuration in *config.
bus_sink_cb * sink_cb
IO sink callback.
#define BUS_NO_SSL
Special "NO SSL" value, to distinguish from a NULL SSL handle.
void Listener_Free(struct listener *l)
Free the listener, which must already be shut down.
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)
bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *udata)
Register a socket connected to an endpoint, and data that will be passed to all interactions on that ...
bool Yacht_Remove(struct yacht *y, int key, void **old_value)
Remove KEY from the table.
struct yacht * fd_set
Locked hash table for fd -> connection_info.
SSL * BusSSL_Connect(struct bus *b, int fd)
Do an SSL / TLS handshake for a connection.
static void set_defaults(bus_config *cfg)
bool Send_DoBlockingSend(bus *b, boxed_msg *box)
Do a blocking send.
struct listener * Bus_GetListenerForSocket(struct bus *b, int fd)
For a given file descriptor, get the listener to use.
int fd
Destination file descriptor and message body.
void Yacht_Free(struct yacht *y, Yacht_Free_cb *cb, void *udata)
Free the table.
bus_log_cb * log_cb
Logging callback.
int syscall_poll(struct pollfd fds[], nfds_t nfds, int timeout)
Wrappers for syscalls, to allow mocking for testing.
bool Threadpool_Schedule(struct threadpool *t, struct threadpool_task *task, size_t *pushback)
Schedule a task in the threadpool.
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)