OSDN Git Service

* config/i386/i386.md (*movdi_internal_rex64): Merge
[pf3gnuchains/gcc-fork.git] / libgo / runtime / go-select.c
1 /* go-select.c -- implement select.
2
3    Copyright 2009 The Go Authors. All rights reserved.
4    Use of this source code is governed by a BSD-style
5    license that can be found in the LICENSE file.  */
6
7 #include <pthread.h>
8 #include <stdarg.h>
9 #include <stddef.h>
10 #include <stdint.h>
11 #include <stdlib.h>
12 #include <unistd.h>
13
14 #include "config.h"
15 #include "go-assert.h"
16 #include "channel.h"
17
18 /* __go_select builds an array of these structures.  */
19
20 struct select_channel
21 {
22   /* The channel being selected.  */
23   struct __go_channel* channel;
24   /* If this channel is selected, the value to return.  */
25   uintptr_t retval;
26   /* If this channel is a duplicate of one which appears earlier in
27      the array, this is the array index of the earlier channel.  This
28      is -1UL if this is not a dup.  */
29   uintptr_t dup_index;
30   /* An entry to put on the send or receive queue.  */
31   struct __go_channel_select queue_entry;
32   /* True if selected for send.  */
33   _Bool is_send;
34   /* True if channel is ready--it has data to receive or space to
35      send.  */
36   _Bool is_ready;
37 };
38
39 /* This mutex controls access to __go_select_cond.  This mutex may not
40    be acquired if any channel locks are held.  */
41
42 static pthread_mutex_t __go_select_mutex = PTHREAD_MUTEX_INITIALIZER;
43
44 /* When we have to wait for channels, we tell them to trigger this
45    condition variable when they send or receive something.  */
46
47 static pthread_cond_t __go_select_cond = PTHREAD_COND_INITIALIZER;
48
49 /* Sort the channels by address.  This avoids deadlock when multiple
50    selects are running on overlapping sets of channels.  */
51
52 static int
53 channel_sort (const void *p1, const void *p2)
54 {
55   const struct select_channel *c1 = (const struct select_channel *) p1;
56   const struct select_channel *c2 = (const struct select_channel *) p2;
57
58   if ((uintptr_t) c1->channel < (uintptr_t) c2->channel)
59     return -1;
60   else if ((uintptr_t) c1->channel > (uintptr_t) c2->channel)
61     return 1;
62   else
63     return 0;
64 }
65
66 /* Return whether there is an entry on QUEUE which can be used for a
67    synchronous send or receive.  */
68
69 static _Bool
70 is_queue_ready (struct __go_channel_select *queue)
71 {
72   int x;
73
74   if (queue == NULL)
75     return 0;
76
77   x = pthread_mutex_lock (&__go_select_data_mutex);
78   __go_assert (x == 0);
79
80   while (queue != NULL)
81     {
82       if (*queue->selected == NULL)
83         break;
84       queue = queue->next;
85     }
86
87   x = pthread_mutex_unlock (&__go_select_data_mutex);
88   __go_assert (x == 0);
89
90   return queue != NULL;
91 }
92
93 /* Return whether CHAN is ready.  If IS_SEND is true check whether it
94    has space to send, otherwise check whether it has a value to
95    receive.  */
96
97 static _Bool
98 is_channel_ready (struct __go_channel* channel, _Bool is_send)
99 {
100   if (is_send)
101     {
102       if (channel->selected_for_send)
103         return 0;
104       if (channel->is_closed)
105         return 1;
106       if (channel->num_entries > 0)
107         {
108           /* An asynchronous channel is ready for sending if there is
109              room in the buffer.  */
110           return ((channel->next_store + 1) % channel->num_entries
111                   != channel->next_fetch);
112         }
113       else
114         {
115           if (channel->waiting_to_send)
116             {
117               /* Some other goroutine is waiting to send on this
118                  channel, so we can't.  */
119               return 0;
120             }
121           if (channel->waiting_to_receive)
122             {
123               /* Some other goroutine is waiting to receive a value,
124                  so we can send one.  */
125               return 1;
126             }
127           if (is_queue_ready (channel->select_receive_queue))
128             {
129               /* There is a select statement waiting to synchronize
130                  with this one.  */
131               return 1;
132             }
133           return 0;
134         }
135     }
136   else
137     {
138       if (channel->selected_for_receive)
139         return 0;
140       if (channel->is_closed)
141         return 1;
142       if (channel->num_entries > 0)
143         {
144           /* An asynchronous channel is ready for receiving if there
145              is a value in the buffer.  */
146           return channel->next_fetch != channel->next_store;
147         }
148       else
149         {
150           if (channel->waiting_to_receive)
151             {
152               /* Some other goroutine is waiting to receive from this
153                  channel, so it is not ready for us to receive.  */
154               return 0;
155             }
156           if (channel->next_store > 0)
157             {
158               /* There is data on the channel.  */
159               return 1;
160             }
161           if (is_queue_ready (channel->select_send_queue))
162             {
163               /* There is a select statement waiting to synchronize
164                  with this one.  */
165               return 1;
166             }
167           return 0;
168         }
169     }
170 }
171
172 /* Mark a channel as selected.  The channel is locked.  IS_SELECTED is
173    true if the channel was selected for us by another goroutine.  We
174    set *NEEDS_BROADCAST if we need to broadcast on the select
175    condition variable.  Return true if we got it.  */
176
177 static _Bool
178 mark_channel_selected (struct __go_channel *channel, _Bool is_send,
179                        _Bool is_selected, _Bool *needs_broadcast)
180 {
181   if (channel->num_entries == 0)
182     {
183       /* This is a synchronous channel.  If there is no goroutine
184          currently waiting, but there is another select waiting, then
185          we need to tell that select to use this channel.  That may
186          fail--there may be no other goroutines currently waiting--as
187          a third goroutine may already have claimed the select.  */
188       if (!is_selected
189           && !channel->is_closed
190           && (is_send
191               ? !channel->waiting_to_receive
192               : channel->next_store == 0))
193         {
194           int x;
195           struct __go_channel_select *queue;
196
197           x = pthread_mutex_lock (&__go_select_data_mutex);
198           __go_assert (x == 0);
199
200           queue = (is_send
201                    ? channel->select_receive_queue
202                    : channel->select_send_queue);
203           __go_assert (queue != NULL);
204
205           while (queue != NULL)
206             {
207               if (*queue->selected == NULL)
208                 {
209                   *queue->selected = channel;
210                   *queue->is_read = !is_send;
211                   break;
212                 }
213               queue = queue->next;
214             }
215
216           x = pthread_mutex_unlock (&__go_select_data_mutex);
217           __go_assert (x == 0);
218
219           if (queue == NULL)
220             return 0;
221
222           if (is_send)
223             channel->selected_for_receive = 1;
224           else
225             channel->selected_for_send = 1;
226
227           /* We are going to have to tell the other select that there
228              is something to do.  */
229           *needs_broadcast = 1;
230         }
231     }
232
233   if (is_send)
234     channel->selected_for_send = 1;
235   else
236     channel->selected_for_receive = 1;
237
238   return 1;
239 }
240
241 /* Mark a channel to indicate that a select is waiting.  The channel
242    is locked.  */
243
244 static void
245 mark_select_waiting (struct select_channel *sc,
246                      struct __go_channel **selected_pointer,
247                      _Bool *selected_for_read_pointer)
248 {
249   struct __go_channel *channel = sc->channel;
250   _Bool is_send = sc->is_send;
251
252   if (channel->num_entries == 0)
253     {
254       struct __go_channel_select **pp;
255
256       pp = (is_send
257             ? &channel->select_send_queue
258             : &channel->select_receive_queue);
259
260       /* Add an entry to the queue of selects on this channel.  */
261       sc->queue_entry.next = *pp;
262       sc->queue_entry.selected = selected_pointer;
263       sc->queue_entry.is_read = selected_for_read_pointer;
264
265       *pp = &sc->queue_entry;
266     }
267
268   channel->select_mutex = &__go_select_mutex;
269   channel->select_cond = &__go_select_cond;
270
271   /* We never actually clear the select_mutex and select_cond fields.
272      In order to clear them safely, we would need to have some way of
273      knowing when no select is waiting for the channel.  Thus we
274      introduce a bit of inefficiency for every channel that select
275      needs to wait for.  This is harmless other than the performance
276      cost.  */
277 }
278
279 /* Remove the entry for this select waiting on this channel.  The
280    channel is locked.  We check both queues, because the channel may
281    be selected for both reading and writing.  */
282
283 static void
284 clear_select_waiting (struct select_channel *sc,
285                       struct __go_channel **selected_pointer)
286 {
287   struct __go_channel *channel = sc->channel;
288
289   if (channel->num_entries == 0)
290     {
291       _Bool found;
292       struct __go_channel_select **pp;
293
294       found = 0;
295
296       for (pp = &channel->select_send_queue; *pp != NULL; pp = &(*pp)->next)
297         {
298           if ((*pp)->selected == selected_pointer)
299             {
300               *pp = (*pp)->next;
301               found = 1;
302               break;
303             }
304         }
305
306       for (pp = &channel->select_receive_queue; *pp != NULL; pp = &(*pp)->next)
307         {
308           if ((*pp)->selected == selected_pointer)
309             {
310               *pp = (*pp)->next;
311               found = 1;
312               break;
313             }
314         }
315
316       __go_assert (found);
317     }
318 }
319
320 /* Look through the list of channels to see which ones are ready.
321    Lock each channels, and set the is_ready flag.  Return the number
322    of ready channels.  */
323
324 static uintptr_t
325 lock_channels_find_ready (struct select_channel *channels, uintptr_t count)
326 {
327   uintptr_t ready_count;
328   uintptr_t i;
329
330   ready_count = 0;
331   for (i = 0; i < count; ++i)
332     {
333       struct __go_channel *channel = channels[i].channel;
334       _Bool is_send = channels[i].is_send;
335       uintptr_t dup_index = channels[i].dup_index;
336       int x;
337
338       if (channel == NULL)
339         continue;
340
341       if (dup_index != (uintptr_t) -1UL)
342         {
343           if (channels[dup_index].is_ready)
344             {
345               channels[i].is_ready = 1;
346               ++ready_count;
347             }
348           continue;
349         }
350
351       x = pthread_mutex_lock (&channel->lock);
352       __go_assert (x == 0);
353
354       if (is_channel_ready (channel, is_send))
355         {
356           channels[i].is_ready = 1;
357           ++ready_count;
358         }
359     }
360
361   return ready_count;
362 }
363
364 /* The channel we are going to select has been forced by some other
365    goroutine.  SELECTED_CHANNEL is the channel we will use,
366    SELECTED_FOR_READ is whether the other goroutine wants to read from
367    the channel.  Note that the channel could be specified multiple
368    times in this select, so we must mark each appropriate entry for
369    this channel as ready.  Every other channel is marked as not ready.
370    All the channels are locked before this routine is called.  This
371    returns the number of ready channels.  */
372
373 uintptr_t
374 force_selected_channel_ready (struct select_channel *channels, uintptr_t count,
375                               struct __go_channel *selected_channel,
376                               _Bool selected_for_read)
377 {
378   uintptr_t ready_count;
379   uintptr_t i;
380
381   ready_count = 0;
382   for (i = 0; i < count; ++i)
383     {
384       struct __go_channel *channel = channels[i].channel;
385       _Bool is_send = channels[i].is_send;
386
387       if (channel == NULL)
388         continue;
389
390       if (channel != selected_channel
391           || (is_send ? !selected_for_read : selected_for_read))
392         channels[i].is_ready = 0;
393       else
394         {
395           channels[i].is_ready = 1;
396           ++ready_count;
397         }
398     }
399   __go_assert (ready_count > 0);
400   return ready_count;
401 }
402
403 /* Unlock all the channels.  */
404
405 static void
406 unlock_channels (struct select_channel *channels, uintptr_t count)
407 {
408   uintptr_t i;
409   int x;
410
411   for (i = 0; i < count; ++i)
412     {
413       struct __go_channel *channel = channels[i].channel;
414
415       if (channel == NULL)
416         continue;
417
418       if (channels[i].dup_index != (uintptr_t) -1UL)
419         continue;
420
421       x = pthread_mutex_unlock (&channel->lock);
422       __go_assert (x == 0);
423     }
424 }
425
426 /* At least one channel is ready.  Randomly pick a channel to return.
427    Unlock all the channels.  IS_SELECTED is true if the channel was
428    picked for us by some other goroutine.  If SELECTED_POINTER is not
429    NULL, remove it from the queue for all the channels.  Return the
430    retval field of the selected channel.  This will return 0 if we
431    can't use the selected channel, because it relied on synchronizing
432    with some other select, and that select already synchronized with a
433    different channel.  */
434
435 static uintptr_t
436 unlock_channels_and_select (struct select_channel *channels,
437                             uintptr_t count, uintptr_t ready_count,
438                             _Bool is_selected,
439                             struct __go_channel **selected_pointer)
440 {
441   uintptr_t selected;
442   uintptr_t ret;
443   _Bool needs_broadcast;
444   uintptr_t i;
445   int x;
446
447   /* Pick which channel we are going to return.  */
448 #if defined(HAVE_RANDOM)
449   selected = (uintptr_t) random () % ready_count;
450 #else
451   selected = (uintptr_t) rand () % ready_count;
452 #endif
453   ret = 0;
454   needs_broadcast = 0;
455
456   /* Look at the channels in reverse order so that we don't unlock a
457      duplicated channel until we have seen all its dups.  */
458   for (i = 0; i < count; ++i)
459     {
460       uintptr_t j = count - i - 1;
461       struct __go_channel *channel = channels[j].channel;
462       _Bool is_send = channels[j].is_send;
463
464       if (channel == NULL)
465         continue;
466
467       if (channels[j].is_ready)
468         {
469           if (selected == 0)
470             {
471               if (mark_channel_selected (channel, is_send, is_selected,
472                                          &needs_broadcast))
473                 ret = channels[j].retval;
474             }
475
476           --selected;
477         }
478
479       if (channels[j].dup_index == (uintptr_t) -1UL)
480         {
481           if (selected_pointer != NULL)
482             clear_select_waiting (&channels[j], selected_pointer);
483
484           x = pthread_mutex_unlock (&channel->lock);
485           __go_assert (x == 0);
486         }
487     }
488
489   /* The NEEDS_BROADCAST variable is set if we are synchronizing with
490      some other select statement.  We can't do the actual broadcast
491      until we have unlocked all the channels.  */
492
493   if (needs_broadcast)
494     {
495       x = pthread_mutex_lock (&__go_select_mutex);
496       __go_assert (x == 0);
497
498       x = pthread_cond_broadcast (&__go_select_cond);
499       __go_assert (x == 0);
500
501       x = pthread_mutex_unlock (&__go_select_mutex);
502       __go_assert (x == 0);
503     }
504
505   return ret;
506 }
507
508 /* Mark all channels to show that we are waiting for them.  This is
509    called with the select mutex held, but none of the channels are
510    locked.  This returns true if some channel was found to be
511    ready.  */
512
513 static _Bool
514 mark_all_channels_waiting (struct select_channel* channels, uintptr_t count,
515                            struct __go_channel **selected_pointer,
516                            _Bool *selected_for_read_pointer)
517 {
518   _Bool ret;
519   int x;
520   uintptr_t i;
521
522   ret = 0;
523   for (i = 0; i < count; ++i)
524     {
525       struct __go_channel *channel = channels[i].channel;
526       _Bool is_send = channels[i].is_send;
527
528       if (channel == NULL)
529         continue;
530
531       if (channels[i].dup_index != (uintptr_t) -1UL)
532         {
533           uintptr_t j;
534
535           /* A channel may be selected for both read and write.  */
536           if (channels[channels[i].dup_index].is_send != is_send)
537             {
538               for (j = channels[i].dup_index + 1; j < i; ++j)
539                 {
540                   if (channels[j].channel == channel
541                       && channels[j].is_send == is_send)
542                     break;
543                 }
544               if (j < i)
545                 continue;
546             }
547         }
548
549       x = pthread_mutex_lock (&channel->lock);
550       __go_assert (x == 0);
551
552       /* To avoid a race condition, we have to check again whether the
553          channel is ready.  It may have become ready since we did the
554          first set of checks but before we acquired the select mutex.
555          If we don't check here, we could sleep forever on the select
556          condition variable.  */
557       if (is_channel_ready (channel, is_send))
558         ret = 1;
559
560       /* If SELECTED_POINTER is NULL, then we have already marked the
561          channel as waiting.  */
562       if (selected_pointer != NULL)
563         mark_select_waiting (&channels[i], selected_pointer,
564                              selected_for_read_pointer);
565
566       x = pthread_mutex_unlock (&channel->lock);
567       __go_assert (x == 0);
568     }
569
570   return ret;
571 }
572
573 /* Implement select.  This is called by the compiler-generated code
574    with pairs of arguments: a pointer to a channel, and an int which
575    is non-zero for send, zero for receive.  */
576
577 uintptr_t
578 __go_select (uintptr_t count, _Bool has_default,
579              struct __go_channel **channel_args, _Bool *is_send_args)
580 {
581   struct select_channel stack_buffer[16];
582   struct select_channel *allocated_buffer;
583   struct select_channel *channels;
584   uintptr_t i;
585   int x;
586   struct __go_channel *selected_channel;
587   _Bool selected_for_read;
588   _Bool is_queued;
589
590   if (count < sizeof stack_buffer / sizeof stack_buffer[0])
591     {
592       channels = &stack_buffer[0];
593       allocated_buffer = NULL;
594     }
595   else
596     {
597       allocated_buffer = ((struct select_channel *)
598                           malloc (count * sizeof (struct select_channel)));
599       channels = allocated_buffer;
600     }
601
602   for (i = 0; i < count; ++i)
603     {
604       struct __go_channel *channel_arg = channel_args[i];
605       _Bool is_send = is_send_args[i];
606
607       channels[i].channel = (struct __go_channel*) channel_arg;
608       channels[i].retval = i + 1;
609       channels[i].dup_index = (uintptr_t) -1UL;
610       channels[i].queue_entry.next = NULL;
611       channels[i].queue_entry.selected = NULL;
612       channels[i].is_send = is_send;
613       channels[i].is_ready = 0;
614     }
615
616   qsort (channels, count, sizeof (struct select_channel), channel_sort);
617
618   for (i = 0; i < count; ++i)
619     {
620       uintptr_t j;
621
622       for (j = 0; j < i; ++j)
623         {
624           if (channels[j].channel == channels[i].channel)
625             {
626               channels[i].dup_index = j;
627               break;
628             }
629         }
630     }
631
632   /* SELECT_CHANNEL is used to select synchronized channels.  If no
633      channels are ready, we store a pointer to this variable on the
634      select queue for each synchronized channel.  Because the variable
635      may be set by channel operations running in other goroutines,
636      SELECT_CHANNEL may only be accessed when all the channels are
637      locked and/or when the select_data_mutex is locked.  */
638   selected_channel = NULL;
639
640   /* SELECTED_FOR_READ is set to true if SELECTED_CHANNEL was set by a
641      goroutine which wants to read from the channel.  The access
642      restrictions for this are like those for SELECTED_CHANNEL.  */
643   selected_for_read = 0;
644
645   /* IS_QUEUED is true if we have queued up this select on the queues
646      for any associated synchronous channels.  We only do this if no
647      channels are ready the first time around the loop.  */
648   is_queued = 0;
649
650   while (1)
651     {
652       int ready_count;
653       _Bool is_selected;
654
655       /* Lock all channels, identify which ones are ready.  */
656       ready_count = lock_channels_find_ready (channels, count);
657
658       /* All the channels are locked, so we can look at
659          SELECTED_CHANNEL.  If it is not NULL, then our choice has
660          been forced by some other goroutine.  This can only happen
661          after the first time through the loop.  */
662       is_selected = selected_channel != NULL;
663       if (is_selected)
664         ready_count = force_selected_channel_ready (channels, count,
665                                                     selected_channel,
666                                                     selected_for_read);
667
668       if (ready_count > 0)
669         {
670           uintptr_t ret;
671
672           ret = unlock_channels_and_select (channels, count, ready_count,
673                                             is_selected,
674                                             (is_queued
675                                              ? &selected_channel
676                                              : NULL));
677
678           /* If RET is zero, it means that the channel we picked
679              turned out not to be ready, because some other select
680              grabbed it during our traversal.  Loop around and try
681              again.  */
682           if (ret == 0)
683             {
684               is_queued = 0;
685               /* We are no longer on any channel queues, so it is safe
686                  to touch SELECTED_CHANNEL here.  It must be NULL,
687                  because otherwise that would somebody has promised to
688                  synch up with us and then failed to do so.  */
689               __go_assert (selected_channel == NULL);
690               continue;
691             }
692
693           if (allocated_buffer != NULL)
694             free (allocated_buffer);
695
696           return ret;
697         }
698
699       /* No channels were ready.  */
700
701       unlock_channels (channels, count);
702
703       if (has_default)
704         {
705           /* Use the default clause.  */
706           if (allocated_buffer != NULL)
707             free (allocated_buffer);
708           return 0;
709         }
710
711       /* This is a blocking select.  Grab the select lock, tell all
712          the channels to notify us when something happens, and wait
713          for something to happen.  */
714
715       x = pthread_mutex_lock (&__go_select_mutex);
716       __go_assert (x == 0);
717
718       /* Check whether CHANNEL_SELECTED was set while the channels
719          were unlocked.  If it was set, then we can simply loop around
720          again.  We need to check this while the select mutex is held.
721          It is possible that something will set CHANNEL_SELECTED while
722          we mark the channels as waiting.  If this happens, that
723          goroutine is required to signal the select condition
724          variable, which means acquiring the select mutex.  Since we
725          have the select mutex locked ourselves, we can not miss that
726          signal.  */
727
728       x = pthread_mutex_lock (&__go_select_data_mutex);
729       __go_assert (x == 0);
730
731       is_selected = selected_channel != NULL;
732
733       x = pthread_mutex_unlock (&__go_select_data_mutex);
734       __go_assert (x == 0);
735
736       if (!is_selected)
737         {
738           /* Mark the channels as waiting, and check whether they have
739              become ready.  */
740           if (!mark_all_channels_waiting (channels, count,
741                                           (is_queued
742                                            ? NULL
743                                            : &selected_channel),
744                                           (is_queued
745                                            ? NULL
746                                            : &selected_for_read)))
747             {
748               x = pthread_cond_wait (&__go_select_cond, &__go_select_mutex);
749               __go_assert (x == 0);
750             }
751
752           is_queued = 1;
753         }
754
755       x = pthread_mutex_unlock (&__go_select_mutex);
756       __go_assert (x == 0);
757     }
758 }