SHOGUN  5.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules
ParseBuffer.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 #ifndef __PARSEBUFFER_H__
11 #define __PARSEBUFFER_H__
12 
13 #include <shogun/lib/config.h>
14 
15 #include <shogun/lib/common.h>
16 #ifdef HAVE_PTHREAD
17 
18 #include <shogun/lib/DataType.h>
19 #include <pthread.h>
20 
21 namespace shogun
22 {
23 
/** State of a single slot in the example ring.
 *
 * A slot starts out as E_EMPTY, becomes E_NOT_USED once an example has
 * been written into it, and becomes E_USED after it has been finalized.
 */
enum E_IS_EXAMPLE_USED
{
	/** slot has never been written (initial state of every slot) */
	E_EMPTY    = 1,
	/** slot holds an example that has been written but not finalized */
	E_NOT_USED = 2,
	/** slot's example has been finalized; the slot may be overwritten */
	E_USED     = 3
};
32 
42 template <class T>
43 class Example
44 {
45 public:
47  float64_t label;
49  T* fv;
50  index_t length;
51 };
52 
69 template <class T> class CParseBuffer: public CSGObject
70 {
71 public:
77  CParseBuffer(int32_t size = 1024);
78 
83  ~CParseBuffer();
84 
91  Example<T>* get_free_example()
92  {
93  pthread_mutex_lock(write_lock);
94  pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
95  while (ex_used[ex_write_index] == E_NOT_USED)
96  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
97  Example<T>* ex=&ex_ring[ex_write_index];
98  pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
99  pthread_mutex_unlock(write_lock);
100 
101  return ex;
102  }
103 
112  int32_t write_example(Example<T>* ex);
113 
119  Example<T>* return_example_to_read();
120 
126  Example<T>* get_unused_example();
127 
136  int32_t copy_example(Example<T>* ex);
137 
145  void finalize_example(bool free_after_release);
146 
156  void set_free_vectors_on_destruct(bool destroy)
157  {
158  free_vectors_on_destruct = destroy;
159  }
160 
165  bool get_free_vectors_on_destruct()
166  {
167  return free_vectors_on_destruct;
168  }
169 
175  virtual const char* get_name() const { return "ParseBuffer"; }
176 
180  void init_vector();
181 
182 protected:
187  virtual void inc_read_index()
188  {
189  ex_read_index=(ex_read_index + 1) % ring_size;
190  }
191 
196  virtual void inc_write_index()
197  {
198  ex_write_index=(ex_write_index + 1) % ring_size;
199  }
200 
201 protected:
202 
204  int32_t ring_size;
206  Example<T>* ex_ring;
207 
209  E_IS_EXAMPLE_USED* ex_used;
211  pthread_mutex_t* ex_in_use_mutex;
213  pthread_cond_t* ex_in_use_cond;
215  pthread_mutex_t* read_lock;
217  pthread_mutex_t* write_lock;
218 
220  int32_t ex_write_index;
222  int32_t ex_read_index;
223 
225  bool free_vectors_on_destruct;
226 };
227 
228 
229 template <class T> void CParseBuffer<T>::init_vector()
230 {
231  if (!free_vectors_on_destruct)
232  return;
233  for (int32_t i=0; i<ring_size; i++)
234  {
235  if(ex_ring[i].fv==NULL)
236  ex_ring[i].fv = new T();
237  }
238 }
239 
240 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
241 {
242  ring_size = size;
243  ex_ring = SG_CALLOC(Example<T>, ring_size);
244  ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
245  ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
246  ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
247  read_lock = SG_MALLOC(pthread_mutex_t, 1);
248  write_lock = SG_MALLOC(pthread_mutex_t, 1);
249 
250  SG_SINFO("Initialized with ring size: %d.\n", ring_size)
251 
252  ex_write_index = 0;
253  ex_read_index = 0;
254 
255  for (int32_t i=0; i<ring_size; i++)
256  {
257  ex_used[i] = E_EMPTY;
258 
259  ex_ring[i].fv = NULL;
260  ex_ring[i].length = 1;
261  ex_ring[i].label = FLT_MAX;
262 
263  pthread_cond_init(&ex_in_use_cond[i], NULL);
264  pthread_mutex_init(&ex_in_use_mutex[i], NULL);
265  }
266  pthread_mutex_init(read_lock, NULL);
267  pthread_mutex_init(write_lock, NULL);
268 
269  free_vectors_on_destruct = true;
270 }
271 
272 template <class T> CParseBuffer<T>::~CParseBuffer()
273 {
274  for (int32_t i=0; i<ring_size; i++)
275  {
276  if (ex_ring[i].fv != NULL && free_vectors_on_destruct)
277  {
278  SG_DEBUG("%s::~%s(): destroying examples ring vector %d at %p\n",
279  get_name(), get_name(), i, ex_ring[i].fv);
280  delete ex_ring[i].fv;
281  }
282  pthread_mutex_destroy(&ex_in_use_mutex[i]);
283  pthread_cond_destroy(&ex_in_use_cond[i]);
284  }
285  SG_FREE(ex_ring);
286  SG_FREE(ex_used);
287  SG_FREE(ex_in_use_mutex);
288  SG_FREE(ex_in_use_cond);
289 
290  SG_FREE(read_lock);
291  SG_FREE(write_lock);
292 }
293 
294 template <class T>
295 int32_t CParseBuffer<T>::write_example(Example<T> *ex)
296 {
297  ex_ring[ex_write_index].label = ex->label;
298  ex_ring[ex_write_index].fv = ex->fv;
299  ex_ring[ex_write_index].length = ex->length;
300  ex_used[ex_write_index] = E_NOT_USED;
301  inc_write_index();
302 
303  return 1;
304 }
305 
306 template <class T>
307 Example<T>* CParseBuffer<T>::return_example_to_read()
308 {
309  if (ex_read_index >= 0)
310  return &ex_ring[ex_read_index];
311  else
312  return NULL;
313 }
314 
315 template <class T>
316 Example<T>* CParseBuffer<T>::get_unused_example()
317 {
318  pthread_mutex_lock(read_lock);
319 
320  Example<T> *ex;
321  int32_t current_index = ex_read_index;
322  // Because read index will change after return_example_to_read
323 
324  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
325 
326  if (ex_used[current_index] == E_NOT_USED)
327  ex = return_example_to_read();
328  else
329  ex = NULL;
330 
331  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
332 
333  pthread_mutex_unlock(read_lock);
334  return ex;
335 }
336 
337 template <class T>
338 int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
339 {
340  pthread_mutex_lock(write_lock);
341  int32_t ret;
342  int32_t current_index = ex_write_index;
343 
344  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
345  while (ex_used[ex_write_index] == E_NOT_USED)
346  {
347  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
348  }
349 
350  ret = write_example(ex);
351 
352  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
353  pthread_mutex_unlock(write_lock);
354 
355  return ret;
356 }
357 
358 template <class T>
359 void CParseBuffer<T>::finalize_example(bool free_after_release)
360 {
361  pthread_mutex_lock(read_lock);
362  pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
363  ex_used[ex_read_index] = E_USED;
364 
365  if (free_after_release)
366  {
367  SG_DEBUG("Freeing object in ring at index %d and address: %p.\n",
368  ex_read_index, ex_ring[ex_read_index].fv);
369 
370  SG_FREE(ex_ring[ex_read_index].fv);
371  ex_ring[ex_read_index].fv=NULL;
372  }
373 
374  pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
375  pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
376  inc_read_index();
377 
378  pthread_mutex_unlock(read_lock);
379 }
380 
381 }
382 #endif // HAVE_PTHREAD
383 #endif // __PARSEBUFFER_H__
int32_t index_t
Definition: common.h:62
double float64_t
Definition: common.h:50
#define SG_DEBUG(...)
Definition: SGIO.h:107
All classes and functions are contained in the shogun namespace.
Definition: class_list.h:18
#define SG_SINFO(...)
Definition: SGIO.h:173

SHOGUN Machine Learning Toolbox - Documentation