kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
kinetic_bus.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 
21 #include "kinetic_bus.h"
22 #include "kinetic_nbo.h"
23 #include "kinetic_session.h"
24 #include "kinetic_socket.h"
25 #include "kinetic_hmac.h"
26 #include "kinetic_logger.h"
27 #include "kinetic.pb-c.h"
28 #include "kinetic_nbo.h"
29 #include "kinetic_allocator.h"
30 #include "kinetic_controller.h"
31 #include "bus.h"
32 #include "kinetic_pdu_unpack.h"
33 
34 #include <time.h>
35 
36 STATIC void log_cb(log_event_t event, int log_level, const char *msg, void *udata) {
37  (void)udata;
38  const char *event_str = Bus_LogEventStr(event);
39  KineticLogger_LogPrintf(log_level, "%s[%d] %s", event_str, log_level, msg);
40 }
41 
42 static bus_sink_cb_res_t reset_transfer(socket_info *si) {
43  bus_sink_cb_res_t res = { /* prime pump with header size */
44  .next_read = sizeof(KineticPDUHeader),
45  };
46 
47  si->state = STATE_AWAITING_HEADER;
48  si->accumulated = 0;
49  si->unpack_status = UNPACK_ERROR_UNDEFINED;
50  memset(&si->header, 0x00, sizeof(si->header));
51  return res;
52 }
53 
54 STATIC bool unpack_header(uint8_t const * const read_buf, size_t const read_size, KineticPDUHeader * const header)
55 {
56  if (read_size != sizeof(KineticPDUHeader)) {
57  return false;
58  // TODO this will fail if we don't get all of the header bytes in one read
59  // we should fix this
60  }
61  KineticPDUHeader const * const buf_header = (KineticPDUHeader const * const)read_buf;
62  uint32_t protobufLength = KineticNBO_ToHostU32(buf_header->protobufLength);
63  uint32_t valueLength = KineticNBO_ToHostU32(buf_header->valueLength);
64  uint8_t versionPrefix = buf_header->versionPrefix;
65 
66  if (protobufLength <= PDU_PROTO_MAX_LEN &&
67  valueLength <= PDU_PROTO_MAX_LEN)
68  {
69  *header = (KineticPDUHeader){
70  .versionPrefix = versionPrefix,
71  .protobufLength = protobufLength,
72  .valueLength = valueLength,
73  };
74  return true;
75  } else {
76  return false;
77  }
78 }
79 
81  size_t read_size, void *socket_udata)
82 {
83  KineticSession * session = (KineticSession*)socket_udata;
84  KINETIC_ASSERT(session);
85  socket_info *si = session->si;
86  KINETIC_ASSERT(si);
87 
88  switch (si->state) {
89  case STATE_UNINIT:
90  {
91  KINETIC_ASSERT(read_size == 0);
92  return reset_transfer(si);
93  }
95  {
96  memcpy(&si->buf[si->accumulated], read_buf, read_size);
97  si->accumulated += read_size;
98 
99  uint32_t remaining = PDU_HEADER_LEN - si->accumulated;
100 
101  if (remaining == 0) {
102  if (unpack_header(&si->buf[0], PDU_HEADER_LEN, &si->header))
103  {
104  si->accumulated = 0;
105  si->unpack_status = UNPACK_ERROR_SUCCESS;
106  si->state = STATE_AWAITING_BODY;
107  bus_sink_cb_res_t res = {
108  .next_read = si->header.protobufLength + si->header.valueLength,
109  };
110  return res;
111  } else {
112  si->accumulated = 0;
113  si->unpack_status = UNPACK_ERROR_INVALID_HEADER;
114  si->state = STATE_AWAITING_HEADER;
115  bus_sink_cb_res_t res = {
116  .next_read = sizeof(KineticPDUHeader),
117  .full_msg_buffer = si,
118  };
119  return res;
120  }
121  }
122  else
123  {
124  bus_sink_cb_res_t res = {
125  .next_read = remaining,
126  };
127  return res;
128  }
129  break;
130  }
131  case STATE_AWAITING_BODY:
132  {
133  memcpy(&si->buf[si->accumulated], read_buf, read_size);
134  si->accumulated += read_size;
135 
136  uint32_t remaining = si->header.protobufLength + si->header.valueLength - si->accumulated;
137 
138  if (remaining == 0) {
139  si->state = STATE_AWAITING_HEADER;
140  si->accumulated = 0;
141  bus_sink_cb_res_t res = {
142  .next_read = sizeof(KineticPDUHeader),
143  // returning the whole si, because we need access to the pdu header as well
144  // as the protobuf and value bytes
145  .full_msg_buffer = si,
146  };
147  return res;
148  } else {
149  bus_sink_cb_res_t res = {
150  .next_read = remaining,
151  };
152  return res;
153  }
154  break;
155  }
156  default:
157  KINETIC_ASSERT(false);
158  }
159 }
160 
/**
 * Log the sequence ID of a received response together with a timestamp.
 *
 * Compiled to a no-op unless KINETIC_LOGGER_LOG_SEQUENCE_ID is non-zero.
 *
 * @param fd      Socket file descriptor the response arrived on.
 * @param seq_id  Sequence ID extracted from the response.
 */
static void log_response_seq_id(int fd, int64_t seq_id) {
    #if KINETIC_LOGGER_LOG_SEQUENCE_ID
    struct timeval tv;
    gettimeofday(&tv, NULL);
    /* The casts must match the conversion specifiers exactly:
     * %lld takes long long and %d takes int. */
    LOGF2("SEQ_ID response fd %d seq_id %lld %08lld.%08d",
        fd, (long long)seq_id,
        (long long)tv.tv_sec, (int)tv.tv_usec);
    #else
    (void)fd;
    (void)seq_id;
    #endif
}
172 
173 STATIC bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata) {
174  KineticSession * session = (KineticSession*)socket_udata;
175  KINETIC_ASSERT(session);
176 
177  /* just got .full_msg_buffer from sink_cb -- pass it along as-is */
178  socket_info *si = (socket_info *)msg;
179 
180  if (si->unpack_status != UNPACK_ERROR_SUCCESS)
181  {
182  return (bus_unpack_cb_res_t) {
183  .ok = false,
184  .u.error.opaque_error_id = si->unpack_status,
185  };
186  }
187 
188  KineticResponse * response = KineticAllocator_NewKineticResponse(si->header.valueLength);
189 
190  if (response == NULL) {
191  bus_unpack_cb_res_t res = {
192  .ok = false,
193  .u.error.opaque_error_id = UNPACK_ERROR_PAYLOAD_MALLOC_FAIL,
194  };
195  return res;
196  } else {
197  response->header = si->header;
198 
199  response->proto = KineticPDU_unpack_message(NULL, si->header.protobufLength, si->buf);
200  if (response->proto->has_commandbytes &&
201  response->proto->commandbytes.data != NULL &&
202  response->proto->commandbytes.len > 0)
203  {
204  response->command = KineticPDU_unpack_command(NULL,
205  response->proto->commandbytes.len, response->proto->commandbytes.data);
206  } else {
207  response->command = NULL;
208  }
209 
210  if (response->header.valueLength > 0)
211  {
212  memcpy(response->value, &si->buf[si->header.protobufLength], si->header.valueLength);
213  }
214 
215  int64_t seq_id = BUS_NO_SEQ_ID;
216  if (response->command != NULL &&
217  response->command->header != NULL)
218  {
219  if (response->proto->has_authtype &&
221  && KineticSession_GetConnectionID(session) == 0)
222  {
223  /* Ignore the unsolicited status message on connect. */
224  seq_id = BUS_NO_SEQ_ID;
225  } else {
226  seq_id = response->command->header->acksequence;
227  }
228  log_response_seq_id(session->socket, seq_id);
229  }
230 
231  bus_unpack_cb_res_t res = {
232  .ok = true,
233  .u.success = {
234  .seq_id = seq_id,
235  .msg = response,
236  },
237  };
238  return res;
239  }
240 }
241 
242 bool KineticBus_Init(KineticClient * client, KineticClientConfig * config)
243 {
244  int log_level = config->logLevel;
245 
246  bus_config cfg = {
247  .log_cb = log_cb,
248  .log_level = (log_level <= 1) ? 0 : log_level,
249  .sink_cb = sink_cb,
250  .unpack_cb = unpack_cb,
252  .bus_udata = NULL,
253  .listener_count = config->readerThreads,
254  .threadpool_cfg = {
255  .max_threads = config->maxThreadpoolThreads,
256  },
257  };
258  bus_result res;
259  memset(&res, 0, sizeof(res));
260  if (!Bus_Init(&cfg, &res)) {
261  LOGF0("failed to init bus: %d", res.status);
262  return false;
263  }
264  client->bus = res.bus;
265  return true;
266 }
267 
268 void KineticBus_Shutdown(KineticClient * const client)
269 {
270  if (client) {
271  Bus_Shutdown(client->bus);
272  Bus_Free(client->bus);
273  client->bus = NULL;
274  }
275 }
uint8_t maxThreadpoolThreads
Max number of threads to use for the threadpool that handles response callbacks.
Bus_Init_res_t status
Definition: bus_types.h:210
int64_t KineticSession_GetConnectionID(KineticSession const *const session)
bool KineticBus_Init(KineticClient *client, KineticClientConfig *config)
Definition: kinetic_bus.c:242
int logLevel
Logging level (-1:none, 0:error, 1:info, 2:verbose, 3:full)
bool Bus_Shutdown(bus *b)
Begin shutting the system down.
Definition: bus.c:494
static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, size_t read_size, void *socket_udata)
Definition: kinetic_bus.c:80
log_event_t
Definition: bus_types.h:95
static bus_sink_cb_res_t reset_transfer(socket_info *si)
Definition: kinetic_bus.c:42
Com__Seagate__Kinetic__Proto__Message * proto
static uint8_t read_buf[(2 *1024L *1024)]
Definition: echosrv.c:44
void Bus_Free(bus *b)
Free internal data structures for the bus.
Definition: bus.c:599
KineticResponse * KineticAllocator_NewKineticResponse(size_t const valueLength)
static void log_response_seq_id(int fd, int64_t seq_id)
Definition: kinetic_bus.c:161
const char * Bus_LogEventStr(log_event_t event)
Get the string key for a log event ID.
Definition: bus.c:335
Com__Seagate__Kinetic__Proto__Message * KineticPDU_unpack_message(ProtobufCAllocator *allocator, size_t len, const uint8_t *data)
#define BUS_NO_SEQ_ID
Definition: bus_types.h:39
void KineticBus_Shutdown(KineticClient *const client)
Definition: kinetic_bus.c:268
#define KINETIC_ASSERT(cond)
static bool unpack_header(uint8_t const *const read_buf, size_t const read_size, KineticPDUHeader *const header)
Definition: kinetic_bus.c:54
uint8_t readerThreads
Number of threads used for handling incoming responses and status messages.
Com__Seagate__Kinetic__Proto__Command * command
KineticPDUHeader header
#define LOGF0(message,...)
#define STATIC
#define PDU_HEADER_LEN
bool Bus_Init(bus_config *config, struct bus_result *res)
Initialize a bus, based on configuration in *config.
Definition: bus.c:64
static bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata)
Definition: kinetic_bus.c:173
static void log_cb(log_event_t event, int log_level, const char *msg, void *udata)
Definition: kinetic_bus.c:36
#define PDU_PROTO_MAX_LEN
void KineticLogger_LogPrintf(int log_level, const char *format,...)
static void unexpected_msg_cb(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata)
Definition: bus_example.c:233
struct bus * bus
Definition: bus_types.h:211
bus_log_cb * log_cb
Definition: bus_types.h:175
Configuration values for the KineticClient connection.
uint32_t KineticNBO_ToHostU32(uint32_t valueNBO)
Definition: kinetic_nbo.c:38
void KineticController_HandleUnexpectedResponse(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata)
#define LOGF2(message,...)
Com__Seagate__Kinetic__Proto__Command * KineticPDU_unpack_command(ProtobufCAllocator *allocator, size_t len, const uint8_t *data)