OSDN Git Service

2007-09-11 Johannes Singler <singler@ira.uka.de>
[pf3gnuchains/gcc-fork.git] / libstdc++-v3 / include / parallel / workstealing.h
1 // -*- C++ -*-
2
3 // Copyright (C) 2007 Free Software Foundation, Inc.
4 //
5 // This file is part of the GNU ISO C++ Library.  This library is free
6 // software; you can redistribute it and/or modify it under the terms
7 // of the GNU General Public License as published by the Free Software
8 // Foundation; either version 2, or (at your option) any later
9 // version.
10
11 // This library is distributed in the hope that it will be useful, but
12 // WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // General Public License for more details.
15
16 // You should have received a copy of the GNU General Public License
17 // along with this library; see the file COPYING.  If not, write to
18 // the Free Software Foundation, 59 Temple Place - Suite 330, Boston,
19 // MA 02111-1307, USA.
20
21 // As a special exception, you may use this file as part of a free
22 // software library without restriction.  Specifically, if other files
23 // instantiate templates or use macros or inline functions from this
24 // file, or you compile this file and link it with other files to
25 // produce an executable, this file does not by itself cause the
26 // resulting executable to be covered by the GNU General Public
27 // License.  This exception does not however invalidate any other
28 // reasons why the executable file might be covered by the GNU General
29 // Public License.
30
31 /** @file parallel/workstealing.h
32  *  @brief Parallelization of embarrassingly parallel execution by
33  *  means of work-stealing.
34  *  This file is a GNU parallel extension to the Standard C++ Library.
35  */
36
37 // Written by Felix Putze.
38
39 #ifndef _GLIBCXX_PARALLEL_WORKSTEALING_H
40 #define _GLIBCXX_PARALLEL_WORKSTEALING_H 1
41
42 #include <parallel/parallel.h>
43 #include <parallel/random_number.h>
44 #include <parallel/compatibility.h>
45
46 namespace __gnu_parallel
47 {
48
49 #define _GLIBCXX_JOB_VOLATILE volatile
50
51   /** @brief One job for a certain thread. */
52   template<typename _DifferenceTp>
53   struct Job
54   {
55     typedef _DifferenceTp difference_type;
56
57     /** @brief First element.
58      *
59      *  Changed by owning and stealing thread. By stealing thread,
60      *  always incremented. */
61     _GLIBCXX_JOB_VOLATILE difference_type first;
62
63     /** @brief Last element.
64      *
65      *  Changed by owning thread only. */
66     _GLIBCXX_JOB_VOLATILE difference_type last;
67
68     /** @brief Number of elements, i. e. @c last-first+1.
69      *
70      *  Changed by owning thread only. */
71     _GLIBCXX_JOB_VOLATILE difference_type load;
72   };
73
74   /** @brief Work stealing algorithm for random access iterators.
75    *
76    *  Uses O(1) additional memory. Synchronization at job lists is
77    *  done with atomic operations.
78    *  @param begin Begin iterator of element sequence.
79    *  @param end End iterator of element sequence.
80    *  @param op User-supplied functor (comparator, predicate, adding
81    *  functor, ...).
82    *  @param f Functor to "process" an element with op (depends on
83    *  desired functionality, e. g. for std::for_each(), ...).
84    *  @param r Functor to "add" a single result to the already
85    *  processed elements (depends on functionality).
86    *  @param base Base value for reduction.
87    *  @param output Pointer to position where final result is written to
88    *  @param bound Maximum number of elements processed (e. g. for
89    *  std::count_n()).
90    *  @return User-supplied functor (that may contain a part of the result).
91    */
92   template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
93   Op
94   for_each_template_random_access_workstealing(RandomAccessIterator begin,
95                                                RandomAccessIterator end,
96                                                Op op, Fu& f, Red r,
97                                                Result base, Result& output,
98                                                typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
99   {
100     _GLIBCXX_CALL(end - begin)
101
102     typedef std::iterator_traits<RandomAccessIterator> traits_type;
103     typedef typename traits_type::difference_type difference_type;
104
105
106     difference_type chunk_size = static_cast<difference_type>(Settings::workstealing_chunk_size);
107
108     // How many jobs?
109     difference_type length = (bound < 0) ? (end - begin) : bound;
110
111     // To avoid false sharing in a cache line.
112     const int stride = Settings::cache_line_size * 10 / sizeof(Job<difference_type>) + 1;
113
114     // Total number of threads currently working.
115     thread_index_t busy = 0;
116     thread_index_t num_threads = get_max_threads();
117     difference_type num_threads_min = num_threads < end - begin ? num_threads : end - begin;
118
119     // No more threads than jobs, at least one thread.
120     difference_type num_threads_max = num_threads_min > 1 ? num_threads_min : 1;
121     num_threads = static_cast<thread_index_t>(num_threads_max);
122
123     // Create job description array.
124     Job<difference_type> *job = new Job<difference_type>[num_threads * stride];
125
126     // Write base value to output.
127     output = base;
128
129 #pragma omp parallel shared(busy) num_threads(num_threads)
130     {
131       // Initialization phase.
132
133       // Flags for every thread if it is doing productive work.
134       bool iam_working = false;
135
136       // Thread id.
137       thread_index_t iam = omp_get_thread_num();
138
139       // This job.
140       Job<difference_type>& my_job = job[iam * stride];
141
142       // Random number (for work stealing).
143       thread_index_t victim;
144
145       // Local value for reduction.
146       Result result = Result();
147
148       // Number of elements to steal in one attempt.
149       difference_type steal;
150
151       // Every thread has its own random number generator (modulo num_threads).
152       random_number rand_gen(iam, num_threads);
153
154 #pragma omp atomic
155       // This thread is currently working.
156       busy++;
157
158       iam_working = true;
159
160       // How many jobs per thread? last thread gets the rest.
161       my_job.first = static_cast<difference_type>(iam * (length / num_threads));
162
163       my_job.last = (iam == (num_threads - 1)) ? (length - 1) : ((iam + 1) * (length / num_threads) - 1);
164       my_job.load = my_job.last - my_job.first + 1;
165
166       // Init result with first value (to have a base value for reduction).
167       if (my_job.first <= my_job.last)
168         {
169           // Cannot use volatile variable directly.
170           difference_type my_first = my_job.first;
171           result = f(op, begin + my_first);
172           my_job.first++;
173           my_job.load--;
174         }
175
176       RandomAccessIterator current;
177
178 #pragma omp barrier
179
180       // Actual work phase
181       // Work on own or stolen start
182       while (busy > 0)
183         {
184           // Work until no productive thread left.
185 #pragma omp flush(busy)
186
187           // Thread has own work to do
188           while (my_job.first <= my_job.last)
189             {
190               // fetch-and-add call
191               // Reserve current job block (size chunk_size) in my queue.
192               difference_type current_job = fetch_and_add<difference_type>(&(my_job.first), chunk_size);
193
194               // Update load, to make the three values consistent,
195               // first might have been changed in the meantime
196               my_job.load = my_job.last - my_job.first + 1;
197               for (difference_type job_counter = 0; job_counter < chunk_size && current_job <= my_job.last; job_counter++)
198                 {
199                   // Yes: process it!
200                   current = begin + current_job;
201                   current_job++;
202
203                   // Do actual work.
204                   result = r(result, f(op, current));
205                 }
206
207 #pragma omp flush(busy)
208
209             }
210
211           // After reaching this point, a thread's job list is empty.
212           if (iam_working)
213             {
214 #pragma omp atomic
215               // This thread no longer has work.
216               busy--;
217
218               iam_working = false;
219             }
220
221           difference_type supposed_first, supposed_last, supposed_load;
222           do
223             {
224               // Find random nonempty deque (not own) and do consistency check.
225               yield();
226 #pragma omp flush(busy)
227               victim = rand_gen();
228               supposed_first = job[victim * stride].first;
229               supposed_last = job[victim * stride].last;
230               supposed_load = job[victim * stride].load;
231             }
232           while (busy > 0
233                  && ((supposed_load <= 0) || ((supposed_first + supposed_load - 1) != supposed_last)));
234
235           if (busy == 0)
236             break;
237
238           if (supposed_load > 0)
239             {
240               // Has work and work to do.
241               // Number of elements to steal (at least one).
242               steal = (supposed_load < 2) ? 1 : supposed_load / 2;
243
244               // Protects against stealing threads
245               // omp_set_lock(&(job[victim * stride].lock));
246
247               // Push victim's start forward.
248               difference_type stolen_first = fetch_and_add<difference_type>(&(job[victim * stride].first), steal);
249               difference_type stolen_try = stolen_first + steal - difference_type(1);
250
251               // Protects against working thread
252               // omp_unset_lock(&(job[victim * stride].lock));
253
254               my_job.first = stolen_first;
255               
256               // Avoid std::min dependencies.
257               my_job.last = stolen_try < supposed_last ? stolen_try : supposed_last;
258
259               my_job.load = my_job.last - my_job.first + 1;
260
261               //omp_unset_lock(&(my_job.lock));
262
263 #pragma omp atomic
264               // Has potential work again.
265               busy++;
266               iam_working = true;
267
268 #pragma omp flush(busy)
269             }
270 #pragma omp flush(busy)
271         } // end while busy > 0
272 #pragma omp critical(writeOutput)
273       // Add accumulated result to output.
274       output = r(output, result);
275
276       //omp_destroy_lock(&(my_job.lock));
277     }
278
279     delete[] job;
280
281     // Points to last element processed (needed as return value for
282     // some algorithms like transform)
283     f.finish_iterator = begin + length;
284
285     return op;
286   }
287 } // end namespace
288
289 #endif