Libthreadar  1.0.1
 All Classes Namespaces Files Functions Macros
tampon.hpp
Go to the documentation of this file.
1 /*********************************************************************/
2 // libthreadar - is a library providing several C++ classes to work with threads
3 // Copyright (C) 2014-2015 Denis Corbin
4 //
5 // This file is part of libthreadar
6 //
7 // libthreadar is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // libhtreadar is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Lesser General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19 //
20 //----
21 // to contact the author: dar.linux@free.fr
22 /*********************************************************************/
23 
24 #ifndef LIBTHREADAR_TAMPON_H
25 #define LIBTHREADAR_TAMPON_H
26 
29 
30 #include "config.h"
31 
32  // C system headers
33 extern "C"
34 {
35 
36 }
37  // C++ standard headers
38 
39 
40  // libthreadar headers
41 #include "mutex.hpp"
42 #include "exceptions.hpp"
43 
44 namespace libthreadar
45 {
46 
48 
99  template <class T> class tampon
100  {
101  public:
103 
107  tampon(unsigned int max_block, unsigned int block_size);
108 
109  // no copy constructor (made private)
110 
111  // no assignment operator (made private)
112 
115  ~tampon();
116 
118 
123  void get_block_to_feed(T * & ptr, unsigned int & num);
124 
126 
130  void feed(T* ptr, unsigned int written);
131 
133 
137  void feed_cancel_get_block(T *ptr);
138 
140 
145  void fetch(T* & ptr, unsigned int & num);
146 
148 
151  void fetch_recycle(T* ptr);
152 
154 
163  void fetch_push_back(T *ptr, unsigned int new_num);
164 
166 
169  void fetch_push_back_and_skip(T *ptr, unsigned int new_num);
170 
172  void fetch_skip_back();
173 
175  bool has_readable_block_next() const;
176 
178  bool is_empty() const;
179 
181  bool is_not_empty() const { return !is_empty(); };
182 
184  bool is_full() const { return full; }; // no need to acquire mutex "modif"
185 
187  bool is_not_full() const { return !is_full(); };
188 
190  bool is_quiet_full() const { unsigned int tmp = next_feed; shift_by_one(tmp); return tmp == fetch_head; };
191 
193 
195  unsigned int size() const { return table_size; };
196 
198 
200  unsigned int block_size() const { return alloc_size; };
201 
203  unsigned int load() const { return fetch_head <= next_feed ? next_feed - fetch_head : table_size - (fetch_head - next_feed); };
204 
206  void reset();
207 
208  private:
210  tampon(const tampon & ref) { throw THREADAR_BUG; };
211 
213  const tampon & operator = (const tampon & ref) { throw THREADAR_BUG; };
214 
215  struct atom
216  {
217  T* mem;
218  unsigned int data_size;
219 
220  atom() { mem = NULL; data_size = 0; };
221  };
222 
223  mutex modif; //< to make critical section when non atomic action requires a status has not changed between a test and following action
224  atom *table; //< datastructure holding data in transit between two threads
225  unsigned int table_size; //< size of table, i.e. number of struct atom it holds
226  unsigned int alloc_size; //< size of allocated memory for each atom in table
227  unsigned int next_feed; //< index in table of the next atom to use for feeding table
228  unsigned int next_fetch; //< index in table of the next atom to use for fetch table
229  unsigned int fetch_head; //< the oldest object to be fetched
230  bool fetch_outside; //< if set to true, table's index pointed to by next_fetch is used by the fetcher
231  bool feed_outside; //< if set to true, table's index pointed to by next_feed is used by the feeder
232  mutex waiting_feeder; //< feeder thread may be stuck waiting on that semaphore if table is full
233  mutex waiting_fetcher; //< fetcher thread may be stuck waiting on that semaphore if table is empty
234  bool full; //< set when tampon is full
235  bool feeder_go_lock; //< true to inform fetcher than feeder is about to or has already acquire lock on waiting_feeder
236  bool feeder_lock_track; //< only used by feeder to lock on waiting_feeder once outside of critical section
237  bool fetcher_go_lock; //< true to inform feeder than fetcher is about to or has already acquire lock on waiting_fetcher
238  bool fetcher_lock_track; //< only used by fetcher to lock on waiting_fetcher once outside of critical section
239 
240  bool is_empty_no_lock() const { return next_feed == fetch_head && !full; };
241 
243  bool has_readable_block_next_no_lock() const { return next_feed != next_fetch || full; }
244 
246  void shift_by_one(unsigned int & x) const;
247 
249  void shift_back_by_one(unsigned int & x) const;
250 
255  void shift_by_one_data_in_range(unsigned int begin, unsigned int end);
256 
257  };
258 
259  template <class T> tampon<T>::tampon(unsigned int max_block, unsigned int block_size)
260  {
261  table_size = max_block;
262  table = new atom[table_size];
263  if(table == NULL)
264  throw exception_memory();
265  try
266  {
267  alloc_size = block_size;
268  try
269  {
270  for(unsigned int i = 0 ; i < table_size ; ++i)
271  {
272  table[i].mem = new T[alloc_size];
273  if(table[i].mem == NULL)
274  throw exception_memory();
275  table[i].data_size = 0;
276  }
277  reset();
278  }
279  catch(...)
280  {
281  for(unsigned int i = 0; i < table_size ; ++i)
282  {
283  if(table[i].mem != NULL)
284  delete [] table[i].mem;
285  }
286 
287  throw;
288  }
289  }
290  catch(...)
291  {
292  if(table != NULL)
293  delete [] table;
294  throw;
295  }
296  }
297 
298 
299  template <class T> tampon<T>::~tampon()
300  {
301  if(table != NULL)
302  {
303  for(unsigned int i = 0 ; i < table_size ; ++i)
304  {
305  if(table[i].mem != NULL)
306  delete [] table[i].mem;
307  }
308  delete [] table;
309  }
310  }
311 
312  template <class T> void tampon<T>::get_block_to_feed(T * & ptr, unsigned int & num)
313  {
314  if(feed_outside)
315  throw exception_range("feed already out!");
316 
317  modif.lock(); // --- critical section START
318  if(is_full())
319  {
320  feeder_go_lock = true; // inform fetcher that we will suspend in waiting_feeder
321  feeder_lock_track = true;// to suspend on waiting_feeder once we will be out of the critical section
322  }
323  modif.unlock(); // --- critical section END
324 
325  if(feeder_lock_track)
326  {
327  feeder_lock_track = false;
328  waiting_feeder.lock(); // cannot lock inside a critical section ...
329  }
330 
331  if(is_full())
332  throw THREADAR_BUG; // still full!
333 
334  feed_outside = true;
335  ptr = table[next_feed].mem;
336  num = alloc_size;
337  }
338 
339  template <class T> void tampon<T>::feed(T *ptr, unsigned int num)
340  {
341  if(!feed_outside)
342  throw exception_range("fetch not outside!");
343  feed_outside = false;
344 
345  if(ptr != table[next_feed].mem)
346  throw exception_range("returned ptr is not the one given earlier for feeding");
347  table[next_feed].data_size = num;
348 
349  modif.lock(); // --- critical section START
350  shift_by_one(next_feed);
351  if(next_feed == fetch_head)
352  full = true;
353  if(fetcher_go_lock)
354  {
355  fetcher_go_lock = false;
356  waiting_fetcher.unlock();
357  }
358  modif.unlock(); // --- critical section END
359  }
360 
361  template <class T> void tampon<T>::feed_cancel_get_block(T *ptr)
362  {
363  if(!feed_outside)
364  throw exception_range("feed not outside!");
365  feed_outside = false;
366  if(ptr != table[next_feed].mem)
367  throw exception_range("returned ptr is not the one given earlier for feeding");
368  }
369 
370  template <class T> void tampon<T>::fetch(T* & ptr, unsigned int & num)
371  {
372  if(fetch_outside)
373  throw exception_range("already fetched block outside");
374 
375  modif.lock(); // --- critical section START
376  if(!has_readable_block_next_no_lock())
377  {
378  fetcher_go_lock = true; // to inform feeder that we will suspend on waiting_fetcher
379  fetcher_lock_track = true; // to suspend on waiting_fetcher once we will be out of the critical section
380  }
381  modif.unlock(); // --- critical section END
382 
383  if(fetcher_lock_track)
384  {
385  fetcher_lock_track = false;
386  waiting_fetcher.lock(); // cannot lock inside a critical section ...
387  }
388 
389  if(is_empty())
390  throw THREADAR_BUG;
391 
392  fetch_outside = true;
393  ptr = table[next_fetch].mem;
394  num = table[next_fetch].data_size;
395  }
396 
397  template <class T> void tampon<T>::fetch_recycle(T* ptr)
398  {
399  if(!fetch_outside)
400  throw exception_range("no block outside for fetching");
401  fetch_outside = false;
402  if(ptr != table[next_fetch].mem)
403  throw exception_range("returned ptr is no the one given earlier for fetching");
404 
405  modif.lock(); // --- critical section START
406  if(next_fetch == fetch_head)
407  {
408 
409  // no block were skipped
410 
411  shift_by_one(fetch_head);
412  next_fetch = fetch_head;
413  full = false;
414  }
415  else
416  {
417  unsigned int tmp = next_fetch;
418  atom tmp_tom;
419 
420  shift_by_one(tmp);
421  shift_by_one_data_in_range(tmp, next_feed);
422 
423  // we also take into account the situation
424  // where a block has been given for feeding
425  // so the next call to feed() will match the
426  // expected address of the returned block
427  tmp = next_feed; // recording old position of next_feed
428  shift_back_by_one(next_feed);
429  // swapping contents between old next_feed position
430  // and new one:
431  tmp_tom = table[next_feed];
432  table[next_feed] = table[tmp];
433  table[tmp] = tmp_tom;
434  // done!
435 
436  full = false;
437  }
438 
439  if(feeder_go_lock)
440  {
441  feeder_go_lock = false;
442  waiting_feeder.unlock();
443  }
444  modif.unlock(); // --- critical section END
445  }
446 
447  template <class T> void tampon<T>::fetch_push_back(T* ptr, unsigned int new_num)
448  {
449  if(!fetch_outside)
450  throw exception_range("no block outside for fetching");
451  fetch_outside = false;
452 
453  if(ptr != table[next_fetch].mem)
454  throw exception_range("returned ptr is not the one given earlier for fetching");
455  table[next_fetch].data_size = new_num;
456  }
457 
458  template <class T> void tampon<T>::fetch_push_back_and_skip(T *ptr,
459  unsigned int new_num)
460  {
461  fetch_push_back(ptr, new_num);
462  modif.lock(); // --- critical section START
463  if(full && next_fetch == next_feed) // reach last block feed, cannot skip it
464  throw exception_range("cannot skip the last fed block when the tampon is full");
465  shift_by_one(next_fetch);
466  modif.unlock(); // --- critical section END
467  }
468 
469  template <class T> void tampon<T>::fetch_skip_back()
470  {
471  if(fetch_outside)
472  throw exception_range("cannot skip back fetching while a block is being fetched");
473  next_fetch = fetch_head;
474  }
475 
476 
477  template <class T> bool tampon<T>::has_readable_block_next() const
478  {
479  bool ret;
480 
481  tampon<T> *me = const_cast<tampon<T> *>(this);
482  if(me == NULL)
483  throw THREADAR_BUG;
484  me->modif.lock();
485  ret = has_readable_block_next_no_lock();
486  me->modif.unlock();
487 
488  return ret;
489  }
490 
491 
492  template <class T> bool tampon<T>::is_empty() const
493  {
494  bool ret;
495 
496  tampon<T> * me = const_cast<tampon<T> *>(this);
497  if(me == NULL)
498  throw THREADAR_BUG;
499  me->modif.lock();
500  ret = is_empty_no_lock();
501  me->modif.unlock();
502 
503  return ret;
504  }
505 
506  template <class T> void tampon<T>::reset()
507  {
508  next_feed = 0;
509  next_fetch = 0;
510  fetch_head = 0;
511  fetch_outside = false;
512  feed_outside = false;
513  full = false;
514  feeder_go_lock = false;
515  feeder_lock_track = false;
516  fetcher_go_lock = false;
517  fetcher_lock_track = false;
518  (void)waiting_feeder.try_lock();
519  (void)waiting_fetcher.try_lock();
520  }
521 
522  template <class T> void tampon<T>::shift_by_one(unsigned int & x) const
523  {
524  ++x;
525  if(x >= table_size)
526  x = 0;
527  }
528 
529  template <class T> void tampon<T>::shift_back_by_one(unsigned int & x) const
530  {
531  if(x == 0)
532  x = table_size - 1;
533  else
534  --x;
535  }
536 
537  template <class T> void tampon<T>::shift_by_one_data_in_range(unsigned int begin, unsigned int end)
538  {
539 
540  if(begin != end)
541  {
542  unsigned int prev = begin;
543  shift_back_by_one(prev);
544  T* not_squeezed = table[prev].mem; // we will erase the address pointed to by mem so we keep it in memory here
545 
546  while(begin != end)
547  {
548  table[prev] = table[begin]; // this copies both mem (the value of the pointer, not the pointed to) and data_size,
549  prev = begin;
550  shift_by_one(begin);
551  }
552 
553  table[prev].mem = not_squeezed;
554  table[prev].data_size = 0; // by precaution
555  }
556  }
557 
558 
559 } // end of namespace
560 
561 #endif
562 
void fetch_push_back(T *ptr, unsigned int new_num)
fetcher call - step 2 alternative
Definition: tampon.hpp:447
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
Definition: exceptions.hpp:145
defines the mutex C++ class
Class tampon provides asynchronous communication between two threads.
Definition: tampon.hpp:99
void fetch(T *&ptr, unsigned int &num)
fetcher call - step 1
Definition: tampon.hpp:370
defines a set of exceptions that are used by libthreadar to report error situations ...
void reset()
reset the object fields and mutex as if the object was just created
Definition: tampon.hpp:506
bool is_not_full() const
to know whether the tampon is not full
Definition: tampon.hpp:187
tampon(unsigned int max_block, unsigned int block_size)
constructor
Definition: tampon.hpp:259
Exception used to report memory allocation failures.
Definition: exceptions.hpp:131
bool has_readable_block_next() const
to known whether next fetch will be blocking (not skipped blocks)
Definition: tampon.hpp:477
void fetch_recycle(T *ptr)
fetcher call - step 2
Definition: tampon.hpp:397
void fetch_skip_back()
reactivate all skipped blocks, next fetch() will be the oldest available block
Definition: tampon.hpp:469
void get_block_to_feed(T *&ptr, unsigned int &num)
feeder call - step 1
Definition: tampon.hpp:312
unsigned int load() const
returns the current number of blocks currently used in the tampon (fed but not fetched) ...
Definition: tampon.hpp:203
bool is_quiet_full() const
returns true if only one slot is available before filling the tampon
Definition: tampon.hpp:190
unsigned int size() const
returns the size of the tampon in maximum number of block it can contain
Definition: tampon.hpp:195
void unlock()
unlock the mutex
unsigned int block_size() const
returns the allocation size of each block
Definition: tampon.hpp:200
void fetch_push_back_and_skip(T *ptr, unsigned int new_num)
put back the fetched block and skip to next block for the next fetch()
Definition: tampon.hpp:458
void feed_cancel_get_block(T *ptr)
feeder call - step 2 alternative
Definition: tampon.hpp:361
void lock()
lock the mutex
bool is_not_empty() const
to know whether the tampon is not empty
Definition: tampon.hpp:181
bool is_full() const
for feeder to know whether the next call to get_block_to_feed() will be blocking
Definition: tampon.hpp:184
void feed(T *ptr, unsigned int written)
feeder call - step 2
Definition: tampon.hpp:339
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:46
Exception used to report out or range value or argument.
Definition: exceptions.hpp:187
bool is_empty() const
to know whether the tampon has objects (readable or skipped)
Definition: tampon.hpp:492