SHOGUN  6.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules
StreamingMMD.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) The Shogun Machine Learning Toolbox
3  * Written (w) 2012 - 2013 Heiko Strathmann
4  * Written (w) 2014 - 2017 Soumyajit De
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice, this
11  * list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
20  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  *
27  * The views and conclusions contained in the software and documentation are those
28  * of the authors and should not be interpreted as representing official policies,
29  * either expressed or implied, of the Shogun Development Team.
30  */
31 
32 #include <vector>
33 #include <memory>
34 #include <type_traits>
35 #include <shogun/kernel/Kernel.h>
52 
53 using namespace shogun;
54 using namespace internal;
55 
57 {
58  Self(CStreamingMMD& cmmd);
59 
60  void create_statistic_job();
61  void create_variance_job();
62  void create_computation_jobs();
63 
64  void merge_samples(NextSamples&, std::vector<CFeatures*>&) const;
65  void compute_kernel(ComputationManager&, std::vector<CFeatures*>&, CKernel*) const;
66  void compute_jobs(ComputationManager&) const;
67 
68  std::pair<float64_t, float64_t> compute_statistic_variance();
69  std::pair<SGVector<float64_t>, SGMatrix<float64_t>> compute_statistic_and_Q(const KernelManager&);
70  SGVector<float64_t> sample_null();
71 
73 
74  bool use_gpu;
76 
80 
81  std::function<float32_t(const SGMatrix<float32_t>&)> statistic_job;
82  std::function<float32_t(const SGMatrix<float32_t>&)> permutation_job;
83  std::function<float32_t(const SGMatrix<float32_t>&)> variance_job;
84 };
85 
87  use_gpu(false), num_null_samples(250),
88  statistic_type(ST_UNBIASED_FULL),
89  variance_estimation_method(VEM_DIRECT),
90  null_approximation_method(NAM_MMD1_GAUSSIAN),
91  statistic_job(nullptr), variance_job(nullptr)
92 {
93 }
94 
96 {
97  create_statistic_job();
98  create_variance_job();
99 }
100 
102 {
103  const DataManager& data_mgr=owner.get_data_mgr();
104 
105  auto Bx=data_mgr.blocksize_at(0);
106  auto By=data_mgr.blocksize_at(1);
107 
108  REQUIRE(Bx>0, "Blocksize for samples from P cannot be 0!\n");
109  REQUIRE(By>0, "Blocksize for samples from Q cannot be 0!\n");
110 
111  auto mmd=mmd::ComputeMMD();
112  mmd.m_n_x=Bx;
113  mmd.m_n_y=By;
114  mmd.m_stype=statistic_type;
115 
116  statistic_job=mmd;
117  permutation_job=mmd::WithinBlockPermutation(Bx, By, statistic_type);
118 }
119 
121 {
122  switch (variance_estimation_method)
123  {
124  case VEM_DIRECT:
125  variance_job=owner.get_direct_estimation_method();
126  break;
127  case VEM_PERMUTATION:
128  variance_job=permutation_job;
129  break;
130  default : break;
131  };
132 }
133 
134 void CStreamingMMD::Self::merge_samples(NextSamples& next_burst, std::vector<CFeatures*>& blocks) const
135 {
136  blocks.resize(next_burst.num_blocks());
137 #pragma omp parallel for
138  for (int64_t i=0; i<(int64_t)blocks.size(); ++i)
139  {
140  CFeatures *block_p=next_burst[0][i];
141  CFeatures *block_q=next_burst[1][i];
142  auto block_p_and_q=block_p->create_merged_copy(block_q);
143  blocks[i]=block_p_and_q;
144  }
145  next_burst.clear();
146 }
147 
148 void CStreamingMMD::Self::compute_kernel(ComputationManager& cm, std::vector<CFeatures*>& blocks, CKernel* kernel) const
149 {
150  REQUIRE(kernel->get_kernel_type()!=K_CUSTOM, "Underlying kernel cannot be custom!\n");
151  cm.num_data(blocks.size());
152 #pragma omp parallel for
153  for (int64_t i=0; i<(int64_t)blocks.size(); ++i)
154  {
155  try
156  {
157  auto kernel_clone=std::unique_ptr<CKernel>(static_cast<CKernel*>(kernel->clone()));
158  kernel_clone->init(blocks[i], blocks[i]);
159  cm.data(i)=kernel_clone->get_kernel_matrix<float32_t>();
160  kernel_clone->remove_lhs_and_rhs();
161  }
162  catch (ShogunException e)
163  {
164  SG_SERROR("%s, Try using less number of blocks per burst!\n", e.get_exception_string());
165  }
166  }
167 }
168 
169 void CStreamingMMD::Self::compute_jobs(ComputationManager& cm) const
170 {
171  if (use_gpu)
172  cm.use_gpu().compute_data_parallel_jobs();
173  else
174  cm.use_cpu().compute_data_parallel_jobs();
175 }
176 
177 std::pair<float64_t, float64_t> CStreamingMMD::Self::compute_statistic_variance()
178 {
179  const KernelManager& kernel_mgr=owner.get_kernel_mgr();
180  auto kernel=kernel_mgr.kernel_at(0);
181  REQUIRE(kernel != nullptr, "Kernel is not set!\n");
182 
183  float64_t statistic=0;
184  float64_t permuted_samples_statistic=0;
185  float64_t variance=0;
186  index_t statistic_term_counter=1;
187  index_t variance_term_counter=1;
188 
189  DataManager& data_mgr=owner.get_data_mgr();
190  data_mgr.start();
191  auto next_burst=data_mgr.next();
192  if (!next_burst.empty())
193  {
194  ComputationManager cm;
195  create_computation_jobs();
196  cm.enqueue_job(statistic_job);
197  cm.enqueue_job(variance_job);
198 
199  std::vector<CFeatures*> blocks;
200 
201  while (!next_burst.empty())
202  {
203  merge_samples(next_burst, blocks);
204  compute_kernel(cm, blocks, kernel);
205  blocks.resize(0);
206  compute_jobs(cm);
207 
208  auto mmds=cm.result(0);
209  auto vars=cm.result(1);
210 
211  for (size_t i=0; i<mmds.size(); ++i)
212  {
213  auto delta=mmds[i]-statistic;
214  statistic+=delta/statistic_term_counter;
215  statistic_term_counter++;
216  }
217 
218  if (variance_estimation_method==VEM_DIRECT)
219  {
220  for (size_t i=0; i<mmds.size(); ++i)
221  {
222  auto delta=vars[i]-variance;
223  variance+=delta/variance_term_counter;
224  variance_term_counter++;
225  }
226  }
227  else
228  {
229  for (size_t i=0; i<vars.size(); ++i)
230  {
231  auto delta=vars[i]-permuted_samples_statistic;
232  permuted_samples_statistic+=delta/variance_term_counter;
233  variance+=delta*(vars[i]-permuted_samples_statistic);
234  variance_term_counter++;
235  }
236  }
237  next_burst=data_mgr.next();
238  }
239  cm.done();
240  }
241  data_mgr.end();
242 
243  // normalize statistic and variance
244  statistic=owner.normalize_statistic(statistic);
245  if (variance_estimation_method==VEM_PERMUTATION)
246  variance=owner.normalize_variance(variance);
247 
248  return std::make_pair(statistic, variance);
249 }
250 
251 std::pair<SGVector<float64_t>, SGMatrix<float64_t> > CStreamingMMD::Self::compute_statistic_and_Q(const KernelManager& kernel_selection_mgr)
252 {
253 // const size_t num_kernels=0;
254 // SGVector<float64_t> statistic(num_kernels);
255 // SGMatrix<float64_t> Q(num_kernels, num_kernels);
256 // return std::make_pair(statistic, Q);
257  REQUIRE(kernel_selection_mgr.num_kernels()>0, "No kernels specified for kernel learning! "
258  "Please add kernels using add_kernel() method!\n");
259 
260  const auto num_kernels=kernel_selection_mgr.num_kernels();
261  SGVector<float64_t> statistic(num_kernels);
262  SGMatrix<float64_t> Q(num_kernels, num_kernels);
263 
264  std::fill(statistic.data(), statistic.data()+statistic.size(), 0);
265  std::fill(Q.data(), Q.data()+Q.size(), 0);
266 
267  SGVector<index_t> term_counters_statistic(num_kernels);
268  term_counters_statistic.set_const(1);
269  SGMatrix<index_t> term_counters_Q(num_kernels, num_kernels);
270  std::fill(term_counters_Q.data(), term_counters_Q.data()+term_counters_Q.size(), 1);
271 
272  DataManager& data_mgr=owner.get_data_mgr();
273  ComputationManager cm;
274  create_computation_jobs();
275  cm.enqueue_job(statistic_job);
276 
277  data_mgr.start();
278  auto next_burst=data_mgr.next();
279  std::vector<CFeatures*> blocks;
280  std::vector<std::vector<float32_t> > mmds(num_kernels);
281  while (!next_burst.empty())
282  {
283  const auto num_blocks=next_burst.num_blocks();
284  REQUIRE(num_blocks%2==0,
285  "The number of blocks per burst (%d this burst) has to be even!\n",
286  num_blocks);
287  merge_samples(next_burst, blocks);
288  std::for_each(blocks.begin(), blocks.end(), [](CFeatures* ptr) { SG_REF(ptr); });
289  for (auto k=0; k<num_kernels; ++k)
290  {
291  CKernel* kernel=kernel_selection_mgr.kernel_at(k);
292  compute_kernel(cm, blocks, kernel);
293  compute_jobs(cm);
294  mmds[k]=cm.result(0);
295  for (auto i=0; i<num_blocks; ++i)
296  {
297  auto delta=mmds[k][i]-statistic[k];
298  statistic[k]+=delta/term_counters_statistic[k]++;
299  }
300  }
301  std::for_each(blocks.begin(), blocks.end(), [](CFeatures* ptr) { SG_UNREF(ptr); });
302  blocks.resize(0);
303  for (auto i=0; i<num_kernels; ++i)
304  {
305  for (auto j=0; j<=i; ++j)
306  {
307  for (auto k=0; k<num_blocks-1; k+=2)
308  {
309  auto term=(mmds[i][k]-mmds[i][k+1])*(mmds[j][k]-mmds[j][k+1]);
310  Q(i, j)+=(term-Q(i, j))/term_counters_Q(i, j)++;
311  }
312  Q(j, i)=Q(i, j);
313  }
314  }
315  next_burst=data_mgr.next();
316  }
317  mmds.clear();
318 
319  data_mgr.end();
320  cm.done();
321 
322  std::for_each(statistic.data(), statistic.data()+statistic.size(), [this](float64_t val)
323  {
324  val=owner.normalize_statistic(val);
325  });
326  return std::make_pair(statistic, Q);
327 }
328 
330 {
331  const KernelManager& kernel_mgr=owner.get_kernel_mgr();
332  auto kernel=kernel_mgr.kernel_at(0);
333  REQUIRE(kernel != nullptr, "Kernel is not set!\n");
334 
335  SGVector<float64_t> statistic(num_null_samples);
336  SGVector<index_t> term_counters(num_null_samples);
337 
338  std::fill(statistic.vector, statistic.vector+statistic.vlen, 0);
339  std::fill(term_counters.data(), term_counters.data()+term_counters.size(), 1);
340 
341  DataManager& data_mgr=owner.get_data_mgr();
342  ComputationManager cm;
343 
344  create_statistic_job();
345  cm.enqueue_job(permutation_job);
346 
347  std::vector<CFeatures*> blocks;
348 
349  data_mgr.start();
350  auto next_burst=data_mgr.next();
351 
352  while (!next_burst.empty())
353  {
354  merge_samples(next_burst, blocks);
355  compute_kernel(cm, blocks, kernel);
356  blocks.resize(0);
357 
358  for (auto j=0; j<num_null_samples; ++j)
359  {
360  compute_jobs(cm);
361  auto mmds=cm.result(0);
362  for (size_t i=0; i<mmds.size(); ++i)
363  {
364  auto delta=mmds[i]-statistic[j];
365  statistic[j]+=delta/term_counters[j];
366  term_counters[j]++;
367  }
368  }
369  next_burst=data_mgr.next();
370  }
371 
372  data_mgr.end();
373  cm.done();
374 
375  // normalize statistic
376  std::for_each(statistic.vector, statistic.vector + statistic.vlen, [this](float64_t& value)
377  {
378  value=owner.normalize_statistic(value);
379  });
380 
381  return statistic;
382 }
383 
384 CStreamingMMD::CStreamingMMD() : CMMD()
385 {
386 #if EIGEN_VERSION_AT_LEAST(3,1,0)
387  Eigen::initParallel();
388 #endif
389  self=std::unique_ptr<Self>(new Self(*this));
390 }
391 
393 {
394 }
395 
397 {
398  return self->compute_statistic_variance().first;
399 }
400 
402 {
403  return self->compute_statistic_variance().second;
404 }
405 
407 {
408  return self->compute_statistic_and_Q(get_kernel_selection_strategy()->get_kernel_mgr()).first;
409 }
410 
411 std::pair<float64_t, float64_t> CStreamingMMD::compute_statistic_variance()
412 {
413  return self->compute_statistic_variance();
414 }
415 
416 std::pair<SGVector<float64_t>, SGMatrix<float64_t> > CStreamingMMD::compute_statistic_and_Q(const KernelManager& kernel_selection_mgr)
417 {
418  return self->compute_statistic_and_Q(kernel_selection_mgr);
419 }
420 
422 {
423  return self->sample_null();
424 }
425 
427 {
428  self->num_null_samples=null_samples;
429 }
430 
432 {
433  return self->num_null_samples;
434 }
435 
437 {
438  self->use_gpu=gpu;
439 }
440 
442 {
443  return self->use_gpu;
444 }
445 
447 {
448  for (auto i=0; i<get_kernel_mgr().num_kernels(); ++i)
449  get_kernel_mgr().restore_kernel_at(i);
450 }
451 
453 {
454  self->statistic_type=stype;
455 }
456 
458 {
459  return self->statistic_type;
460 }
461 
463 {
464  // TODO overload this
465 /* if (std::is_same<Derived, CQuadraticTimeMMD>::value && vmethod == VEM_PERMUTATION)
466  {
467  std::cerr << "cannot use permutation method for quadratic time MMD" << std::endl;
468  }*/
469  self->variance_estimation_method=vmethod;
470 }
471 
473 {
474  return self->variance_estimation_method;
475 }
476 
478 {
479  // TODO overload this
480 /* if (std::is_same<Derived, CQuadraticTimeMMD>::value && nmethod == NAM_MMD1_GAUSSIAN)
481  {
482  std::cerr << "cannot use gaussian method for quadratic time MMD" << std::endl;
483  }
484  else if ((std::is_same<Derived, CBTestMMD>::value || std::is_same<Derived, CLinearTimeMMD>::value) &&
485  (nmethod == NAM_MMD2_SPECTRUM || nmethod == NAM_MMD2_GAMMA))
486  {
487  std::cerr << "cannot use spectrum/gamma method for B-test/linear time MMD" << std::endl;
488  }*/
489  self->null_approximation_method=nmethod;
490 }
491 
493 {
494  return self->null_approximation_method;
495 }
496 
497 const char* CStreamingMMD::get_name() const
498 {
499  return "StreamingMMD";
500 }
const ENullApproximationMethod get_null_approximation_method() const
void set_statistic_type(EStatisticType stype)
virtual const char * get_name() const
void set_variance_estimation_method(EVarianceEstimationMethod vmethod)
virtual SGVector< float64_t > compute_multiple()
int32_t index_t
Definition: common.h:72
T * data() const
Definition: SGMatrix.h:237
const index_t get_num_null_samples() const
virtual CSGObject * clone()
Definition: SGObject.cpp:729
Class ShogunException defines an exception which is thrown whenever an error inside of shogun occurs...
virtual float64_t compute_variance()
const EStatisticType get_statistic_type() const
const EVarianceEstimationMethod get_variance_estimation_method() const
#define REQUIRE(x,...)
Definition: SGIO.h:205
virtual SGVector< float64_t > sample_null()
SGVector< float64_t > sample_null()
#define SG_REF(x)
Definition: SGObject.h:52
void compute_jobs(ComputationManager &) const
virtual CFeatures * create_merged_copy(CList *others)
Definition: Features.h:235
ENullApproximationMethod null_approximation_method
int32_t size() const
Definition: SGVector.h:136
index_t vlen
Definition: SGVector.h:545
const index_t num_blocks() const
Definition: NextSamples.cpp:71
CKernelSelectionStrategy const * get_kernel_selection_strategy() const
Definition: MMD.cpp:104
void compute_kernel(ComputationManager &, std::vector< CFeatures * > &, CKernel *) const
double float64_t
Definition: common.h:60
void set_null_approximation_method(ENullApproximationMethod nmethod)
EStatisticType
Definition: TestEnums.h:40
Self(CStreamingMMD &cmmd)
std::pair< SGVector< float64_t >, SGMatrix< float64_t > > compute_statistic_and_Q(const KernelManager &)
float float32_t
Definition: common.h:59
EVarianceEstimationMethod
Definition: TestEnums.h:47
internal::KernelManager & get_kernel_mgr()
std::pair< float64_t, float64_t > compute_statistic_variance()
#define SG_UNREF(x)
Definition: SGObject.h:53
const char * get_exception_string()
all of classes and functions are contained in the shogun namespace
Definition: class_list.h:18
virtual EKernelType get_kernel_type()=0
uint64_t size() const
Definition: SGMatrix.h:243
The class Features is the base class of all feature objects.
Definition: Features.h:68
#define SG_SERROR(...)
Definition: SGIO.h:178
Class DataManager for fetching/streaming test data block-wise. It can handle data coming from multipl...
Definition: DataManager.h:63
Abstract base class that provides an interface for performing kernel two-sample test using Maximum Me...
Definition: MMD.h:120
The Kernel base class.
void set_num_null_samples(index_t null_samples)
EVarianceEstimationMethod variance_estimation_method
void merge_samples(NextSamples &, std::vector< CFeatures * > &) const
T * data() const
Definition: SGVector.h:139
class NextSamples is the return type for next() call in DataManager. If there are no more samples (fr...
Definition: NextSamples.h:68
virtual float64_t compute_statistic()
ENullApproximationMethod
Definition: TestEnums.h:53
const index_t blocksize_at(index_t i) const

SHOGUN Machine Learning Toolbox - Documentation