OSDN Git Service

2007-11-02 Johannes Singler <singler@ira.uka.de>
[pf3gnuchains/gcc-fork.git] / libstdc++-v3 / include / parallel / workstealing.h
1 // -*- C++ -*-
2
3 // Copyright (C) 2007 Free Software Foundation, Inc.
4 //
5 // This file is part of the GNU ISO C++ Library.  This library is free
6 // software; you can redistribute it and/or modify it under the terms
7 // of the GNU General Public License as published by the Free Software
8 // Foundation; either version 2, or (at your option) any later
9 // version.
10
11 // This library is distributed in the hope that it will be useful, but
12 // WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 // General Public License for more details.
15
16 // You should have received a copy of the GNU General Public License
17 // along with this library; see the file COPYING.  If not, write to
18 // the Free Software Foundation, 59 Temple Place - Suite 330, Boston,
19 // MA 02111-1307, USA.
20
21 // As a special exception, you may use this file as part of a free
22 // software library without restriction.  Specifically, if other files
23 // instantiate templates or use macros or inline functions from this
24 // file, or you compile this file and link it with other files to
25 // produce an executable, this file does not by itself cause the
26 // resulting executable to be covered by the GNU General Public
27 // License.  This exception does not however invalidate any other
28 // reasons why the executable file might be covered by the GNU General
29 // Public License.
30
31 /** @file parallel/workstealing.h
32  *  @brief Parallelization of embarrassingly parallel execution by
33  *  means of work-stealing.
34  *
35  *  Work stealing is described in
36  *
37  *  R. D. Blumofe and C. E. Leiserson.
38  *  Scheduling multithreaded computations by work stealing.
39  *  Journal of the ACM, 46(5):720–748, 1999.
40  *
41  *  This file is a GNU parallel extension to the Standard C++ Library.
42  */
43
44 // Written by Felix Putze.
45
46 #ifndef _GLIBCXX_PARALLEL_WORKSTEALING_H
47 #define _GLIBCXX_PARALLEL_WORKSTEALING_H 1
48
49 #include <parallel/parallel.h>
50 #include <parallel/random_number.h>
51 #include <parallel/compatibility.h>
52
53 namespace __gnu_parallel
54 {
55
56 #define _GLIBCXX_JOB_VOLATILE volatile
57
58   /** @brief One job for a certain thread. */
59   template<typename _DifferenceTp>
60   struct Job
61   {
62     typedef _DifferenceTp difference_type;
63
64     /** @brief First element.
65      *
66      *  Changed by owning and stealing thread. By stealing thread,
67      *  always incremented. */
68     _GLIBCXX_JOB_VOLATILE difference_type first;
69
70     /** @brief Last element.
71      *
72      *  Changed by owning thread only. */
73     _GLIBCXX_JOB_VOLATILE difference_type last;
74
75     /** @brief Number of elements, i. e. @c last-first+1.
76      *
77      *  Changed by owning thread only. */
78     _GLIBCXX_JOB_VOLATILE difference_type load;
79   };
80
81   /** @brief Work stealing algorithm for random access iterators.
82    *
83    *  Uses O(1) additional memory. Synchronization at job lists is
84    *  done with atomic operations.
85    *  @param begin Begin iterator of element sequence.
86    *  @param end End iterator of element sequence.
87    *  @param op User-supplied functor (comparator, predicate, adding
88    *  functor, ...).
89    *  @param f Functor to "process" an element with op (depends on
90    *  desired functionality, e. g. for std::for_each(), ...).
91    *  @param r Functor to "add" a single result to the already
92    *  processed elements (depends on functionality).
93    *  @param base Base value for reduction.
94    *  @param output Pointer to position where final result is written to
95    *  @param bound Maximum number of elements processed (e. g. for
96    *  std::count_n()).
97    *  @return User-supplied functor (that may contain a part of the result).
98    */
99   template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
100   Op
101   for_each_template_random_access_workstealing(RandomAccessIterator begin,
102                                                RandomAccessIterator end,
103                                                Op op, Fu& f, Red r,
104                                                Result base, Result& output,
105                                                typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
106   {
107     _GLIBCXX_CALL(end - begin)
108
109     typedef std::iterator_traits<RandomAccessIterator> traits_type;
110     typedef typename traits_type::difference_type difference_type;
111
112
113     difference_type chunk_size = static_cast<difference_type>(Settings::workstealing_chunk_size);
114
115     // How many jobs?
116     difference_type length = (bound < 0) ? (end - begin) : bound;
117
118     // To avoid false sharing in a cache line.
119     const int stride = Settings::cache_line_size * 10 / sizeof(Job<difference_type>) + 1;
120
121     // Total number of threads currently working.
122     thread_index_t busy = 0;
123     thread_index_t num_threads = get_max_threads();
124     difference_type num_threads_min = num_threads < end - begin ? num_threads : end - begin;
125
126     omp_lock_t output_lock;
127     omp_init_lock(&output_lock);
128
129     // No more threads than jobs, at least one thread.
130     difference_type num_threads_max = num_threads_min > 1 ? num_threads_min : 1;
131     num_threads = static_cast<thread_index_t>(num_threads_max);
132
133     // Create job description array.
134     Job<difference_type> *job = new Job<difference_type>[num_threads * stride];
135
136     // Write base value to output.
137     output = base;
138
139 #pragma omp parallel shared(busy) num_threads(num_threads)
140     {
141       // Initialization phase.
142
143       // Flags for every thread if it is doing productive work.
144       bool iam_working = false;
145
146       // Thread id.
147       thread_index_t iam = omp_get_thread_num();
148
149       // This job.
150       Job<difference_type>& my_job = job[iam * stride];
151
152       // Random number (for work stealing).
153       thread_index_t victim;
154
155       // Local value for reduction.
156       Result result = Result();
157
158       // Number of elements to steal in one attempt.
159       difference_type steal;
160
161       // Every thread has its own random number generator (modulo num_threads).
162       random_number rand_gen(iam, num_threads);
163
164 #pragma omp atomic
165       // This thread is currently working.
166       busy++;
167
168       iam_working = true;
169
170       // How many jobs per thread? last thread gets the rest.
171       my_job.first = static_cast<difference_type>(iam * (length / num_threads));
172
173       my_job.last = (iam == (num_threads - 1)) ? (length - 1) : ((iam + 1) * (length / num_threads) - 1);
174       my_job.load = my_job.last - my_job.first + 1;
175
176       // Init result with first value (to have a base value for reduction).
177       if (my_job.first <= my_job.last)
178         {
179           // Cannot use volatile variable directly.
180           difference_type my_first = my_job.first;
181           result = f(op, begin + my_first);
182           my_job.first++;
183           my_job.load--;
184         }
185
186       RandomAccessIterator current;
187
188 #pragma omp barrier
189
190       // Actual work phase
191       // Work on own or stolen start
192       while (busy > 0)
193         {
194           // Work until no productive thread left.
195 #pragma omp flush(busy)
196
197           // Thread has own work to do
198           while (my_job.first <= my_job.last)
199             {
200               // fetch-and-add call
201               // Reserve current job block (size chunk_size) in my queue.
202               difference_type current_job = fetch_and_add<difference_type>(&(my_job.first), chunk_size);
203
204               // Update load, to make the three values consistent,
205               // first might have been changed in the meantime
206               my_job.load = my_job.last - my_job.first + 1;
207               for (difference_type job_counter = 0; job_counter < chunk_size && current_job <= my_job.last; job_counter++)
208                 {
209                   // Yes: process it!
210                   current = begin + current_job;
211                   current_job++;
212
213                   // Do actual work.
214                   result = r(result, f(op, current));
215                 }
216
217 #pragma omp flush(busy)
218
219             }
220
221           // After reaching this point, a thread's job list is empty.
222           if (iam_working)
223             {
224 #pragma omp atomic
225               // This thread no longer has work.
226               busy--;
227
228               iam_working = false;
229             }
230
231           difference_type supposed_first, supposed_last, supposed_load;
232           do
233             {
234               // Find random nonempty deque (not own) and do consistency check.
235               yield();
236 #pragma omp flush(busy)
237               victim = rand_gen();
238               supposed_first = job[victim * stride].first;
239               supposed_last = job[victim * stride].last;
240               supposed_load = job[victim * stride].load;
241             }
242           while (busy > 0
243                  && ((supposed_load <= 0) || ((supposed_first + supposed_load - 1) != supposed_last)));
244
245           if (busy == 0)
246             break;
247
248           if (supposed_load > 0)
249             {
250               // Has work and work to do.
251               // Number of elements to steal (at least one).
252               steal = (supposed_load < 2) ? 1 : supposed_load / 2;
253
254               // Protects against stealing threads
255               // omp_set_lock(&(job[victim * stride].lock));
256
257               // Push victim's start forward.
258               difference_type stolen_first = fetch_and_add<difference_type>(&(job[victim * stride].first), steal);
259               difference_type stolen_try = stolen_first + steal - difference_type(1);
260
261               // Protects against working thread
262               // omp_unset_lock(&(job[victim * stride].lock));
263
264               my_job.first = stolen_first;
265               
266               // Avoid std::min dependencies.
267               my_job.last = stolen_try < supposed_last ? stolen_try : supposed_last;
268
269               my_job.load = my_job.last - my_job.first + 1;
270
271               //omp_unset_lock(&(my_job.lock));
272
273 #pragma omp atomic
274               // Has potential work again.
275               busy++;
276               iam_working = true;
277
278 #pragma omp flush(busy)
279             }
280 #pragma omp flush(busy)
281         } // end while busy > 0
282       // Add accumulated result to output.
283       omp_set_lock(&output_lock);
284       output = r(output, result);
285       omp_unset_lock(&output_lock);
286
287       //omp_destroy_lock(&(my_job.lock));
288     }
289
290     delete[] job;
291
292     // Points to last element processed (needed as return value for
293     // some algorithms like transform)
294     f.finish_iterator = begin + length;
295
296     omp_destroy_lock(&output_lock);
297
298     return op;
299   }
300 } // end namespace
301
302 #endif