50 memset(&si->header, 0x00,
sizeof(si->header));
64 uint8_t versionPrefix = buf_header->versionPrefix;
70 .versionPrefix = versionPrefix,
71 .protobufLength = protobufLength,
72 .valueLength = valueLength,
81 size_t read_size,
void *socket_udata)
83 KineticSession * session = (KineticSession*)socket_udata;
85 socket_info *si = session->si;
96 memcpy(&si->buf[si->accumulated], read_buf, read_size);
97 si->accumulated += read_size;
101 if (remaining == 0) {
108 .
next_read = si->header.protobufLength + si->header.valueLength,
117 .full_msg_buffer = si,
133 memcpy(&si->buf[si->accumulated], read_buf, read_size);
134 si->accumulated += read_size;
136 uint32_t remaining = si->header.protobufLength + si->header.valueLength - si->accumulated;
138 if (remaining == 0) {
145 .full_msg_buffer = si,
162 #if KINETIC_LOGGER_LOG_SEQUENCE_ID
164 gettimeofday(&tv, NULL);
165 LOGF2(
"SEQ_ID response fd %d seq_id %lld %08lld.%08d",
166 fd, (
long long)seq_id,
167 (
long)tv.tv_sec, (
long)tv.tv_usec);
174 KineticSession * session = (KineticSession*)socket_udata;
178 socket_info *si = (socket_info *)msg;
184 .u.error.opaque_error_id = si->unpack_status,
190 if (response == NULL) {
197 response->
header = si->header;
200 if (response->
proto->has_commandbytes &&
201 response->
proto->commandbytes.data != NULL &&
202 response->
proto->commandbytes.len > 0)
205 response->
proto->commandbytes.len, response->
proto->commandbytes.data);
210 if (response->
header.valueLength > 0)
212 memcpy(response->
value, &si->buf[si->header.protobufLength], si->header.valueLength);
216 if (response->
command != NULL &&
217 response->
command->header != NULL)
219 if (response->
proto->has_authtype &&
226 seq_id = response->
command->header->acksequence;
248 .log_level = (log_level <= 1) ? 0 : log_level,
259 memset(&res, 0,
sizeof(res));
264 client->bus = res.
bus;
uint8_t maxThreadpoolThreads
Max number of threads to use for the threadpool that handles response callbacks.
int64_t KineticSession_GetConnectionID(KineticSession const *const session)
bool KineticBus_Init(KineticClient *client, KineticClientConfig *config)
int logLevel
Logging level (-1:none, 0:error, 1:info, 2:verbose, 3:full)
bool Bus_Shutdown(bus *b)
Begin shutting the system down.
static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, size_t read_size, void *socket_udata)
static bus_sink_cb_res_t reset_transfer(socket_info *si)
Com__Seagate__Kinetic__Proto__Message * proto
static uint8_t read_buf[(2 *1024L *1024)]
void Bus_Free(bus *b)
Free internal data structures for the bus.
KineticResponse * KineticAllocator_NewKineticResponse(size_t const valueLength)
static void log_response_seq_id(int fd, int64_t seq_id)
const char * Bus_LogEventStr(log_event_t event)
Get the string key for a log event ID.
Com__Seagate__Kinetic__Proto__Message * KineticPDU_unpack_message(ProtobufCAllocator *allocator, size_t len, const uint8_t *data)
void KineticBus_Shutdown(KineticClient *const client)
#define KINETIC_ASSERT(cond)
static bool unpack_header(uint8_t const *const read_buf, size_t const read_size, KineticPDUHeader *const header)
uint8_t readerThreads
Number of threads used for handling incoming responses and status messages.
Com__Seagate__Kinetic__Proto__Command * command
#define LOGF0(message,...)
bool Bus_Init(bus_config *config, struct bus_result *res)
Initialize a bus, based on configuration in *config.
static bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata)
static void log_cb(log_event_t event, int log_level, const char *msg, void *udata)
#define PDU_PROTO_MAX_LEN
void KineticLogger_LogPrintf(int log_level, const char *format,...)
static void unexpected_msg_cb(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata)
Configuration values for the KineticClient connection.
uint32_t KineticNBO_ToHostU32(uint32_t valueNBO)
void KineticController_HandleUnexpectedResponse(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata)
#define LOGF2(message,...)
Com__Seagate__Kinetic__Proto__Command * KineticPDU_unpack_command(ProtobufCAllocator *allocator, size_t len, const uint8_t *data)