Open Dynamics Engine
threading_impl_templates.h
1 /*************************************************************************
2  * *
3  * Open Dynamics Engine, Copyright (C) 2001-2003 Russell L. Smith. *
4  * All rights reserved. Email: russ@q12.org Web: www.q12.org *
5  * *
6  * Threading implementation templates file. *
7  * Copyright (C) 2011-2012 Oleh Derevenko. All rights reserved. *
8  * e-mail: odar@eleks.com (change all "a" to "e") *
9  * *
10  * This library is free software; you can redistribute it and/or *
11  * modify it under the terms of EITHER: *
12  * (1) The GNU Lesser General Public License as published by the Free *
13  * Software Foundation; either version 2.1 of the License, or (at *
14  * your option) any later version. The text of the GNU Lesser *
15  * General Public License is included with this library in the *
16  * file LICENSE.TXT. *
17  * (2) The BSD-style license that is included with this library in *
18  * the file LICENSE-BSD.TXT. *
19  * *
20  * This library is distributed in the hope that it will be useful, *
21  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
22  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the files *
23  * LICENSE.TXT and LICENSE-BSD.TXT for more details. *
24  * *
25  *************************************************************************/
26 
27 /*
28  * Job list and Mutex group implementation templates for built-in threading
29  * support provider.
30  */
31 
32 
33 #ifndef _ODE_THREADING_IMPL_TEMPLATES_H_
34 #define _ODE_THREADING_IMPL_TEMPLATES_H_
35 
36 
37 #include <ode/common.h>
38 #include <ode/memory.h>
39 
40 #include <ode/threading.h>
41 
42 #include "objects.h"
43 
44 #include <new>
45 
46 
47 #define dMAKE_JOBINSTANCE_RELEASEE(job_instance) ((dCallReleaseeID)(job_instance))
48 #define dMAKE_RELEASEE_JOBINSTANCE(releasee) ((dxThreadedJobInfo *)(releasee))
49 
50 
51 template <class tThreadMutex>
53 {
54 private:
57 
58 public:
59  static dxtemplateMutexGroup<tThreadMutex> *AllocateInstance(dmutexindex_t Mutex_count);
60  static void FreeInstance(dxtemplateMutexGroup<tThreadMutex> *mutex_group);
61 
62 private:
63  bool InitializeMutexArray(dmutexindex_t Mutex_count);
64  void FinalizeMutexArray(dmutexindex_t Mutex_count);
65 
66 public:
67  void LockMutex(dmutexindex_t mutex_index) { dIASSERT(mutex_index < m_un.m_mutex_count); m_Mutex_array[mutex_index].LockMutex(); }
68  bool TryLockMutex(dmutexindex_t mutex_index) { dIASSERT(mutex_index < m_un.m_mutex_count); return m_Mutex_array[mutex_index].TryLockMutex(); }
69  void UnlockMutex(dmutexindex_t mutex_index) { dIASSERT(mutex_index < m_un.m_mutex_count); m_Mutex_array[mutex_index].UnlockMutex(); }
70 
71 private:
72  union
73  {
74  dmutexindex_t m_mutex_count;
75  unsigned long m_reserved_for_allignment[2];
76 
77  } m_un;
78 
79  tThreadMutex m_Mutex_array[1];
80 };
81 
82 template<class tThreadWakeup>
84  public dBase
85 {
86 public:
88  ~dxtemplateCallWait() { DoFinalizeObject(); }
89 
90  bool InitializeObject() { return DoInitializeObject(); }
91 
92 private:
93  bool DoInitializeObject() { return m_wait_wakeup.InitializeObject(); }
94  void DoFinalizeObject() { /* Do nothing */ }
95 
96 public:
98 
99 public:
100  void ResetTheWait() { m_wait_wakeup.ResetWakeup(); }
101  void SignalTheWait() { m_wait_wakeup.WakeupAllThreads(); }
102  bool PerformWaiting(const dThreadedWaitTime *timeout_time_ptr/*=NULL*/) { return m_wait_wakeup.WaitWakeup(timeout_time_ptr); }
103 
104 public:
105  static void AbstractSignalTheWait(void *wait_wakeup_ptr) { ((dxCallWait *)wait_wakeup_ptr)->SignalTheWait(); }
106 
107 private:
108  tThreadWakeup m_wait_wakeup;
109 };
110 
111 
112 #if dBUILTIN_THREADING_IMPL_ENABLED
113 
/**
 * Registration counter paired with an alarm wakeup object.
 *
 * Threads register before waiting for the alarm and unregister afterwards;
 * the alarm is only signalled while the registrant counter is non-zero.
 * When tatomic_test_required is true the counter is read through the atomics
 * provider, otherwise it is read directly.
 */
template<class tThreadWakeup, class tAtomicsProvider, const bool tatomic_test_required>
class dxtemplateThreadedLull
{
private:
    typedef typename tAtomicsProvider::atomicord_t atomicord_t;

public:
    dxtemplateThreadedLull(): m_registrant_count(0), m_alarm_wakeup() {}

    ~dxtemplateThreadedLull()
    {
        // All registrants are expected to have unregistered by now
        dIASSERT(m_registrant_count == 0);
        DoFinalizeObject();
    }

    /// Initializes the alarm wakeup object and returns its result.
    bool InitializeObject() { return DoInitializeObject(); }

    void RegisterToLull() { tAtomicsProvider::IncrementTargetNoRet(&m_registrant_count); }

    void WaitForLullAlarm()
    {
        dIASSERT(m_registrant_count != 0);
        m_alarm_wakeup.WaitWakeup(NULL);
    }

    void UnregisterFromLull() { tAtomicsProvider::DecrementTargetNoRet(&m_registrant_count); }

    /// Wakes a single waiting thread if the registrant counter is non-zero.
    void SignalLullAlarmIfAnyRegistrants()
    {
        bool registrants_present;

        if (tatomic_test_required)
        {
            registrants_present = tAtomicsProvider::QueryTargetValue(&m_registrant_count) != 0;
        }
        else
        {
            registrants_present = m_registrant_count != 0;
        }

        if (registrants_present)
        {
            m_alarm_wakeup.WakeupAThread();
        }
    }

private:
    bool DoInitializeObject() { return m_alarm_wakeup.InitializeObject(); }
    void DoFinalizeObject() { /* Do nothing */ }

private:
    atomicord_t         m_registrant_count;
    tThreadWakeup       m_alarm_wakeup;
};
147 
148 
149 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
150 
151 
153  public dBase
154 {
155  dxThreadedJobInfo() {}
156  explicit dxThreadedJobInfo(void *): m_next_job(NULL) {}
157 
158  void AssignJobData(ddependencycount_t dependencies_count, dxThreadedJobInfo *dependent_job, void *call_wait,
159  int *fault_accumulator_ptr, dThreadedCallFunction *call_function, void *call_context, dcallindex_t call_index)
160  {
161  m_dependencies_count = dependencies_count;
162  m_dependent_job = dependent_job;
163  m_call_wait = call_wait;
164  m_fault_accumulator_ptr = fault_accumulator_ptr;
165 
166  m_call_fault = 0;
167  m_call_function = call_function;
168  m_call_context = call_context;
169  m_call_index = call_index;
170  }
171 
172  bool InvokeCallFunction()
173  {
174  int call_result = m_call_function(m_call_context, m_call_index, dMAKE_JOBINSTANCE_RELEASEE(this));
175  return call_result != 0;
176  }
177 
178  dxThreadedJobInfo *m_next_job;
179  dxThreadedJobInfo **m_prev_job_next_ptr;
180 
181  ddependencycount_t m_dependencies_count;
182  dxThreadedJobInfo *m_dependent_job;
183  void *m_call_wait;
184  int *m_fault_accumulator_ptr;
185 
186  int m_call_fault;
187  dThreadedCallFunction *m_call_function;
188  void *m_call_context;
189  dcallindex_t m_call_index;
190 };
191 
192 
193 template<class tThreadMutex>
195 {
196 public:
197  dxtemplateThreadingLockHelper(tThreadMutex &mutex_instance): m_mutex_instance(mutex_instance), m_lock_indicator_flag(false) { LockMutex(); }
198  ~dxtemplateThreadingLockHelper() { if (m_lock_indicator_flag) { UnlockMutex(); } }
199 
200  void LockMutex() { dIASSERT(!m_lock_indicator_flag); m_mutex_instance.LockMutex(); m_lock_indicator_flag = true; }
201  void UnlockMutex() { dIASSERT(m_lock_indicator_flag); m_mutex_instance.UnlockMutex(); m_lock_indicator_flag = false; }
202 
203 private:
204  tThreadMutex &m_mutex_instance;
205  bool m_lock_indicator_flag;
206 };
207 
208 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
210 {
211 public:
213  m_job_list(NULL),
214  m_info_pool((atomicptr_t)NULL),
215  m_pool_access_lock(),
216  m_list_access_lock(),
217  m_info_wait_lull(),
218  m_info_count_known_to_be_preallocated(0)
219  {
220  }
221 
223  {
224  dIASSERT(m_job_list == NULL); // Would not it be nice to wait for jobs to complete before deleting the list?
225 
226  FreeJobInfoPoolInfos();
227  DoFinalizeObject();
228  }
229 
230  bool InitializeObject() { return DoInitializeObject(); }
231 
232 private:
233  bool DoInitializeObject() { return m_pool_access_lock.InitializeObject() && m_list_access_lock.InitializeObject() && m_info_wait_lull.InitializeObject(); }
234  void DoFinalizeObject() { /* Do nothing */ }
235 
236 public:
237  typedef tAtomicsProvider dxAtomicsProvider;
238  typedef typename tAtomicsProvider::atomicord_t atomicord_t;
239  typedef typename tAtomicsProvider::atomicptr_t atomicptr_t;
240  typedef tThreadMutex dxThreadMutex;
242  typedef void dWaitSignallingFunction(void *job_call_wait);
243 
244 public:
245  dxThreadedJobInfo *ReleaseAJobAndPickNextPendingOne(
246  dxThreadedJobInfo *job_to_release, bool job_result, dWaitSignallingFunction *wait_signal_proc_ptr,
247  bool &out_last_job_flag);
248 
249 private:
250  dxThreadedJobInfo *PickNextPendingJob(bool &out_last_job_flag);
251  void ReleaseAJob(dxThreadedJobInfo *job_instance, bool job_result, dWaitSignallingFunction *wait_signal_proc_ptr);
252 
253 public:
254  inline dxThreadedJobInfo *AllocateJobInfoFromPool();
255  void QueueJobForProcessing(dxThreadedJobInfo *job_instance);
256 
257  void AlterJobProcessingDependencies(dxThreadedJobInfo *job_instance, ddependencychange_t dependencies_count_change,
258  bool &out_job_has_become_ready);
259 
260 private:
261  inline ddependencycount_t SmartAddJobDependenciesCount(dxThreadedJobInfo *job_instance, ddependencychange_t dependencies_count_change);
262 
263  inline void InsertJobInfoIntoListHead(dxThreadedJobInfo *job_instance);
264  inline void RemoveJobInfoFromList(dxThreadedJobInfo *job_instance);
265 
266  dxThreadedJobInfo *ExtractJobInfoFromPoolOrAllocate();
267  inline void ReleaseJobInfoIntoPool(dxThreadedJobInfo *job_instance);
268 
269 private:
270  void FreeJobInfoPoolInfos();
271 
272 public:
273  bool EnsureNumberOfJobInfosIsPreallocated(ddependencycount_t required_info_count);
274 
275 private:
276  bool DoPreallocateJobInfos(ddependencycount_t required_info_count);
277 
278 public:
279  bool IsJobListReadyForShutdown() const { return m_job_list == NULL; }
280 
281 private:
282  dxThreadedJobInfo *m_job_list;
283  volatile atomicptr_t m_info_pool; // dxThreadedJobInfo *
284  tThreadMutex m_pool_access_lock;
285  tThreadMutex m_list_access_lock;
286  tThreadLull m_info_wait_lull;
287  ddependencycount_t m_info_count_known_to_be_preallocated;
288 };
289 
290 
291 typedef void (dxThreadReadyToServeCallback)(void *callback_context);
292 
293 
294 #if dBUILTIN_THREADING_IMPL_ENABLED
295 
296 template<class tThreadWakeup, class tJobListContainer>
297 class dxtemplateJobListThreadedHandler
298 {
299 public:
300  dxtemplateJobListThreadedHandler(tJobListContainer *list_container_ptr):
301  m_job_list_ptr(list_container_ptr),
302  m_processing_wakeup(),
303  m_active_thread_count(0),
304  m_shutdown_requested(0)
305  {
306  }
307 
308  ~dxtemplateJobListThreadedHandler()
309  {
310  dIASSERT(m_active_thread_count == 0);
311 
312  DoFinalizeObject();
313  }
314 
315  bool InitializeObject() { return DoInitializeObject(); }
316 
317 private:
318  bool DoInitializeObject() { return m_processing_wakeup.InitializeObject(); }
319  void DoFinalizeObject() { /* Do nothing */ }
320 
321 public:
322  typedef dxtemplateCallWait<tThreadWakeup> dxCallWait;
323 
324 public:
325  inline void ProcessActiveJobAddition();
326  inline void PrepareForWaitingAJobCompletion();
327 
328 public:
329  inline unsigned RetrieveActiveThreadsCount();
330  inline void StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/);
331 
332 private:
333  void PerformJobProcessingUntilShutdown();
334  void PerformJobProcessingSession();
335 
336  void BlockAsIdleThread();
337  void ActivateAnIdleThread();
338 
339 public:
340  inline void ShutdownProcessing();
341  inline void CleanupForRestart();
342 
343 private:
344  bool IsShutdownRequested() const { return m_shutdown_requested != 0; }
345 
346 private:
347  typedef typename tJobListContainer::dxAtomicsProvider dxAtomicsProvider;
348  typedef typename tJobListContainer::atomicord_t atomicord_t;
349 
350  atomicord_t GetActiveThreadsCount() const { return m_active_thread_count; }
351  void RegisterAsActiveThread() { dxAtomicsProvider::template AddValueToTarget<sizeof(atomicord_t)>((volatile void *)&m_active_thread_count, 1); }
352  void UnregisterAsActiveThread() { dxAtomicsProvider::template AddValueToTarget<sizeof(atomicord_t)>((volatile void *)&m_active_thread_count, -1); }
353 
354 private:
355  tJobListContainer *m_job_list_ptr;
356  tThreadWakeup m_processing_wakeup;
357  volatile atomicord_t m_active_thread_count;
358  int m_shutdown_requested;
359 };
360 
361 
362 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
363 
364 
365 template<class tThreadWakeup, class tJobListContainer>
367 {
368 public:
369  dxtemplateJobListSelfHandler(tJobListContainer *list_container_ptr):
370  m_job_list_ptr(list_container_ptr)
371  {
372  }
373 
375  {
376  // Do nothing
377  }
378 
379  bool InitializeObject() { return true; }
380 
381 public:
383 
384 public:
385  inline void ProcessActiveJobAddition();
386  inline void PrepareForWaitingAJobCompletion();
387 
388 public:
389  inline unsigned RetrieveActiveThreadsCount();
390  inline void StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/);
391 
392 private:
393  void PerformJobProcessingUntilExhaustion();
394  void PerformJobProcessingSession();
395 
396 public:
397  inline void ShutdownProcessing();
398  inline void CleanupForRestart();
399 
400 private:
401  tJobListContainer *m_job_list_ptr;
402 };
403 
404 
405 struct dIMutexGroup;
406 struct dxICallWait;
407 
// Abstract threading interface: every member function below is pure virtual.
// NOTE(review): the line carrying the class-head (and therefore the class name) is
// absent from this extract - restore it from the upstream file before compiling.
409 {
410 public:
// Releases the implementation instance itself.
411  virtual void FreeInstance() = 0;
412 
413 public:
// Mutex group management: allocation, release and per-index lock/unlock.
414  virtual dIMutexGroup *AllocMutexGroup(dmutexindex_t Mutex_count) = 0;
415  virtual void FreeMutexGroup(dIMutexGroup *mutex_group) = 0;
416  virtual void LockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index) = 0;
417  // virtual bool TryLockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index) = 0;
418  virtual void UnlockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index) = 0;
419 
420 public:
// Call wait object management.
421  virtual dxICallWait *AllocACallWait() = 0;
422  virtual void ResetACallWait(dxICallWait *call_wait) = 0;
423  virtual void FreeACallWait(dxICallWait *call_wait) = 0;
424 
425 public:
// Job scheduling. Parameters annotated with /*=NULL*/ are documented by the
// original author as accepting NULL; no default arguments are actually declared.
426  virtual bool PreallocateJobInfos(ddependencycount_t max_simultaneous_calls_estimate) = 0;
427  virtual void ScheduleNewJob(int *fault_accumulator_ptr/*=NULL*/,
428  dCallReleaseeID *out_post_releasee_ptr/*=NULL*/, ddependencycount_t dependencies_count, dCallReleaseeID dependent_releasee/*=NULL*/,
429  dxICallWait *call_wait/*=NULL*/,
430  dThreadedCallFunction *call_func, void *call_context, dcallindex_t instance_index) = 0;
431  virtual void AlterJobDependenciesCount(dCallReleaseeID target_releasee, ddependencychange_t dependencies_count_change) = 0;
432  virtual void WaitJobCompletion(int *out_wait_status_ptr/*=NULL*/,
433  dxICallWait *call_wait, const dThreadedWaitTime *timeout_time_ptr/*=NULL*/) = 0;
434 
435 public:
// Serving-thread control.
436  virtual unsigned RetrieveActiveThreadsCount() = 0;
437  virtual void StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/) = 0;
438  virtual void ShutdownProcessing() = 0;
439  virtual void CleanupForRestart() = 0;
440 };
441 
442 
// Class template that owns a job list container and a job list handler
// (the handler is constructed with a pointer to the container) and declares
// the virtual operations for mutex groups, call waits, job scheduling and
// serving-thread control.
// NOTE(review): the lines carrying the class name, the second base class, the
// constructor/destructor heads and one typedef are absent from this extract -
// restore them from the upstream file before compiling.
443 template<class tJobListContainer, class tJobListHandler>
445  public dBase,
447 {
448 public:
450  dBase(),
451  m_list_container(),
452  m_list_handler(&m_list_container)
453  {
454  }
455 
457  {
458  DoFinalizeObject();
459  }
460 
// Two-phase initialization: returns false if either member fails to initialize.
461  bool InitializeObject() { return DoInitializeObject(); }
462 
463 private:
// The container is initialized first; the handler is only initialized if that succeeded.
464  bool DoInitializeObject() { return m_list_container.InitializeObject() && m_list_handler.InitializeObject(); }
465  void DoFinalizeObject() { /* Do nothing */ }
466 
467 protected:
468  virtual void FreeInstance();
469 
470 private:
// Call wait type is taken from the handler.
472  typedef typename tJobListHandler::dxCallWait dxCallWait;
473 
474 protected:
// Mutex group operations.
475  virtual dIMutexGroup *AllocMutexGroup(dmutexindex_t Mutex_count);
476  virtual void FreeMutexGroup(dIMutexGroup *mutex_group);
477  virtual void LockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index);
478  // virtual bool TryLockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index);
479  virtual void UnlockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index);
480 
481 protected:
// Call wait operations.
482  virtual dxICallWait *AllocACallWait();
483  virtual void ResetACallWait(dxICallWait *call_wait);
484  virtual void FreeACallWait(dxICallWait *call_wait);
485 
486 protected:
// Job scheduling operations.
487  virtual bool PreallocateJobInfos(ddependencycount_t max_simultaneous_calls_estimate);
488  virtual void ScheduleNewJob(int *fault_accumulator_ptr/*=NULL*/,
489  dCallReleaseeID *out_post_releasee_ptr/*=NULL*/, ddependencycount_t dependencies_count, dCallReleaseeID dependent_releasee/*=NULL*/,
490  dxICallWait *call_wait/*=NULL*/,
491  dThreadedCallFunction *call_func, void *call_context, dcallindex_t instance_index);
492  virtual void AlterJobDependenciesCount(dCallReleaseeID target_releasee, ddependencychange_t dependencies_count_change);
493  virtual void WaitJobCompletion(int *out_wait_status_ptr/*=NULL*/,
494  dxICallWait *call_wait, const dThreadedWaitTime *timeout_time_ptr/*=NULL*/);
495 
496 protected:
// Serving-thread control operations.
497  virtual unsigned RetrieveActiveThreadsCount();
498  virtual void StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/);
499  virtual void ShutdownProcessing();
500  virtual void CleanupForRestart();
501 
502 private:
// Declaration order matters: the handler's constructor receives the container's address.
503  tJobListContainer m_list_container;
504  tJobListHandler m_list_handler;
505 };
506 
507 
508 /************************************************************************/
509 /* Implementation of dxtemplateMutexGroup */
510 /************************************************************************/
511 
512 template<class tThreadMutex>
514 {
515  dAASSERT(Mutex_count != 0);
516 
517  const dxtemplateMutexGroup<tThreadMutex> *const dummy_group = (dxtemplateMutexGroup<tThreadMutex> *)(size_t)8;
518  const size_t size_requited = ((size_t)(&dummy_group->m_Mutex_array) - (size_t)dummy_group) + Mutex_count * sizeof(tThreadMutex);
519  dxtemplateMutexGroup<tThreadMutex> *mutex_group = (dxtemplateMutexGroup<tThreadMutex> *)dAlloc(size_requited);
520 
521  if (mutex_group != NULL)
522  {
523  mutex_group->m_un.m_mutex_count = Mutex_count;
524 
525  if (!mutex_group->InitializeMutexArray(Mutex_count))
526  {
527  dFree((void *)mutex_group, size_requited);
528  mutex_group = NULL;
529  }
530  }
531 
532  return mutex_group;
533 }
534 
535 template<class tThreadMutex>
537 {
538  if (mutex_group != NULL)
539  {
540  dmutexindex_t Mutex_count = mutex_group->m_un.m_mutex_count;
541  mutex_group->FinalizeMutexArray(Mutex_count);
542 
543  const size_t anyting_not_zero = 2 * sizeof(size_t);
544  const dxtemplateMutexGroup<tThreadMutex> *const dummy_group = (dxtemplateMutexGroup<tThreadMutex> *)anyting_not_zero;
545  const size_t size_requited = ((size_t)(&dummy_group->m_Mutex_array) - (size_t)dummy_group) + Mutex_count * sizeof(tThreadMutex);
546  dFree((void *)mutex_group, size_requited);
547  }
548 }
549 
550 template<class tThreadMutex>
551 bool dxtemplateMutexGroup<tThreadMutex>::InitializeMutexArray(dmutexindex_t Mutex_count)
552 {
553  bool any_fault = false;
554 
555  dmutexindex_t mutex_index = 0;
556  for (; mutex_index != Mutex_count; ++mutex_index)
557  {
558  tThreadMutex *mutex_storage = m_Mutex_array + mutex_index;
559 
560  new(mutex_storage) tThreadMutex;
561 
562  if (!mutex_storage->InitializeObject())
563  {
564  mutex_storage->tThreadMutex::~tThreadMutex();
565 
566  any_fault = true;
567  break;
568  }
569  }
570 
571  if (any_fault)
572  {
573  FinalizeMutexArray(mutex_index);
574  }
575 
576  bool init_result = !any_fault;
577  return init_result;
578 }
579 
580 template<class tThreadMutex>
581 void dxtemplateMutexGroup<tThreadMutex>::FinalizeMutexArray(dmutexindex_t Mutex_count)
582 {
583  for (dmutexindex_t mutex_index = 0; mutex_index != Mutex_count; ++mutex_index)
584  {
585  tThreadMutex *mutex_storage = m_Mutex_array + mutex_index;
586 
587  mutex_storage->tThreadMutex::~tThreadMutex();
588  }
589 }
590 
591 /************************************************************************/
592 /* Implementation of dxtemplateJobListContainer */
593 /************************************************************************/
594 
595 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
597  dxThreadedJobInfo *job_to_release, bool job_result, dWaitSignallingFunction *wait_signal_proc_ptr, bool &out_last_job_flag)
598 {
599  if (job_to_release != NULL)
600  {
601  ReleaseAJob(job_to_release, job_result, wait_signal_proc_ptr);
602  }
603 
604  dxMutexLockHelper list_access(m_list_access_lock);
605 
606  dxThreadedJobInfo *picked_job = PickNextPendingJob(out_last_job_flag);
607  return picked_job;
608 }
609 
610 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
612  bool &out_last_job_flag)
613 {
614  dxThreadedJobInfo *current_job = m_job_list;
615  bool last_job_flag = false;
616 
617  while (current_job != NULL)
618  {
619  if (current_job->m_dependencies_count == 0)
620  {
621  // It is OK to assign in unsafe manner - dependencies count should not be changed
622  // after the job has become ready for execution
623  current_job->m_dependencies_count = 1;
624  last_job_flag = current_job->m_next_job == NULL;
625 
626  RemoveJobInfoFromList(current_job);
627  break;
628  }
629 
630  current_job = current_job->m_next_job;
631  }
632 
633  out_last_job_flag = last_job_flag;
634  return current_job;
635 }
636 
637 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
639  dxThreadedJobInfo *job_instance, bool job_result, dWaitSignallingFunction *wait_signal_proc_ptr)
640 {
641  dxThreadedJobInfo *current_job = job_instance;
642 
643  if (!job_result)
644  {
645  // Accumulate call fault (be careful to not reset it!!!)
646  current_job->m_call_fault = 1;
647  }
648 
649  bool job_dequeued = true;
650  dIASSERT(current_job->m_prev_job_next_ptr == NULL);
651 
652  while (true)
653  {
654  dIASSERT(current_job->m_dependencies_count != 0);
655 
656  ddependencycount_t new_dependencies_count = SmartAddJobDependenciesCount(current_job, -1);
657 
658  if (new_dependencies_count != 0 || !job_dequeued)
659  {
660  break;
661  }
662 
663  void *job_call_wait = current_job->m_call_wait;
664 
665  if (job_call_wait != NULL)
666  {
667  wait_signal_proc_ptr(job_call_wait);
668  }
669 
670  int call_fault = current_job->m_call_fault;
671 
672  if (current_job->m_fault_accumulator_ptr)
673  {
674  *current_job->m_fault_accumulator_ptr = call_fault;
675  }
676 
677  dxThreadedJobInfo *dependent_job = current_job->m_dependent_job;
678  ReleaseJobInfoIntoPool(current_job);
679 
680  if (dependent_job == NULL)
681  {
682  break;
683  }
684 
685  if (call_fault)
686  {
687  // Accumulate call fault (be careful to not reset it!!!)
688  dependent_job->m_call_fault = 1;
689  }
690 
691  current_job = dependent_job;
692  job_dequeued = dependent_job->m_prev_job_next_ptr == NULL;
693  }
694 }
695 
696 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
698 {
699  // No locking is necessary
700  dxThreadedJobInfo *job_instance = ExtractJobInfoFromPoolOrAllocate();
701  return job_instance;
702 }
703 
704 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
706 {
707  dxMutexLockHelper list_access(m_list_access_lock);
708 
709  InsertJobInfoIntoListHead(job_instance);
710 }
711 
712 
713 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
715  bool &out_job_has_become_ready)
716 {
717  // Dependencies should not be changed when job has already become ready for execution
718  dIASSERT(job_instance->m_dependencies_count != 0);
719  // It's OK that access is not atomic - that is to be handled by external logic
720  dIASSERT(dependencies_count_change < 0 ? (job_instance->m_dependencies_count >= (ddependencycount_t)(-dependencies_count_change)) : ((ddependencycount_t)(-(ddependencychange_t)job_instance->m_dependencies_count) > (ddependencycount_t)dependencies_count_change));
721 
722  ddependencycount_t new_dependencies_count = SmartAddJobDependenciesCount(job_instance, dependencies_count_change);
723  out_job_has_become_ready = new_dependencies_count == 0;
724 }
725 
726 
727 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
729  dxThreadedJobInfo *job_instance, ddependencychange_t dependencies_count_change)
730 {
731  ddependencycount_t new_dependencies_count = tAtomicsProvider::template AddValueToTarget<sizeof(ddependencycount_t)>((volatile void *)&job_instance->m_dependencies_count, dependencies_count_change) + dependencies_count_change;
732  return new_dependencies_count;
733 }
734 
735 
736 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
738  dxThreadedJobInfo *job_instance)
739 {
740  dxThreadedJobInfo *job_list_head = m_job_list;
741  job_instance->m_next_job = job_list_head;
742 
743  if (job_list_head != NULL)
744  {
745  job_list_head->m_prev_job_next_ptr = &job_instance->m_next_job;
746  }
747 
748  job_instance->m_prev_job_next_ptr = &m_job_list;
749  m_job_list = job_instance;
750 }
751 
752 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
754  dxThreadedJobInfo *job_instance)
755 {
756  if (job_instance->m_next_job)
757  {
758  job_instance->m_next_job->m_prev_job_next_ptr = job_instance->m_prev_job_next_ptr;
759  }
760 
761  *job_instance->m_prev_job_next_ptr = job_instance->m_next_job;
762  // Assign NULL to m_prev_job_next_ptr as an indicator that instance has been dequeued
763  job_instance->m_prev_job_next_ptr = NULL;
764 }
765 
766 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
768 {
769  dxThreadedJobInfo *result_info;
770 
771  bool waited_lull = false;
772  m_info_wait_lull.RegisterToLull();
773 
774  while (true)
775  {
776  dxThreadedJobInfo *raw_head_info = (dxThreadedJobInfo *)m_info_pool;
777 
778  if (raw_head_info == NULL)
779  {
780  result_info = new dxThreadedJobInfo();
781 
782  if (result_info != NULL)
783  {
784  break;
785  }
786 
787  m_info_wait_lull.WaitForLullAlarm();
788  waited_lull = true;
789  }
790 
791  // Extraction must be locked so that other thread does not "steal" head info,
792  // use it and then reinsert back with a different "next"
793  dxMutexLockHelper pool_access(m_pool_access_lock);
794 
795  dxThreadedJobInfo *head_info = (dxThreadedJobInfo *)m_info_pool; // Head info must be re-read after mutex had been locked
796 
797  if (head_info != NULL)
798  {
799  dxThreadedJobInfo *next_info = head_info->m_next_job;
800  if (tAtomicsProvider::CompareExchangeTargetPtr(&m_info_pool, (atomicptr_t)head_info, (atomicptr_t)next_info))
801  {
802  result_info = head_info;
803  break;
804  }
805  }
806  }
807 
808  m_info_wait_lull.UnregisterFromLull();
809 
810  if (waited_lull)
811  {
812  // It is necessary to re-signal lull alarm if current thread was waiting as
813  // there might be other threads waiting which might have not received alarm signal.
814  m_info_wait_lull.SignalLullAlarmIfAnyRegistrants();
815  }
816 
817  return result_info;
818 }
819 
820 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
822  dxThreadedJobInfo *job_instance)
823 {
824  while (true)
825  {
826  dxThreadedJobInfo *next_info = (dxThreadedJobInfo *)m_info_pool;
827  job_instance->m_next_job = next_info;
828 
829  if (tAtomicsProvider::CompareExchangeTargetPtr(&m_info_pool, (atomicptr_t)next_info, (atomicptr_t)job_instance))
830  {
831  break;
832  }
833  }
834 
835  m_info_wait_lull.SignalLullAlarmIfAnyRegistrants();
836 }
837 
838 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
840 {
841  dxThreadedJobInfo *current_info = (dxThreadedJobInfo *)m_info_pool;
842 
843  while (current_info != NULL)
844  {
845  dxThreadedJobInfo *info_save = current_info;
846  current_info = current_info->m_next_job;
847 
848  delete info_save;
849  }
850 
851  m_info_pool = (atomicptr_t)NULL;
852 }
853 
854 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
856 {
857  bool result = required_info_count <= m_info_count_known_to_be_preallocated
858  || DoPreallocateJobInfos(required_info_count);
859  return result;
860 }
861 
862 template<class tThreadLull, class tThreadMutex, class tAtomicsProvider>
864 {
865  dIASSERT(required_info_count > m_info_count_known_to_be_preallocated); // Also ensures required_info_count > 0
866 
867  bool allocation_failure = false;
868 
869  dxThreadedJobInfo *info_pool = (dxThreadedJobInfo *)m_info_pool;
870 
871  ddependencycount_t info_index = 0;
872  for (dxThreadedJobInfo **current_info_ptr = &info_pool; ; )
873  {
874  dxThreadedJobInfo *current_info = *current_info_ptr;
875 
876  if (current_info == NULL)
877  {
878  current_info = new dxThreadedJobInfo(NULL);
879 
880  if (current_info == NULL)
881  {
882  allocation_failure = true;
883  break;
884  }
885 
886  *current_info_ptr = current_info;
887  }
888 
889  if (++info_index == required_info_count)
890  {
891  m_info_count_known_to_be_preallocated = info_index;
892  break;
893  }
894 
895  current_info_ptr = &current_info->m_next_job;
896  }
897 
898  // Make sure m_info_pool was not changed
899  dIASSERT(m_info_pool == NULL || m_info_pool == (atomicptr_t)info_pool);
900 
901  m_info_pool = (atomicptr_t)info_pool;
902 
903  bool result = !allocation_failure;
904  return result;
905 }
906 
907 
908 #if dBUILTIN_THREADING_IMPL_ENABLED
909 
910 /************************************************************************/
911 /* Implementation of dxtemplateJobListThreadedHandler */
912 /************************************************************************/
913 
914 template<class tThreadWakeup, class tJobListContainer>
915 void dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::ProcessActiveJobAddition()
916 {
917  ActivateAnIdleThread();
918 }
919 
920 template<class tThreadWakeup, class tJobListContainer>
921 void dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::PrepareForWaitingAJobCompletion()
922 {
923  // Do nothing
924 }
925 
926 template<class tThreadWakeup, class tJobListContainer>
927 unsigned dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::RetrieveActiveThreadsCount()
928 {
929  return GetActiveThreadsCount();
930 }
931 
932 template<class tThreadWakeup, class tJobListContainer>
933 void dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/)
934 {
935  RegisterAsActiveThread();
936 
937  if (readiness_callback != NULL)
938  {
939  (*readiness_callback)(callback_context);
940  }
941 
942  PerformJobProcessingUntilShutdown();
943 
944  UnregisterAsActiveThread();
945 }
946 
947 
948 template<class tThreadWakeup, class tJobListContainer>
949 void dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::PerformJobProcessingUntilShutdown()
950 {
951  while (true)
952  {
953  // It is expected that new jobs will not be queued any longer after shutdown had been requested
954  if (IsShutdownRequested() && m_job_list_ptr->IsJobListReadyForShutdown())
955  {
956  break;
957  }
958 
959  PerformJobProcessingSession();
960 
961  // It is expected that new jobs will not be queued any longer after shutdown had been requested
962  if (IsShutdownRequested() && m_job_list_ptr->IsJobListReadyForShutdown())
963  {
964  break;
965  }
966 
967  BlockAsIdleThread();
968  }
969 }
970 
971 template<class tThreadWakeup, class tJobListContainer>
972 void dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::PerformJobProcessingSession()
973 {
974  dxThreadedJobInfo *current_job = NULL;
975  bool job_result = false;
976 
977  while (true)
978  {
979  bool last_job_flag;
980  current_job = m_job_list_ptr->ReleaseAJobAndPickNextPendingOne(current_job, job_result, &dxCallWait::AbstractSignalTheWait, last_job_flag);
981 
982  if (!current_job)
983  {
984  break;
985  }
986 
987  if (!last_job_flag)
988  {
989  ActivateAnIdleThread();
990  }
991 
992  job_result = current_job->InvokeCallFunction();
993  }
994 }
995 
996 
997 template<class tThreadWakeup, class tJobListContainer>
998 void dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::BlockAsIdleThread()
999 {
1000  m_processing_wakeup.WaitWakeup(NULL);
1001 }
1002 
1003 template<class tThreadWakeup, class tJobListContainer>
1004 void dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::ActivateAnIdleThread()
1005 {
1006  m_processing_wakeup.WakeupAThread();
1007 }
1008 
1009 
1010 template<class tThreadWakeup, class tJobListContainer>
1011 void dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::ShutdownProcessing()
1012 {
1013  m_shutdown_requested = true;
1014  m_processing_wakeup.WakeupAllThreads();
1015 }
1016 
1017 template<class tThreadWakeup, class tJobListContainer>
1018 void dxtemplateJobListThreadedHandler<tThreadWakeup, tJobListContainer>::CleanupForRestart()
1019 {
1020  m_shutdown_requested = false;
1021  m_processing_wakeup.ResetWakeup();
1022 }
1023 
1024 
1025 #endif // #if dBUILTIN_THREADING_IMPL_ENABLED
1026 
1027 
1028 /************************************************************************/
1029 /* Implementation of dxtemplateJobListSelfHandler */
1030 /************************************************************************/
1031 
// Self-handler method with an intentionally empty body.
// NOTE(review): the declarator line (listing line 1033) is absent between the
// template header and the opening brace, so the method name is not readable
// here - presumably ProcessActiveJobAddition(), which the threading
// implementation calls on its handler; confirm against the original header.
1032 template<class tThreadWakeup, class tJobListContainer>
1034 {
1035  // Do nothing
1036 }
1037 
// Self-handler method that runs PerformJobProcessingUntilExhaustion() on the
// calling thread and does nothing else.
// NOTE(review): the declarator line (listing line 1039) is absent - presumably
// PrepareForWaitingAJobCompletion(), which the threading implementation calls
// on its handler before waiting; confirm against the original header.
1038 template<class tThreadWakeup, class tJobListContainer>
1040 {
1041  PerformJobProcessingUntilExhaustion();
1042 }
1043 
1044 
// Self-handler method that always returns the constant 1U.
// NOTE(review): the declarator line (listing line 1046) is absent - presumably
// RetrieveActiveThreadsCount(); confirm against the original header.
1045 template<class tThreadWakeup, class tJobListContainer>
1047 {
1048  return 1U; // Self-Handling is always performed by a single thread
1049 }
1050 
1051 template<class tThreadWakeup, class tJobListContainer>
1052 void dxtemplateJobListSelfHandler<tThreadWakeup, tJobListContainer>::StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/)
1053 {
1054  dIASSERT(false); // This method is not expected to be called for Self-Handler
1055 }
1056 
1057 
// Self-handler method that forwards to PerformJobProcessingSession().
// NOTE(review): the declarator line (listing line 1059) is absent - presumably
// PerformJobProcessingUntilExhaustion(), which another self-handler method in
// this file calls; confirm against the original header.
1058 template<class tThreadWakeup, class tJobListContainer>
1060 {
1061  PerformJobProcessingSession();
1062 }
1063 
// Self-handler job loop: repeatedly hands the previously executed job and its
// result back to the job list, receives the next job and invokes it, until the
// list returns no job.
// NOTE(review): the declarator line (listing line 1065) is absent - presumably
// PerformJobProcessingSession(); confirm against the original header.
1064 template<class tThreadWakeup, class tJobListContainer>
1066 {
1067  dxThreadedJobInfo *current_job = NULL;
1068  bool job_result = false;
1069 
1070  while (true)
1071  {
// The "last job" output is required by the call but is not used afterwards
1072  bool dummy_last_job_flag;
1073  current_job = m_job_list_ptr->ReleaseAJobAndPickNextPendingOne(current_job, job_result, &dxCallWait::AbstractSignalTheWait, dummy_last_job_flag);
1074 
// A NULL job ends the session
1075  if (!current_job)
1076  {
1077  break;
1078  }
1079 
1080  job_result = current_job->InvokeCallFunction();
1081  }
1082 }
1083 
// Self-handler method with an intentionally empty body.
// NOTE(review): the declarator line (listing line 1085) is absent - presumably
// ShutdownProcessing() or CleanupForRestart(), both of which the threading
// implementation calls on its handler; confirm against the original header.
1084 template<class tThreadWakeup, class tJobListContainer>
1086 {
1087  // Do nothing
1088 }
1089 
// Self-handler method with an intentionally empty body.
// NOTE(review): the declarator line (listing line 1091) is absent - presumably
// ShutdownProcessing() or CleanupForRestart(), both of which the threading
// implementation calls on its handler; confirm against the original header.
1090 template<class tThreadWakeup, class tJobListContainer>
1092 {
1093  // Do nothing
1094 }
1095 
1096 
1097 /************************************************************************/
1098 /* Implementation of dxtemplateThreadingImplementation */
1099 /************************************************************************/
1100 
// Threading implementation method that destroys the object it is called on
// via "delete this"; the object must not be touched after the call returns.
// NOTE(review): the declarator line (listing line 1102) is absent, so the
// method name is not readable here; restore it from the original header.
1101 template<class tJobListContainer, class tJobListHandler>
1103 {
1104  delete this;
1105 }
1106 
1107 
// Threading implementation method that allocates a mutex group through
// dxMutexGroup::AllocateInstance(Mutex_count) and returns the result cast to
// the dIMutexGroup pointer type.
// NOTE(review): the declarator line (listing line 1109) is absent, so the
// method name and the declaration of Mutex_count are not readable here;
// restore it from the original header.
1108 template<class tJobListContainer, class tJobListHandler>
1110 {
1111  dxMutexGroup *mutex_group = dxMutexGroup::AllocateInstance(Mutex_count);
1112  return (dIMutexGroup *)mutex_group;
1113 }
1114 
// Threading implementation method that releases a mutex group: the argument
// is cast to dxMutexGroup * and passed to dxMutexGroup::FreeInstance().
// NOTE(review): the declarator line (listing line 1116) is absent, so the
// method name and the declaration of mutex_group are not readable here;
// restore it from the original header.
1115 template<class tJobListContainer, class tJobListHandler>
1117 {
1118  dxMutexGroup::FreeInstance((dxMutexGroup *)mutex_group);
1119 }
1120 
1121 template<class tJobListContainer, class tJobListHandler>
1122 void dxtemplateThreadingImplementation<tJobListContainer, tJobListHandler>::LockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index)
1123 {
1124  ((dxMutexGroup *)mutex_group)->LockMutex(mutex_index);
1125 }
1126 
1127 // template<class tJobListContainer, class tJobListHandler>
1128 // bool dxtemplateThreadingImplementation<tJobListContainer, tJobListHandler>::TryLockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index)
1129 // {
1130 // return ((dxMutexGroup *)mutex_group)->TryLockMutex(mutex_index);
1131 // }
1132 
1133 template<class tJobListContainer, class tJobListHandler>
1134 void dxtemplateThreadingImplementation<tJobListContainer, tJobListHandler>::UnlockMutexGroupMutex(dIMutexGroup *mutex_group, dmutexindex_t mutex_index)
1135 {
1136  ((dxMutexGroup *)mutex_group)->UnlockMutex(mutex_index);
1137 }
1138 
1139 
// Threading implementation method that creates a dxCallWait object and
// initializes it with InitializeObject(). If initialization fails the object
// is deleted and NULL is returned; otherwise the object is returned cast to
// dxICallWait *.
// NOTE(review): the declarator line (listing line 1141) is absent, so the
// method name is not readable here; restore it from the original header.
// NOTE(review): the NULL check after "new" only matters if dxCallWait uses a
// non-throwing operator new - confirm how dxCallWait is allocated.
1140 template<class tJobListContainer, class tJobListHandler>
1142 {
1143  dxCallWait *call_wait = new dxCallWait();
1144 
1145  if (call_wait != NULL && !call_wait->InitializeObject())
1146  {
1147  delete call_wait;
1148  call_wait = NULL;
1149  }
1150 
1151  return (dxICallWait *)call_wait;
1152 }
1153 
// Threading implementation method that casts its call_wait argument to
// dxCallWait * and calls ResetTheWait() on it.
// NOTE(review): the declarator line (listing line 1155) is absent, so the
// method name and the declaration of call_wait are not readable here;
// restore it from the original header.
1154 template<class tJobListContainer, class tJobListHandler>
1156 {
1157  ((dxCallWait *)call_wait)->ResetTheWait();
1158 }
1159 
// Threading implementation method that casts its call_wait argument to
// dxCallWait * and deletes the object.
// NOTE(review): the declarator line (listing line 1161) is absent, so the
// method name and the declaration of call_wait are not readable here;
// restore it from the original header.
1160 template<class tJobListContainer, class tJobListHandler>
1162 {
1163  delete ((dxCallWait *)call_wait);
1164 }
1165 
1166 
1167 template<class tJobListContainer, class tJobListHandler>
1168 bool dxtemplateThreadingImplementation<tJobListContainer, tJobListHandler>::PreallocateJobInfos(ddependencycount_t max_simultaneous_calls_estimate)
1169 {
1170  // No multithreading protection here!
1171  // Resources are to be preallocated before jobs start to be scheduled
1172  // as otherwise there is no way to implement the preallocation.
1173  bool result = m_list_container.EnsureNumberOfJobInfosIsPreallocated(max_simultaneous_calls_estimate);
1174  return result;
1175 }
1176 
// Threading implementation method that creates and queues a new job.
// A job info object is taken from the container's pool, filled with the
// supplied data, optionally reported back through out_post_releasee_ptr and
// queued for processing. If the job has no dependencies the handler is
// notified through ProcessActiveJobAddition().
//   fault_accumulator_ptr - forwarded to AssignJobData()
//   out_post_releasee_ptr - if not NULL, receives the releasee ID of the new job
//   dependencies_count    - dependency count of the job; 0 triggers the handler notification
//   dependent_releasee    - converted to a job instance pointer and forwarded to AssignJobData()
//   call_wait             - cast to dxCallWait * and forwarded to AssignJobData()
//   call_func, call_context, instance_index - forwarded to AssignJobData()
// NOTE(review): the first declarator line (listing line 1178) is absent, so
// the method name and return type are not readable here; restore it from the
// original header.
1177 template<class tJobListContainer, class tJobListHandler>
1179  int *fault_accumulator_ptr/*=NULL*/,
1180  dCallReleaseeID *out_post_releasee_ptr/*=NULL*/, ddependencycount_t dependencies_count, dCallReleaseeID dependent_releasee/*=NULL*/,
1181  dxICallWait *call_wait/*=NULL*/,
1182  dThreadedCallFunction *call_func, void *call_context, dcallindex_t instance_index)
1183 {
1184  dxThreadedJobInfo *new_job = m_list_container.AllocateJobInfoFromPool();
// NOTE(review): a NULL result is guarded only by this assertion - presumably compiled out in release builds; confirm
1185  dIASSERT(new_job != NULL);
1186 
1187  new_job->AssignJobData(dependencies_count, dMAKE_RELEASEE_JOBINSTANCE(dependent_releasee), (dxCallWait *)call_wait, fault_accumulator_ptr, call_func, call_context, instance_index);
1188 
// The releasee ID is written out before the job is queued
1189  if (out_post_releasee_ptr != NULL)
1190  {
1191  *out_post_releasee_ptr = dMAKE_JOBINSTANCE_RELEASEE(new_job);
1192  }
1193 
1194  m_list_container.QueueJobForProcessing(new_job);
1195 
// Only a job without dependencies is reported to the handler here
1196  if (dependencies_count == 0)
1197  {
1198  m_list_handler.ProcessActiveJobAddition();
1199  }
1200 }
1201 
// Threading implementation method that changes the dependency count of an
// already scheduled job and notifies the handler if the container reports
// that the job has become ready.
//   target_releasee           - releasee ID converted to the job instance to alter
//   dependencies_count_change - change to apply; asserted to be non-zero
// NOTE(review): the first declarator line (listing line 1203) is absent, so
// the method name and return type are not readable here; restore it from the
// original header.
1202 template<class tJobListContainer, class tJobListHandler>
1204  dCallReleaseeID target_releasee, ddependencychange_t dependencies_count_change)
1205 {
1206  dIASSERT(dependencies_count_change != 0);
1207 
1208  dxThreadedJobInfo *job_instance = dMAKE_RELEASEE_JOBINSTANCE(target_releasee);
1209 
// Left uninitialized; AlterJobProcessingDependencies() is relied on to set it
1210  bool job_has_become_ready;
1211  m_list_container.AlterJobProcessingDependencies(job_instance, dependencies_count_change, job_has_become_ready);
1212 
1213  if (job_has_become_ready)
1214  {
1215  m_list_handler.ProcessActiveJobAddition();
1216  }
1217 }
1218 
// Threading implementation method that waits on a call wait object.
// The handler is first given a chance to prepare through
// PrepareForWaitingAJobCompletion(), then PerformWaiting() is called on the
// call wait object.
//   out_wait_status_ptr - if not NULL, receives the bool result of PerformWaiting() converted to int
//   call_wait           - wait object; asserted to be non-NULL
//   timeout_time_ptr    - forwarded to PerformWaiting(); when it is NULL the wait is asserted to succeed
// NOTE(review): the first declarator line (listing line 1220) is absent, so
// the method name and return type are not readable here; restore it from the
// original header.
1219 template<class tJobListContainer, class tJobListHandler>
1221  int *out_wait_status_ptr/*=NULL*/,
1222  dxICallWait *call_wait, const dThreadedWaitTime *timeout_time_ptr/*=NULL*/)
1223 {
1224  dIASSERT(call_wait != NULL);
1225 
1226  m_list_handler.PrepareForWaitingAJobCompletion();
1227 
1228  bool wait_status = ((dxCallWait *)call_wait)->PerformWaiting(timeout_time_ptr);
// A failed wait is only acceptable when a timeout was supplied
1229  dIASSERT(timeout_time_ptr != NULL || wait_status);
1230 
1231  if (out_wait_status_ptr)
1232  {
1233  *out_wait_status_ptr = wait_status;
1234  }
1235 }
1236 
1237 
// Threading implementation method that returns the handler's
// RetrieveActiveThreadsCount() result unchanged.
// NOTE(review): the declarator line (listing line 1239) is absent - presumably
// RetrieveActiveThreadsCount(); confirm against the original header.
1238 template<class tJobListContainer, class tJobListHandler>
1240 {
1241  return m_list_handler.RetrieveActiveThreadsCount();
1242 }
1243 
1244 template<class tJobListContainer, class tJobListHandler>
1245 void dxtemplateThreadingImplementation<tJobListContainer, tJobListHandler>::StickToJobsProcessing(dxThreadReadyToServeCallback *readiness_callback/*=NULL*/, void *callback_context/*=NULL*/)
1246 {
1247  m_list_handler.StickToJobsProcessing(readiness_callback, callback_context);
1248 }
1249 
// Threading implementation method that forwards to the handler's
// ShutdownProcessing().
// NOTE(review): the declarator line (listing line 1251) is absent - presumably
// ShutdownProcessing(); confirm against the original header.
1250 template<class tJobListContainer, class tJobListHandler>
1252 {
1253  m_list_handler.ShutdownProcessing();
1254 }
1255 
// Threading implementation method that forwards to the handler's
// CleanupForRestart().
// NOTE(review): the declarator line (listing line 1257) is absent - presumably
// CleanupForRestart(); confirm against the original header.
1256 template<class tJobListContainer, class tJobListHandler>
1258 {
1259  m_list_handler.CleanupForRestart();
1260 }
1261 
1262 
1263 #endif // #ifndef _ODE_THREADING_IMPL_TEMPLATES_H_
Definition: ode/src/objects.h:57
Definition: threading_impl_templates.h:209
Definition: threading_impl_templates.h:194
Definition: threading_impl_templates.h:408
Definition: threading_impl_templates.h:152
Definition: threading_impl_templates.h:83
Definition: threading_impl_templates.h:444
Definition: threading_impl_templates.h:366
Definition: threading_impl_templates.h:52
Definition: threading.h:154