SHOGUN  6.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules
ParseBuffer.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 #ifndef __PARSEBUFFER_H__
11 #define __PARSEBUFFER_H__
12 
13 #include <shogun/lib/config.h>
14 
15 #if defined(HAVE_CXX11) || defined(HAVE_PTHREAD)
16 
17 #include <shogun/lib/common.h>
18 #include <shogun/base/SGObject.h>
19 #include <shogun/lib/DataType.h>
20 #ifdef HAVE_CXX11
21 #include <condition_variable>
22 #include <memory>
23 #include <mutex>
24 #include <vector>
25 #elif HAVE_PTHREAD
26 #include <pthread.h>
27 #endif
28 
29 namespace shogun
30 {
31 
34 enum E_IS_EXAMPLE_USED
35 {
36  E_EMPTY = 1,
37  E_NOT_USED = 2,
38  E_USED = 3
39 };
40 
50 template <class T>
51 class Example
52 {
53 public:
55  float64_t label;
57  T* fv;
58  index_t length;
59 };
60 
77 template <class T> class CParseBuffer: public CSGObject
78 {
79 public:
85  CParseBuffer(int32_t size = 1024);
86 
91  ~CParseBuffer();
92 
99  Example<T>* get_free_example()
100  {
101 #ifdef HAVE_CXX11
102  std::unique_lock<std::mutex> write_lk(*write_mutex, std::defer_lock);
103  std::unique_lock<std::mutex> current_ex_lock(*ex_in_use_mutex[ex_write_index], std::defer_lock);
104  std::lock(write_lk, current_ex_lock);
105  while (ex_used[ex_write_index] == E_NOT_USED)
106  ex_in_use_cond[ex_write_index]->wait(current_ex_lock);
107  Example<T>* ex=&ex_ring[ex_write_index];
108 #elif HAVE_PTHREAD
109  pthread_mutex_lock(write_lock);
110  pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
111  while (ex_used[ex_write_index] == E_NOT_USED)
112  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
113  Example<T>* ex=&ex_ring[ex_write_index];
114  pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
115  pthread_mutex_unlock(write_lock);
116 #endif
117 
118  return ex;
119  }
120 
129  int32_t write_example(Example<T>* ex);
130 
136  Example<T>* return_example_to_read();
137 
143  Example<T>* get_unused_example();
144 
153  int32_t copy_example(Example<T>* ex);
154 
162  void finalize_example(bool free_after_release);
163 
173  void set_free_vectors_on_destruct(bool destroy)
174  {
175  free_vectors_on_destruct = destroy;
176  }
177 
182  bool get_free_vectors_on_destruct()
183  {
184  return free_vectors_on_destruct;
185  }
186 
192  virtual const char* get_name() const { return "ParseBuffer"; }
193 
197  void init_vector();
198 
199 protected:
204  virtual void inc_read_index()
205  {
206  ex_read_index=(ex_read_index + 1) % ring_size;
207  }
208 
213  virtual void inc_write_index()
214  {
215  ex_write_index=(ex_write_index + 1) % ring_size;
216  }
217 
218 protected:
219 
221  int32_t ring_size;
223  Example<T>* ex_ring;
224 
226  E_IS_EXAMPLE_USED* ex_used;
227 #ifdef HAVE_CXX11
228  std::vector<std::shared_ptr<std::mutex> > ex_in_use_mutex;
231  std::vector<std::shared_ptr<std::condition_variable> > ex_in_use_cond;
233  std::shared_ptr<std::mutex> read_mutex;
235  std::shared_ptr<std::mutex> write_mutex;
236 #elif HAVE_PTHREAD
237  pthread_mutex_t* ex_in_use_mutex;
240  pthread_cond_t* ex_in_use_cond;
242  pthread_mutex_t* read_lock;
244  pthread_mutex_t* write_lock;
245 #endif
246 
248  int32_t ex_write_index;
250  int32_t ex_read_index;
251 
253  bool free_vectors_on_destruct;
254 };
255 
256 
257 template <class T> void CParseBuffer<T>::init_vector()
258 {
259  if (!free_vectors_on_destruct)
260  return;
261  for (int32_t i=0; i<ring_size; i++)
262  {
263  if(ex_ring[i].fv==NULL)
264  ex_ring[i].fv = new T();
265  }
266 }
267 
268 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
269 {
270  ring_size = size;
271  ex_ring = SG_CALLOC(Example<T>, ring_size);
272  ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
273 #ifdef HAVE_CXX11
274  read_mutex = std::make_shared<std::mutex>();
275  write_mutex = std::make_shared<std::mutex>();
276 #elif HAVE_PTHREAD
277  ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
278  ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
279  read_lock = SG_MALLOC(pthread_mutex_t, 1);
280  write_lock = SG_MALLOC(pthread_mutex_t, 1);
281 #endif
282 
283  SG_SINFO("Initialized with ring size: %d.\n", ring_size)
284 
285  ex_write_index = 0;
286  ex_read_index = 0;
287 
288  for (int32_t i=0; i<ring_size; i++)
289  {
290  ex_used[i] = E_EMPTY;
291 
292  ex_ring[i].fv = NULL;
293  ex_ring[i].length = 1;
294  ex_ring[i].label = FLT_MAX;
295 
296 #ifdef HAVE_CXX11
297  ex_in_use_mutex.push_back(std::make_shared<std::mutex>());
298  ex_in_use_cond.push_back(std::make_shared<std::condition_variable>());
299 #elif defined(HAVE_PTHREAD)
300  pthread_cond_init(&ex_in_use_cond[i], NULL);
301  pthread_mutex_init(&ex_in_use_mutex[i], NULL);
302 #endif
303  }
304 #if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
305  pthread_mutex_init(read_lock, NULL);
306  pthread_mutex_init(write_lock, NULL);
307 #endif
308 
309  free_vectors_on_destruct = true;
310 }
311 
312 template <class T> CParseBuffer<T>::~CParseBuffer()
313 {
314  for (int32_t i=0; i<ring_size; i++)
315  {
316  if (ex_ring[i].fv != NULL && free_vectors_on_destruct)
317  {
318  SG_DEBUG("%s::~%s(): destroying examples ring vector %d at %p\n",
319  get_name(), get_name(), i, ex_ring[i].fv);
320  delete ex_ring[i].fv;
321  }
322 #if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
323  pthread_mutex_destroy(&ex_in_use_mutex[i]);
324  pthread_cond_destroy(&ex_in_use_cond[i]);
325 #endif
326  }
327  SG_FREE(ex_ring);
328  SG_FREE(ex_used);
329 #ifdef HAVE_CXX11
330  ex_in_use_mutex.clear();
331  ex_in_use_cond.clear();
332  read_mutex.reset();
333  write_mutex.reset();
334 #elif HAVE_PTHREAD
335  SG_FREE(ex_in_use_mutex);
336  SG_FREE(ex_in_use_cond);
337 
338  SG_FREE(read_lock);
339  SG_FREE(write_lock);
340 #endif
341 }
342 
343 template <class T>
344 int32_t CParseBuffer<T>::write_example(Example<T> *ex)
345 {
346  ex_ring[ex_write_index].label = ex->label;
347  ex_ring[ex_write_index].fv = ex->fv;
348  ex_ring[ex_write_index].length = ex->length;
349  ex_used[ex_write_index] = E_NOT_USED;
350  inc_write_index();
351 
352  return 1;
353 }
354 
355 template <class T>
356 Example<T>* CParseBuffer<T>::return_example_to_read()
357 {
358  if (ex_read_index >= 0)
359  return &ex_ring[ex_read_index];
360  else
361  return NULL;
362 }
363 
364 template <class T>
365 Example<T>* CParseBuffer<T>::get_unused_example()
366 {
367 #ifdef HAVE_CXX11
368  std::lock_guard<std::mutex> read_lk(*read_mutex);
369 #elif HAVE_PTHREAD
370  pthread_mutex_lock(read_lock);
371 #endif
372 
373  Example<T> *ex;
374  int32_t current_index = ex_read_index;
375  // Because read index will change after return_example_to_read
376 
377 #ifdef HAVE_CXX11
378  std::lock_guard<std::mutex> current_ex_lk(*ex_in_use_mutex[current_index]);
379 #elif HAVE_PTHREAD
380  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
381 #endif
382 
383  if (ex_used[current_index] == E_NOT_USED)
384  ex = return_example_to_read();
385  else
386  ex = NULL;
387 
388 #if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
389  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
390  pthread_mutex_unlock(read_lock);
391 #endif
392  return ex;
393 }
394 
395 template <class T>
396 int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
397 {
398 #ifdef HAVE_CXX11
399  std::lock_guard<std::mutex> write_lk(*write_mutex);
400 #elif HAVE_PTHREAD
401  pthread_mutex_lock(write_lock);
402 #endif
403  int32_t ret;
404  int32_t current_index = ex_write_index;
405 
406 #ifdef HAVE_CXX11
407  std::unique_lock<std::mutex> current_ex_lock(*ex_in_use_mutex[current_index]);
408 #elif HAVE_PTHREAD
409  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
410 #endif
411  while (ex_used[ex_write_index] == E_NOT_USED)
412  {
413 #ifdef HAVE_CXX11
414  ex_in_use_cond[ex_write_index]->wait(current_ex_lock);
415 #elif HAVE_PTHREAD
416  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
417 #endif
418  }
419 
420  ret = write_example(ex);
421 
422 #if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
423  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
424  pthread_mutex_unlock(write_lock);
425 #endif
426 
427  return ret;
428 }
429 
430 template <class T>
431 void CParseBuffer<T>::finalize_example(bool free_after_release)
432 {
433 #ifdef HAVE_CXX11
434  std::lock_guard<std::mutex> read_lk(*read_mutex);
435  std::unique_lock<std::mutex> current_ex_lock(*ex_in_use_mutex[ex_read_index]);
436 #elif HAVE_PTHREAD
437  pthread_mutex_lock(read_lock);
438  pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
439 #endif
440  ex_used[ex_read_index] = E_USED;
441 
442  if (free_after_release)
443  {
444  SG_DEBUG("Freeing object in ring at index %d and address: %p.\n",
445  ex_read_index, ex_ring[ex_read_index].fv);
446 
447  SG_FREE(ex_ring[ex_read_index].fv);
448  ex_ring[ex_read_index].fv=NULL;
449  }
450 
451 #ifdef HAVE_CXX11
452  ex_in_use_cond[ex_read_index]->notify_one();
453  current_ex_lock.unlock();
454 #elif HAVE_PTHREAD
455  pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
456  pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
457 #endif
458  inc_read_index();
459 
460 #if defined(HAVE_PTHREAD) && !defined(HAVE_CXX11)
461  pthread_mutex_unlock(read_lock);
462 #endif
463 }
464 
465 }
466 #endif // defined(HAVE_CXX11) || defined(HAVE_PTHREAD)
467 #endif // __PARSEBUFFER_H__
int32_t index_t
Definition: common.h:72
double float64_t
Definition: common.h:60
#define SG_DEBUG(...)
Definition: SGIO.h:106
all of classes and functions are contained in the shogun namespace
Definition: class_list.h:18
#define SG_SINFO(...)
Definition: SGIO.h:172

SHOGUN Machine Learning Toolbox - Documentation