SHOGUN  6.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules
InputParser.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 
11 #ifndef __INPUTPARSER_H__
12 #define __INPUTPARSER_H__
13 
14 #include <shogun/lib/config.h>
15 
16 #include <shogun/lib/common.h>
17 #include <shogun/io/SGIO.h>
20 #include <condition_variable>
21 #include <memory>
22 #include <mutex>
23 #include <thread>
24 
25 #define PARSER_DEFAULT_BUFFSIZE 100
26 
27 namespace shogun
28 {
32  {
35  };
36 
85 template <class T> class CInputParser
86 {
87 public:
88 
93  CInputParser();
94 
99  ~CInputParser();
100 
112  void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
113 
119  bool is_running();
120 
128 
140  void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
141 
153  void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
154 
166  int32_t get_vector_and_label(T* &feature_vector,
167  int32_t &length,
168  float64_t &label);
169 
180  int32_t get_vector_only(T* &feature_vector, int32_t &length);
181 
188  void set_free_vector_after_release(bool free_vec);
189 
196  void set_free_vectors_on_destruct(bool destroy);
197 
203  void start_parser();
204 
213  void* main_parse_loop(void* params);
214 
215 
221  void copy_example_into_buffer(Example<T>* ex);
222 
229  Example<T>* retrieve_example();
230 
243  int32_t get_next_example(T* &feature_vector,
244  int32_t &length,
245  float64_t &label);
246 
255  int32_t get_next_example(T* &feature_vector,
256  int32_t &length);
257 
265  void finalize_example();
266 
271  void end_parser();
272 
275  void exit_parser();
276 
282  int32_t get_ring_size() { return ring_size; }
283 
284 private:
292  static void* parse_loop_entry_point(void* params);
293 
294 public:
300 protected:
307  void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
308 
315  void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
316 
319 
321  std::thread parse_thread;
322 
324  CParseBuffer<T>* examples_ring;
325 
328 
331 
334 
336  Example<T>* current_example;
337 
340 
343 
345  int32_t current_len;
346 
349 
351  int32_t ring_size;
352 
355 
357  std::condition_variable examples_state_changed;
358 
360  alignas(CPU_CACHE_LINE_SIZE) std::atomic_bool keep_running;
361 
362 };
363 
364 template <class T>
365  void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
366 {
367  // Set read_vector to point to the function passed as arg
368  read_vector=func_ptr;
369 }
370 
371 template <class T>
372  void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
373 {
374  // Set read_vector_and_label to point to the function passed as arg
375  read_vector_and_label=func_ptr;
376 }
377 
378 template <class T>
380 {
381  examples_ring = nullptr;
382  parsing_done=true;
383  reading_done=true;
384  keep_running.store(false, std::memory_order_release);
385 }
386 
387 template <class T>
389 {
390  SG_UNREF(examples_ring);
391 }
392 
393 template <class T>
394  void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
395 {
396  input_source = input_file;
397 
398  if (is_labelled == true)
399  example_type = E_LABELLED;
400  else
401  example_type = E_UNLABELLED;
402 
403  SG_UNREF(examples_ring);
404  examples_ring = new CParseBuffer<T>(size);
405  SG_REF(examples_ring);
406 
407  parsing_done = false;
408  reading_done = false;
409  number_of_vectors_parsed = 0;
410  number_of_vectors_read = 0;
411 
412  current_len = -1;
413  current_label = -1;
414  current_feature_vector = NULL;
415 
416  free_after_release=true;
417  ring_size=size;
418 }
419 
420 template <class T>
422 {
423  free_after_release=free_vec;
424 }
425 
426 template <class T>
428 {
429  examples_ring->set_free_vectors_on_destruct(destroy);
430 }
431 
432 template <class T>
434 {
435  SG_SDEBUG("entering CInputParser::start_parser()\n")
436  if (is_running())
437  {
438  SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n")
439  }
440 
441  SG_SDEBUG("creating parse thread\n")
442  if (examples_ring)
443  examples_ring->init_vector();
444  keep_running.store(true, std::memory_order_release);
445  parse_thread = std::thread(&parse_loop_entry_point, this);
446 
447  SG_SDEBUG("leaving CInputParser::start_parser()\n")
448 }
449 
450 template <class T>
451  void* CInputParser<T>::parse_loop_entry_point(void* params)
452 {
453  ((CInputParser *) params)->main_parse_loop(params);
454 
455  return NULL;
456 }
457 
458 template <class T>
460 {
461  SG_SDEBUG("entering CInputParser::is_running()\n")
462  bool ret;
463  std::lock_guard<std::mutex> lock(examples_state_lock);
464 
465  if (parsing_done)
466  if (reading_done)
467  ret = false;
468  else
469  ret = true;
470  else
471  ret = false;
472 
473  SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret)
474  return ret;
475 }
476 
477 template <class T>
478  int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
479  int32_t &length,
480  float64_t &label)
481 {
482  (input_source->*read_vector_and_label)(feature_vector, length, label);
483 
484  if (length < 1)
485  {
486  // Problem reading the example
487  return 0;
488  }
489 
490  return 1;
491 }
492 
493 template <class T>
494  int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
495  int32_t &length)
496 {
497  (input_source->*read_vector)(feature_vector, length);
498 
499  if (length < 1)
500  {
501  // Problem reading the example
502  return 0;
503  }
504 
505  return 1;
506 }
507 
508 template <class T>
510 {
511  examples_ring->copy_example(ex);
512 }
513 
514 template <class T> void* CInputParser<T>::main_parse_loop(void* params)
515 {
516  // Read the examples into current_* objects
517  // Instead of allocating mem for new objects each time
518  CInputParser* this_obj = (CInputParser *) params;
519  this->input_source = this_obj->input_source;
520 
521  while (keep_running.load(std::memory_order_acquire))
522  {
523  std::unique_lock<std::mutex> lock(examples_state_lock);
524 
525  if (parsing_done)
526  {
527  return NULL;
528  }
529  lock.unlock();
530 
531  current_example = examples_ring->get_free_example();
532  current_feature_vector = current_example->fv;
533  current_len = current_example->length;
534  current_label = current_example->label;
535 
536  if (example_type == E_LABELLED)
537  get_vector_and_label(current_feature_vector, current_len, current_label);
538  else
539  get_vector_only(current_feature_vector, current_len);
540 
541  if (current_len < 0)
542  {
543  lock.lock();
544  parsing_done = true;
545  examples_state_changed.notify_one();
546  return NULL;
547  }
548 
549  current_example->label = current_label;
550  current_example->fv = current_feature_vector;
551  current_example->length = current_len;
552 
553  examples_ring->copy_example(current_example);
554  lock.lock();
555  number_of_vectors_parsed++;
556  examples_state_changed.notify_one();
557  }
558  return NULL;
559 }
560 
561 template <class T> Example<T>* CInputParser<T>::retrieve_example()
562 {
563  /* This function should be guarded by mutexes while calling */
564  Example<T> *ex;
565 
566  if (parsing_done)
567  {
568  if (number_of_vectors_read == number_of_vectors_parsed)
569  {
570  reading_done = true;
571  /* Signal to waiting threads that no more examples are left */
572  examples_state_changed.notify_one();
573  return NULL;
574  }
575  }
576 
577  if (number_of_vectors_parsed <= 0)
578  return NULL;
579 
580  if (number_of_vectors_read == number_of_vectors_parsed)
581  {
582  return NULL;
583  }
584 
585  ex = examples_ring->get_unused_example();
586  number_of_vectors_read++;
587 
588  return ex;
589 }
590 
591 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
592  int32_t &length, float64_t &label)
593 {
594  /* if reading is done, no more examples can be fetched. return 0
595  else, if example can be read, get the example and return 1.
596  otherwise, wait for further parsing, get the example and
597  return 1 */
598 
599  Example<T> *ex;
600 
601  while (keep_running.load(std::memory_order_acquire))
602  {
603  if (reading_done)
604  return 0;
605 
606  std::unique_lock<std::mutex> lock(examples_state_lock);
607  ex = retrieve_example();
608 
609  if (ex == NULL)
610  {
611  if (reading_done)
612  {
613  /* No more examples left, return */
614  return 0;
615  }
616  else
617  {
618  /* Examples left, wait for one to become ready */
619  examples_state_changed.wait(lock);
620  continue;
621  }
622  }
623  else
624  {
625  /* Example ready, return the example */
626  break;
627  }
628  }
629 
630  fv = ex->fv;
631  length = ex->length;
632  label = ex->label;
633 
634  return 1;
635 }
636 
637 template <class T>
638  int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
639 {
640  float64_t label_dummy;
641 
642  return get_next_example(fv, length, label_dummy);
643 }
644 
645 template <class T>
647 {
648  examples_ring->finalize_example(free_after_release);
649 }
650 
651 template <class T> void CInputParser<T>::end_parser()
652 {
653  SG_SDEBUG("entering CInputParser::end_parser\n")
654  SG_SDEBUG("joining parse thread\n")
655  if (parse_thread.joinable())
656  parse_thread.join();
657  SG_SDEBUG("leaving CInputParser::end_parser\n")
658 }
659 
660 template <class T> void CInputParser<T>::exit_parser()
661 {
662  SG_SDEBUG("cancelling parse thread\n")
663  keep_running.store(false, std::memory_order_release);
664  examples_state_changed.notify_one();
665  if (parse_thread.joinable())
666  parse_thread.join();
667 }
668 }
669 
670 #endif // __INPUTPARSER_H__
std::mutex examples_state_lock
Mutex which is used when getting/setting state of examples (whether a new example is ready) ...
Definition: InputParser.h:354
int32_t number_of_features
Number of features in the dataset (the maximum number of features seen up to the point of access)
Definition: InputParser.h:327
int32_t ring_size
Size of the ring of examples.
Definition: InputParser.h:351
void set_free_vectors_on_destruct(bool destroy)
Definition: InputParser.h:427
float64_t current_label
Label of current example.
Definition: InputParser.h:342
int32_t number_of_vectors_parsed
Number of vectors parsed.
Definition: InputParser.h:330
void(CStreamingFile::* read_vector)(T *&vec, int32_t &len)
Definition: InputParser.h:307
#define PARSER_DEFAULT_BUFFSIZE
Definition: InputParser.h:25
int32_t current_len
Number of features in current example.
Definition: InputParser.h:345
std::atomic_bool keep_running
Flag that indicates whether the parsing thread should continue reading.
Definition: InputParser.h:360
void set_read_vector_and_label(void(CStreamingFile::*func_ptr)(T *&vec, int32_t &len, float64_t &label))
Definition: InputParser.h:372
int32_t get_vector_only(T *&feature_vector, int32_t &length)
Definition: InputParser.h:494
#define SG_REF(x)
Definition: SGObject.h:52
E_EXAMPLE_TYPE
Definition: InputParser.h:31
A Streaming File access class.
Definition: StreamingFile.h:34
int32_t get_number_of_features()
Definition: InputParser.h:127
int32_t number_of_vectors_read
Number of vectors used by external algorithm.
Definition: InputParser.h:333
CParseBuffer< T > * examples_ring
The ring of examples, stored as they are parsed.
Definition: InputParser.h:324
void init(CStreamingFile *input_file, bool is_labelled=true, int32_t size=PARSER_DEFAULT_BUFFSIZE)
Definition: InputParser.h:394
double float64_t
Definition: common.h:60
Example< T > * retrieve_example()
Definition: InputParser.h:561
T * current_feature_vector
Feature vector of current example.
Definition: InputParser.h:339
int32_t get_next_example(T *&feature_vector, int32_t &length, float64_t &label)
Definition: InputParser.h:591
bool free_after_release
Whether to SG_FREE() vector after it is used.
Definition: InputParser.h:348
int32_t get_vector_and_label(T *&feature_vector, int32_t &length, float64_t &label)
Definition: InputParser.h:478
E_EXAMPLE_TYPE example_type
Definition: InputParser.h:298
#define SG_UNREF(x)
Definition: SGObject.h:53
int32_t get_ring_size()
Definition: InputParser.h:282
all of classes and functions are contained in the shogun namespace
Definition: class_list.h:18
#define SG_SDEBUG(...)
Definition: SGIO.h:167
Class CInputParser is a templated class used to maintain the reading/parsing/providing of examples...
Definition: InputParser.h:85
constexpr size_t CPU_CACHE_LINE_SIZE
Definition: common.h:80
#define SG_SERROR(...)
Definition: SGIO.h:178
std::thread parse_thread
Thread in which the parser runs.
Definition: InputParser.h:321
void copy_example_into_buffer(Example< T > *ex)
Definition: InputParser.h:509
void * main_parse_loop(void *params)
Definition: InputParser.h:514
void set_read_vector(void(CStreamingFile::*func_ptr)(T *&vec, int32_t &len))
Definition: InputParser.h:365
std::condition_variable examples_state_changed
Condition variable to indicate change of state of examples.
Definition: InputParser.h:357
CStreamingFile * input_source
Input source, CStreamingFile object.
Definition: InputParser.h:318
void(CStreamingFile::* read_vector_and_label)(T *&vec, int32_t &len, float64_t &label)
Definition: InputParser.h:315
void set_free_vector_after_release(bool free_vec)
Definition: InputParser.h:421
Example< T > * current_example
Example currently being used.
Definition: InputParser.h:336

SHOGUN Machine Learning Toolbox - Documentation