1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
10 static int32 debug = 0;
12 typedef struct WaitQ WaitQ;
13 typedef struct SudoG SudoG;
14 typedef struct Select Select;
15 typedef struct Scase Scase;
17 typedef struct __go_type_descriptor Type;
18 typedef struct __go_channel_type ChanType;
22 G* g; // g and selgen constitute
23 uint32 selgen; // a weak pointer to g
25 byte* elem; // data element
36 uint32 qcount; // total data in the q
37 uint32 dataqsiz; // size of the circular q
41 uint32 sendx; // send index
42 uint32 recvx; // receive index
43 WaitQ recvq; // list of recv waiters
44 WaitQ sendq; // list of send waiters
48 // Buffer follows Hchan immediately in memory.
49 // chanbuf(c, i) is pointer to the i'th slot in the buffer.
50 #define chanbuf(c, i) ((byte*)((c)+1)+(uintptr)(c)->elemsize*(i))
62 SudoG sg; // must be first member (cast to Scase)
65 uint16 index; // index to return
66 bool* receivedp; // pointer to received bool (recv2)
71 uint16 tcase; // total count of scase[]
72 uint16 ncase; // currently filled scase[]
73 uint16* pollorder; // case poll order
74 Hchan** lockorder; // channel lock order
75 Scase scase[1]; // one per case (in order of appearance)
78 static void dequeueg(WaitQ*);
79 static SudoG* dequeue(WaitQ*);
80 static void enqueue(WaitQ*, SudoG*);
83 runtime_makechan_c(ChanType *t, int64 hint)
89 elem = t->__element_type;
91 if(hint < 0 || (int32)hint != hint || (elem->__size > 0 && (uintptr)hint > ((uintptr)-1) / elem->__size))
92 runtime_panicstring("makechan: size out of range");
96 // allocate memory in one call
97 c = (Hchan*)runtime_mal(n + hint*elem->__size);
98 c->elemsize = elem->__size;
99 c->elemalign = elem->__align;
103 runtime_printf("makechan: chan=%p; elemsize=%lld; elemalign=%d; dataqsiz=%d\n",
104 c, (long long)elem->__size, elem->__align, c->dataqsiz);
110 // func makechan(typ *ChanType, size uint32) (chan)
111 uintptr reflect_makechan(ChanType *, uint32)
112 asm ("libgo_reflect.reflect.makechan");
115 reflect_makechan(ChanType *t, uint32 size)
120 c = runtime_makechan_c(t, size);
121 ret = runtime_mal(sizeof(void*));
122 __builtin_memcpy(ret, &c, sizeof(void*));
126 // makechan(t *ChanType, hint int64) (hchan *chan any);
128 __go_new_channel(ChanType *t, uintptr hint)
130 return runtime_makechan_c(t, hint);
134 __go_new_channel_big(ChanType *t, uint64 hint)
136 return runtime_makechan_c(t, hint);
140 * generic single channel send/recv
141 * if the bool pointer is nil,
142 * then the full exchange will
143 * occur. if pres is not nil,
144 * then the protocol will not
145 * sleep but return if it could
148 * sleep can wake up with g->param == nil
149 * when a channel involved in the sleep has
150 * been closed. it is easiest to loop and re-run
151 * the operation; we'll see that it's now closed.
154 runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
169 g->status = Gwaiting;
170 g->waitreason = "chan send (nil chan)";
172 return; // not reached
175 if(runtime_gcwaiting)
179 runtime_printf("chansend: chan=%p\n", c);
189 sg = dequeue(&c->recvq);
196 runtime_memmove(sg->elem, ep, c->elemsize);
212 mysg.selgen = NOSELGEN;
214 g->status = Gwaiting;
215 g->waitreason = "chan send";
216 enqueue(&c->sendq, &mysg);
220 if(g->param == nil) {
223 runtime_throw("chansend: spurious wakeup");
233 if(c->qcount >= c->dataqsiz) {
241 mysg.selgen = NOSELGEN;
242 g->status = Gwaiting;
243 g->waitreason = "chan send";
244 enqueue(&c->sendq, &mysg);
251 runtime_memmove(chanbuf(c, c->sendx), ep, c->elemsize);
252 if(++c->sendx == c->dataqsiz)
256 sg = dequeue(&c->recvq);
269 runtime_panicstring("send on closed channel");
274 runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received)
281 if(runtime_gcwaiting)
285 runtime_printf("chanrecv: chan=%p\n", c);
291 if(selected != nil) {
295 g->status = Gwaiting;
296 g->waitreason = "chan receive (nil chan)";
298 return; // not reached
308 sg = dequeue(&c->sendq);
313 runtime_memmove(ep, sg->elem, c->elemsize);
325 if(selected != nil) {
333 mysg.selgen = NOSELGEN;
335 g->status = Gwaiting;
336 g->waitreason = "chan receive";
337 enqueue(&c->recvq, &mysg);
341 if(g->param == nil) {
344 runtime_throw("chanrecv: spurious wakeup");
357 if(selected != nil) {
366 mysg.selgen = NOSELGEN;
367 g->status = Gwaiting;
368 g->waitreason = "chan receive";
369 enqueue(&c->recvq, &mysg);
377 runtime_memmove(ep, chanbuf(c, c->recvx), c->elemsize);
378 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
379 if(++c->recvx == c->dataqsiz)
383 sg = dequeue(&c->sendq);
399 runtime_memclr(ep, c->elemsize);
407 // The compiler generates a call to __go_send_small to send a value 8
410 __go_send_small(ChanType *t, Hchan* c, uint64 val)
414 byte b[sizeof(uint64)];
420 #ifndef WORDS_BIGENDIAN
423 p = u.b + sizeof(uint64) - t->__element_type->__size;
425 runtime_chansend(t, c, p, nil);
428 // The compiler generates a call to __go_send_big to send a value
429 // larger than 8 bytes or smaller.
431 __go_send_big(ChanType *t, Hchan* c, byte* p)
433 runtime_chansend(t, c, p, nil);
436 // The compiler generates a call to __go_receive_small to receive a
437 // value 8 bytes or smaller.
439 __go_receive_small(ChanType *t, Hchan* c)
442 byte b[sizeof(uint64)];
448 #ifndef WORDS_BIGENDIAN
451 p = u.b + sizeof(uint64) - t->__element_type->__size;
453 runtime_chanrecv(t, c, p, nil, nil);
457 // The compiler generates a call to __go_receive_big to receive a
458 // value larger than 8 bytes.
460 __go_receive_big(ChanType *t, Hchan* c, byte* p)
462 runtime_chanrecv(t, c, p, nil, nil);
465 _Bool runtime_chanrecv2(ChanType *t, Hchan* c, byte* p)
466 __asm__("runtime.chanrecv2");
469 runtime_chanrecv2(ChanType *t, Hchan* c, byte* p)
473 runtime_chanrecv(t, c, p, nil, &received);
477 // func selectnbsend(c chan any, elem any) bool
479 // compiler implements
490 // if selectnbsend(c, v) {
497 runtime_selectnbsend(ChanType *t, Hchan *c, byte *p)
501 runtime_chansend(t, c, p, &res);
505 // func selectnbrecv(elem *any, c chan any) bool
507 // compiler implements
518 // if selectnbrecv(&v, c) {
525 runtime_selectnbrecv(ChanType *t, byte *v, Hchan *c)
529 runtime_chanrecv(t, c, v, &selected, nil);
533 // func selectnbrecv2(elem *any, ok *bool, c chan any) bool
535 // compiler implements
546 // if c != nil && selectnbrecv2(&v, &ok, c) {
553 runtime_selectnbrecv2(ChanType *t, byte *v, _Bool *received, Hchan *c)
559 runtime_chanrecv(t, c, v, &selected, received == nil ? nil : &r);
566 // func chansend(c chan, val iword, nb bool) (selected bool)
567 // where an iword is the same word an interface value would use:
568 // the actual data if it fits, or else a pointer to the data.
570 _Bool reflect_chansend(ChanType *, Hchan *, uintptr, _Bool)
571 __asm__("libgo_reflect.reflect.chansend");
574 reflect_chansend(ChanType *t, Hchan *c, uintptr val, _Bool nb)
582 sp = (bool*)&selected;
587 if(__go_is_pointer_type(t->__element_type))
591 runtime_chansend(t, c, vp, sp);
596 // func chanrecv(c chan, nb bool) (val iword, selected, received bool)
597 // where an iword is the same word an interface value would use:
598 // the actual data if it fits, or else a pointer to the data.
607 struct chanrecv_ret reflect_chanrecv(ChanType *, Hchan *, _Bool)
608 __asm__("libgo_reflect.reflect.chanrecv");
611 reflect_chanrecv(ChanType *t, Hchan *c, _Bool nb)
613 struct chanrecv_ret ret;
627 if(__go_is_pointer_type(t->__element_type)) {
628 vp = (byte*)&ret.val;
630 vp = runtime_mal(t->__element_type->__size);
631 ret.val = (uintptr)vp;
633 runtime_chanrecv(t, c, vp, sp, &received);
635 ret.selected = selected;
636 ret.received = received;
640 static void newselect(int32, Select**);
642 // newselect(size uint32) (sel *byte);
644 void* runtime_newselect(int) __asm__("runtime.newselect");
647 runtime_newselect(int size)
651 newselect(size, &sel);
656 newselect(int32 size, Select **selp)
665 sel = runtime_mal(sizeof(*sel) +
666 n*sizeof(sel->scase[0]) +
667 size*sizeof(sel->lockorder[0]) +
668 size*sizeof(sel->pollorder[0]));
672 sel->lockorder = (void*)(sel->scase + size);
673 sel->pollorder = (void*)(sel->lockorder + size);
677 runtime_printf("newselect s=%p size=%d\n", sel, size);
680 // cut in half to give stack a chance to split
681 static void selectsend(Select *sel, Hchan *c, int index, void *elem);
683 // selectsend(sel *byte, hchan *chan any, elem *any) (selected bool);
685 void runtime_selectsend(Select *, Hchan *, void *, int)
686 __asm__("runtime.selectsend");
689 runtime_selectsend(Select *sel, Hchan *c, void *elem, int index)
691 // nil cases do not compete
695 selectsend(sel, c, index, elem);
699 selectsend(Select *sel, Hchan *c, int index, void *elem)
706 runtime_throw("selectsend: too many cases");
708 cas = &sel->scase[i];
712 cas->kind = CaseSend;
716 runtime_printf("selectsend s=%p index=%d chan=%p\n",
717 sel, cas->index, cas->chan);
720 // cut in half to give stack a chance to split
721 static void selectrecv(Select *sel, Hchan *c, int index, void *elem, bool*);
723 // selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
725 void runtime_selectrecv(Select *, Hchan *, void *, int)
726 __asm__("runtime.selectrecv");
729 runtime_selectrecv(Select *sel, Hchan *c, void *elem, int index)
731 // nil cases do not compete
735 selectrecv(sel, c, index, elem, nil);
738 // selectrecv2(sel *byte, hchan *chan any, elem *any, received *bool) (selected bool);
740 void runtime_selectrecv2(Select *, Hchan *, void *, bool *, int)
741 __asm__("runtime.selectrecv2");
744 runtime_selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, int index)
746 // nil cases do not compete
750 selectrecv(sel, c, index, elem, received);
754 selectrecv(Select *sel, Hchan *c, int index, void *elem, bool *received)
761 runtime_throw("selectrecv: too many cases");
763 cas = &sel->scase[i];
767 cas->kind = CaseRecv;
769 cas->receivedp = received;
772 runtime_printf("selectrecv s=%p index=%d chan=%p\n",
773 sel, cas->index, cas->chan);
776 // cut in half to give stack a chance to split
777 static void selectdefault(Select*, int);
779 // selectdefault(sel *byte) (selected bool);
781 void runtime_selectdefault(Select *, int) __asm__("runtime.selectdefault");
784 runtime_selectdefault(Select *sel, int index)
786 selectdefault(sel, index);
790 selectdefault(Select *sel, int index)
797 runtime_throw("selectdefault: too many cases");
799 cas = &sel->scase[i];
803 cas->kind = CaseDefault;
806 runtime_printf("selectdefault s=%p index=%d\n",
817 for(i=0; i<sel->ncase; i++) {
818 c0 = sel->lockorder[i];
820 c = sel->lockorder[i];
827 selunlock(Select *sel)
833 for(i=sel->ncase; i-->0;) {
834 c0 = sel->lockorder[i];
848 g->status = Gwaiting; // forever
849 g->waitreason = "select (no cases)";
853 static int selectgo(Select**);
855 // selectgo(sel *byte);
857 int runtime_selectgo(Select *) __asm__("runtime.selectgo");
860 runtime_selectgo(Select *sel)
862 return selectgo(&sel);
866 selectgo(Select **selp)
878 if(runtime_gcwaiting)
882 runtime_printf("select: sel=%p\n", sel);
886 // The compiler rewrites selects that statically have
887 // only 0 or 1 cases plus default into simpler constructs.
888 // The only way we can end up with such small sel->ncase
889 // values here is for a larger select in which most channels
890 // have been nilled out. The general code handles those
891 // cases correctly, and they are rare enough not to bother
892 // optimizing (and needing to test).
894 // generate permuted order
895 for(i=0; i<sel->ncase; i++)
896 sel->pollorder[i] = i;
897 for(i=1; i<sel->ncase; i++) {
898 o = sel->pollorder[i];
899 j = runtime_fastrand1()%(i+1);
900 sel->pollorder[i] = sel->pollorder[j];
901 sel->pollorder[j] = o;
904 // sort the cases by Hchan address to get the locking order.
905 for(i=0; i<sel->ncase; i++) {
906 c = sel->scase[i].chan;
907 for(j=i; j>0 && sel->lockorder[j-1] >= c; j--)
908 sel->lockorder[j] = sel->lockorder[j-1];
909 sel->lockorder[j] = c;
914 // pass 1 - look for something already waiting
916 for(i=0; i<sel->ncase; i++) {
917 o = sel->pollorder[i];
918 cas = &sel->scase[o];
923 if(c->dataqsiz > 0) {
927 sg = dequeue(&c->sendq);
938 if(c->dataqsiz > 0) {
939 if(c->qcount < c->dataqsiz)
942 sg = dequeue(&c->recvq);
961 // pass 2 - enqueue on all chans
962 for(i=0; i<sel->ncase; i++) {
963 o = sel->pollorder[i];
964 cas = &sel->scase[o];
968 sg->selgen = g->selgen;
972 enqueue(&c->recvq, sg);
976 enqueue(&c->sendq, sg);
982 g->status = Gwaiting;
983 g->waitreason = "select";
990 // pass 3 - dequeue from unsuccessful chans
991 // otherwise they stack up on quiet channels
992 for(i=0; i<sel->ncase; i++) {
993 cas = &sel->scase[i];
994 if(cas != (Scase*)sg) {
996 if(cas->kind == CaseSend)
1010 runtime_throw("selectgo: shouldnt happen");
1013 runtime_printf("wait-return: sel=%p c=%p cas=%p kind=%d\n",
1014 sel, c, cas, cas->kind);
1016 if(cas->kind == CaseRecv) {
1017 if(cas->receivedp != nil)
1018 *cas->receivedp = true;
1025 // can receive from buffer
1026 if(cas->receivedp != nil)
1027 *cas->receivedp = true;
1028 if(cas->sg.elem != nil)
1029 runtime_memmove(cas->sg.elem, chanbuf(c, c->recvx), c->elemsize);
1030 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
1031 if(++c->recvx == c->dataqsiz)
1034 sg = dequeue(&c->sendq);
1045 // can send to buffer
1046 runtime_memmove(chanbuf(c, c->sendx), cas->sg.elem, c->elemsize);
1047 if(++c->sendx == c->dataqsiz)
1050 sg = dequeue(&c->recvq);
1061 // can receive from sleeping sender (sg)
1064 runtime_printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
1065 if(cas->receivedp != nil)
1066 *cas->receivedp = true;
1067 if(cas->sg.elem != nil)
1068 runtime_memmove(cas->sg.elem, sg->elem, c->elemsize);
1075 // read at end of closed channel
1077 if(cas->receivedp != nil)
1078 *cas->receivedp = false;
1079 if(cas->sg.elem != nil)
1080 runtime_memclr(cas->sg.elem, c->elemsize);
1084 // can send to sleeping receiver (sg)
1087 runtime_printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
1089 runtime_memmove(sg->elem, cas->sg.elem, c->elemsize);
1095 // return index corresponding to chosen case
1101 // send on closed channel
1103 runtime_panicstring("send on closed channel");
1104 return 0; // not reached
1107 // closechan(sel *byte);
1109 runtime_closechan(Hchan *c)
1115 runtime_panicstring("close of nil channel");
1117 if(runtime_gcwaiting)
1123 runtime_panicstring("close of closed channel");
1128 // release all readers
1130 sg = dequeue(&c->recvq);
1138 // release all writers
1140 sg = dequeue(&c->sendq);
1152 __go_builtin_close(Hchan *c)
1154 runtime_closechan(c);
1158 // func chanclose(c chan)
1160 void reflect_chanclose(uintptr) __asm__("libgo_reflect.reflect.chanclose");
1163 reflect_chanclose(uintptr c)
1165 runtime_closechan((Hchan*)c);
1169 // func chanlen(c chan) (len int32)
1171 int32 reflect_chanlen(uintptr) __asm__("libgo_reflect.reflect.chanlen");
1174 reflect_chanlen(uintptr ca)
1188 __go_chan_len(Hchan *c)
1190 return reflect_chanlen((uintptr)c);
1194 // func chancap(c chan) (cap int32)
1196 int32 reflect_chancap(uintptr) __asm__("libgo_reflect.reflect.chancap");
1199 reflect_chancap(uintptr ca)
1213 __go_chan_cap(Hchan *c)
1215 return reflect_chancap((uintptr)c);
1227 q->first = sgp->link;
1229 // if sgp is stale, ignore it
1230 if(sgp->selgen != NOSELGEN &&
1231 (sgp->selgen != sgp->g->selgen ||
1232 !runtime_cas(&sgp->g->selgen, sgp->selgen, sgp->selgen + 2))) {
1233 //prints("INVALID PSEUDOG POINTER\n");
1243 SudoG **l, *sgp, *prevsgp;
1248 for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
1259 enqueue(WaitQ *q, SudoG *sgp)
1262 if(q->first == nil) {
1267 q->last->link = sgp;