OSDN Git Service

* docs/html/parallel_mode.html: Added reference to MCSTL.
[pf3gnuchains/gcc-fork.git] / libstdc++-v3 / include / parallel / workstealing.h
1 // -*- C++ -*-
2
3 // Copyright (C) 2007 Free Software Foundation, Inc.
4 //
5 // This file is part of the GNU ISO C++ Library.  This library is free
6 // software; you can redistribute it and/or modify it under the terms
7 // of the GNU General Public License as published by the Free Software
8 // Foundation; either version 2, or (at your option) any later
9 // version.
10
11 // This library is distributed in the hope that it will be useful, but
12 // WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // General Public License for more details.
15
16 // You should have received a copy of the GNU General Public License
17 // along with this library; see the file COPYING.  If not, write to
18 // the Free Software Foundation, 59 Temple Place - Suite 330, Boston,
19 // MA 02111-1307, USA.
20
21 // As a special exception, you may use this file as part of a free
22 // software library without restriction.  Specifically, if other files
23 // instantiate templates or use macros or inline functions from this
24 // file, or you compile this file and link it with other files to
25 // produce an executable, this file does not by itself cause the
26 // resulting executable to be covered by the GNU General Public
27 // License.  This exception does not however invalidate any other
28 // reasons why the executable file might be covered by the GNU General
29 // Public License.
30
31 /** @file parallel/workstealing.h
32  *  @brief Parallelization of embarrassingly parallel execution by
33  *  means of work-stealing.
34  *
35  *  Work stealing is described in
36  *
37  *  R. D. Blumofe and C. E. Leiserson.
38  *  Scheduling multithreaded computations by work stealing.
39  *  Journal of the ACM, 46(5):720–748, 1999.
40  *
41  *  This file is a GNU parallel extension to the Standard C++ Library.
42  */
43
44 // Written by Felix Putze.
45
46 #ifndef _GLIBCXX_PARALLEL_WORKSTEALING_H
47 #define _GLIBCXX_PARALLEL_WORKSTEALING_H 1
48
49 #include <parallel/parallel.h>
50 #include <parallel/random_number.h>
51 #include <parallel/compatibility.h>
52
53 namespace __gnu_parallel
54 {
55
56 #define _GLIBCXX_JOB_VOLATILE volatile
57
58   /** @brief One job for a certain thread. */
59   template<typename _DifferenceTp>
60   struct Job
61   {
62     typedef _DifferenceTp difference_type;
63
64     /** @brief First element.
65      *
66      *  Changed by owning and stealing thread. By stealing thread,
67      *  always incremented. */
68     _GLIBCXX_JOB_VOLATILE difference_type first;
69
70     /** @brief Last element.
71      *
72      *  Changed by owning thread only. */
73     _GLIBCXX_JOB_VOLATILE difference_type last;
74
75     /** @brief Number of elements, i. e. @c last-first+1.
76      *
77      *  Changed by owning thread only. */
78     _GLIBCXX_JOB_VOLATILE difference_type load;
79   };
80
81   /** @brief Work stealing algorithm for random access iterators.
82    *
83    *  Uses O(1) additional memory. Synchronization at job lists is
84    *  done with atomic operations.
85    *  @param begin Begin iterator of element sequence.
86    *  @param end End iterator of element sequence.
87    *  @param op User-supplied functor (comparator, predicate, adding
88    *  functor, ...).
89    *  @param f Functor to "process" an element with op (depends on
90    *  desired functionality, e. g. for std::for_each(), ...).
91    *  @param r Functor to "add" a single result to the already
92    *  processed elements (depends on functionality).
93    *  @param base Base value for reduction.
94    *  @param output Pointer to position where final result is written to
95    *  @param bound Maximum number of elements processed (e. g. for
96    *  std::count_n()).
97    *  @return User-supplied functor (that may contain a part of the result).
98    */
99   template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
100   Op
101   for_each_template_random_access_workstealing(RandomAccessIterator begin,
102                                                RandomAccessIterator end,
103                                                Op op, Fu& f, Red r,
104                                                Result base, Result& output,
105                                                typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
106   {
107     _GLIBCXX_CALL(end - begin)
108
109     typedef std::iterator_traits<RandomAccessIterator> traits_type;
110     typedef typename traits_type::difference_type difference_type;
111
112
113     difference_type chunk_size = static_cast<difference_type>(Settings::workstealing_chunk_size);
114
115     // How many jobs?
116     difference_type length = (bound < 0) ? (end - begin) : bound;
117
118     // To avoid false sharing in a cache line.
119     const int stride = Settings::cache_line_size * 10 / sizeof(Job<difference_type>) + 1;
120
121     // Total number of threads currently working.
122     thread_index_t busy = 0;
123     thread_index_t num_threads = get_max_threads();
124     difference_type num_threads_min = num_threads < end - begin ? num_threads : end - begin;
125
126     // No more threads than jobs, at least one thread.
127     difference_type num_threads_max = num_threads_min > 1 ? num_threads_min : 1;
128     num_threads = static_cast<thread_index_t>(num_threads_max);
129
130     // Create job description array.
131     Job<difference_type> *job = new Job<difference_type>[num_threads * stride];
132
133     // Write base value to output.
134     output = base;
135
136 #pragma omp parallel shared(busy) num_threads(num_threads)
137     {
138       // Initialization phase.
139
140       // Flags for every thread if it is doing productive work.
141       bool iam_working = false;
142
143       // Thread id.
144       thread_index_t iam = omp_get_thread_num();
145
146       // This job.
147       Job<difference_type>& my_job = job[iam * stride];
148
149       // Random number (for work stealing).
150       thread_index_t victim;
151
152       // Local value for reduction.
153       Result result = Result();
154
155       // Number of elements to steal in one attempt.
156       difference_type steal;
157
158       // Every thread has its own random number generator (modulo num_threads).
159       random_number rand_gen(iam, num_threads);
160
161 #pragma omp atomic
162       // This thread is currently working.
163       busy++;
164
165       iam_working = true;
166
167       // How many jobs per thread? last thread gets the rest.
168       my_job.first = static_cast<difference_type>(iam * (length / num_threads));
169
170       my_job.last = (iam == (num_threads - 1)) ? (length - 1) : ((iam + 1) * (length / num_threads) - 1);
171       my_job.load = my_job.last - my_job.first + 1;
172
173       // Init result with first value (to have a base value for reduction).
174       if (my_job.first <= my_job.last)
175         {
176           // Cannot use volatile variable directly.
177           difference_type my_first = my_job.first;
178           result = f(op, begin + my_first);
179           my_job.first++;
180           my_job.load--;
181         }
182
183       RandomAccessIterator current;
184
185 #pragma omp barrier
186
187       // Actual work phase
188       // Work on own or stolen start
189       while (busy > 0)
190         {
191           // Work until no productive thread left.
192 #pragma omp flush(busy)
193
194           // Thread has own work to do
195           while (my_job.first <= my_job.last)
196             {
197               // fetch-and-add call
198               // Reserve current job block (size chunk_size) in my queue.
199               difference_type current_job = fetch_and_add<difference_type>(&(my_job.first), chunk_size);
200
201               // Update load, to make the three values consistent,
202               // first might have been changed in the meantime
203               my_job.load = my_job.last - my_job.first + 1;
204               for (difference_type job_counter = 0; job_counter < chunk_size && current_job <= my_job.last; job_counter++)
205                 {
206                   // Yes: process it!
207                   current = begin + current_job;
208                   current_job++;
209
210                   // Do actual work.
211                   result = r(result, f(op, current));
212                 }
213
214 #pragma omp flush(busy)
215
216             }
217
218           // After reaching this point, a thread's job list is empty.
219           if (iam_working)
220             {
221 #pragma omp atomic
222               // This thread no longer has work.
223               busy--;
224
225               iam_working = false;
226             }
227
228           difference_type supposed_first, supposed_last, supposed_load;
229           do
230             {
231               // Find random nonempty deque (not own) and do consistency check.
232               yield();
233 #pragma omp flush(busy)
234               victim = rand_gen();
235               supposed_first = job[victim * stride].first;
236               supposed_last = job[victim * stride].last;
237               supposed_load = job[victim * stride].load;
238             }
239           while (busy > 0
240                  && ((supposed_load <= 0) || ((supposed_first + supposed_load - 1) != supposed_last)));
241
242           if (busy == 0)
243             break;
244
245           if (supposed_load > 0)
246             {
247               // Has work and work to do.
248               // Number of elements to steal (at least one).
249               steal = (supposed_load < 2) ? 1 : supposed_load / 2;
250
251               // Protects against stealing threads
252               // omp_set_lock(&(job[victim * stride].lock));
253
254               // Push victim's start forward.
255               difference_type stolen_first = fetch_and_add<difference_type>(&(job[victim * stride].first), steal);
256               difference_type stolen_try = stolen_first + steal - difference_type(1);
257
258               // Protects against working thread
259               // omp_unset_lock(&(job[victim * stride].lock));
260
261               my_job.first = stolen_first;
262               
263               // Avoid std::min dependencies.
264               my_job.last = stolen_try < supposed_last ? stolen_try : supposed_last;
265
266               my_job.load = my_job.last - my_job.first + 1;
267
268               //omp_unset_lock(&(my_job.lock));
269
270 #pragma omp atomic
271               // Has potential work again.
272               busy++;
273               iam_working = true;
274
275 #pragma omp flush(busy)
276             }
277 #pragma omp flush(busy)
278         } // end while busy > 0
279 #pragma omp critical(writeOutput)
280       // Add accumulated result to output.
281       output = r(output, result);
282
283       //omp_destroy_lock(&(my_job.lock));
284     }
285
286     delete[] job;
287
288     // Points to last element processed (needed as return value for
289     // some algorithms like transform)
290     f.finish_iterator = begin + length;
291
292     return op;
293   }
294 } // end namespace
295
296 #endif