1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
// File-local trace switch, initialised to 0.
// NOTE(review): the runtime_printf trace calls in this file are presumably
// guarded by it; the guards are not visible in this excerpt -- confirm.
12 static int32 debug = 0;
// Forward typedefs so the structs can be named without the struct keyword.
14 typedef struct WaitQ WaitQ;
15 typedef struct SudoG SudoG;
16 typedef struct Select Select;
17 typedef struct Scase Scase;
// Short local aliases for the type-descriptor structs.
19 typedef struct __go_type_descriptor Type;
20 typedef struct __go_channel_type ChanType;
24 G* g; // g and selgen constitute
25 uint32 selgen; // a weak pointer to g
27 byte* elem; // data element
38 uint32 qcount; // total data in the q
39 uint32 dataqsiz; // size of the circular q
43 uint32 sendx; // send index
44 uint32 recvx; // receive index
45 WaitQ recvq; // list of recv waiters
46 WaitQ sendq; // list of send waiters
50 // Buffer follows Hchan immediately in memory.
51 // chanbuf(c, i) is pointer to the i'th slot in the buffer.
// (c)+1 is the first byte past the Hchan header; slots are c->elemsize bytes
// apart and the offset is computed in uintptr arithmetic.
// The macro expands c twice, so pass an expression without side effects.
52 #define chanbuf(c, i) ((byte*)((c)+1)+(uintptr)(c)->elemsize*(i))
64 SudoG sg; // must be first member (cast to Scase)
67 uint16 index; // index to return
68 bool* receivedp; // pointer to received bool (recv2)
73 uint16 tcase; // total count of scase[]
74 uint16 ncase; // currently filled scase[]
75 uint16* pollorder; // case poll order
76 Hchan** lockorder; // channel lock order
77 Scase scase[1]; // one per case (in order of appearance)
// File-local wait-queue helpers, declared ahead of their uses below.
// dequeueg takes only the queue, dequeue hands back a SudoG, and enqueue
// links a SudoG into the queue.
80 static void dequeueg(WaitQ*);
81 static SudoG* dequeue(WaitQ*);
82 static void enqueue(WaitQ*, SudoG*);
// Allocate a channel for element type t->__element_type with room for
// `hint` buffered elements.
// Panics with "makechan: size out of range" when hint is negative, does not
// survive a round trip through int32, or (for a non-zero element size)
// exceeds MaxMem / element size.
85 runtime_makechan_c(ChanType *t, int64 hint)
91 elem = t->__element_type;
93 if(hint < 0 || (int32)hint != hint || (elem->__size > 0 && (uintptr)hint > MaxMem / elem->__size))
94 runtime_panicstring("makechan: size out of range");
98 // allocate memory in one call
// Header and element buffer come from a single runtime_mal call.
// NOTE(review): n is computed on a line not shown here; presumably the
// (aligned) size of the Hchan header -- confirm.
99 c = (Hchan*)runtime_mal(n + hint*elem->__size);
// Cache element size and alignment from the type descriptor on the channel.
100 c->elemsize = elem->__size;
101 c->elemalign = elem->__align;
// Trace output; the element size is cast to int64 to match the %D verb.
105 runtime_printf("makechan: chan=%p; elemsize=%D; elemalign=%d; dataqsiz=%d\n",
106 c, (int64)elem->__size, elem->__align, c->dataqsiz);
112 // func makechan(typ *ChanType, size uint32) (chan)
// The asm label makes the C function link under the symbol "reflect.makechan".
113 uintptr reflect_makechan(ChanType *, uint32)
114 asm ("reflect.makechan");
// Creates the channel through runtime_makechan_c, then copies the Hchan
// pointer into a freshly allocated pointer-sized cell (ret).
// NOTE(review): the return statement is not visible here; presumably the
// address of that cell is what gets returned as the uintptr -- confirm.
117 reflect_makechan(ChanType *t, uint32 size)
122 c = runtime_makechan_c(t, size);
123 ret = runtime_mal(sizeof(void*));
124 __builtin_memcpy(ret, &c, sizeof(void*));
128 // makechan(t *ChanType, hint int64) (hchan *chan any);
// Both entry points forward straight to runtime_makechan_c. They differ only
// in the integer type of hint (uintptr vs uint64), which is converted to the
// callee's int64 parameter at the call; range checking happens in the callee.
130 __go_new_channel(ChanType *t, uintptr hint)
132 return runtime_makechan_c(t, hint);
136 __go_new_channel_big(ChanType *t, uint64 hint)
138 return runtime_makechan_c(t, hint);
142 * generic single channel send/recv
143 * if the bool pointer is nil,
144 * then the full exchange will
145 * occur. if pres is not nil,
146 * then the protocol will not
147 * sleep but return if it could not complete.
150 * sleep can wake up with g->param == nil
151 * when a channel involved in the sleep has
152 * been closed. it is easiest to loop and re-run
153 * the operation; we'll see that it's now closed.
// Send the value at ep on channel c.
// Per the comment block above, a nil pres requests the full (blocking)
// exchange, while a non-nil pres asks for a non-sleeping attempt.
// Many statements of this function are elided from this view; the notes
// below describe only the lines that are visible.
156 runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
// Nil-channel path (its guard is not shown): the goroutine is marked waiting
// with a dedicated wait reason.
171 g->status = Gwaiting;
172 g->waitreason = "chan send (nil chan)";
174 return; // not reached
177 if(runtime_gcwaiting)
181 runtime_printf("chansend: chan=%p\n", c);
// Hand-off path: take a queued receiver and copy the value into its slot.
191 sg = dequeue(&c->recvq);
198 runtime_memmove(sg->elem, ep, c->elemsize);
// No receiver available: queue this goroutine on sendq. NOSELGEN marks the
// SudoG as not belonging to a select, so dequeue skips its staleness check.
214 mysg.selgen = NOSELGEN;
216 g->status = Gwaiting;
217 g->waitreason = "chan send";
218 enqueue(&c->sendq, &mysg);
// After wake-up: per the comment block above, g->param == nil means a channel
// involved in the sleep was closed. The condition guarding the throw is not
// visible here.
222 if(g->param == nil) {
225 runtime_throw("chansend: spurious wakeup");
// Buffered path: when the buffer is full, queue on sendq and sleep.
235 if(c->qcount >= c->dataqsiz) {
243 mysg.selgen = NOSELGEN;
244 g->status = Gwaiting;
245 g->waitreason = "chan send";
246 enqueue(&c->sendq, &mysg);
// Room in the buffer: store at slot sendx and advance the index; reaching
// dataqsiz is detected here (the wrap assignment itself is on an elided line).
253 runtime_memmove(chanbuf(c, c->sendx), ep, c->elemsize);
254 if(++c->sendx == c->dataqsiz)
// Look for a receiver that can now be served from the buffer.
258 sg = dequeue(&c->recvq);
271 runtime_panicstring("send on closed channel");
// Receive from channel c into ep.
// selected and received are optional out-parameters: callers in this file
// pass nil for either one. NOTE(review): a non-nil selected appears to select
// the non-blocking protocol (the selectnb* wrappers pass one) -- confirm
// against the elided branches.
// Many statements of this function are elided from this view; the notes
// below describe only the lines that are visible.
276 runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received)
283 if(runtime_gcwaiting)
287 runtime_printf("chanrecv: chan=%p\n", c);
// Nil-channel path (its guard is not shown): handled differently depending on
// whether selected was supplied; otherwise the goroutine is marked waiting.
293 if(selected != nil) {
297 g->status = Gwaiting;
298 g->waitreason = "chan receive (nil chan)";
300 return; // not reached
// Hand-off path: take a queued sender and copy its element into ep.
310 sg = dequeue(&c->sendq);
315 runtime_memmove(ep, sg->elem, c->elemsize);
327 if(selected != nil) {
// No sender available: queue this goroutine on recvq. NOSELGEN marks the
// SudoG as not belonging to a select, so dequeue skips its staleness check.
335 mysg.selgen = NOSELGEN;
337 g->status = Gwaiting;
338 g->waitreason = "chan receive";
339 enqueue(&c->recvq, &mysg);
// After wake-up a nil g->param is examined; the condition guarding the throw
// is not visible here.
343 if(g->param == nil) {
346 runtime_throw("chanrecv: spurious wakeup");
// Buffered path.
359 if(selected != nil) {
368 mysg.selgen = NOSELGEN;
369 g->status = Gwaiting;
370 g->waitreason = "chan receive";
371 enqueue(&c->recvq, &mysg);
// Copy the element out of slot recvx, zero the vacated slot, and advance the
// index; reaching dataqsiz is detected here (the wrap assignment itself is on
// an elided line).
379 runtime_memmove(ep, chanbuf(c, c->recvx), c->elemsize);
380 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
381 if(++c->recvx == c->dataqsiz)
// Look for a sender that can now move its value into the buffer.
385 sg = dequeue(&c->sendq);
// Zero the destination. NOTE(review): presumably the closed-channel path;
// its guard is not visible here.
401 runtime_memclr(ep, c->elemsize);
409 // The compiler generates a call to __go_send_small to send a value 8 bytes or smaller.
// Send a value that is passed by value in a uint64.
// The value is viewed as bytes through a uint64-sized byte array so that a
// pointer to it can be handed to runtime_chansend (nil pres: blocking send).
412 __go_send_small(ChanType *t, Hchan* c, uint64 val)
416 byte b[sizeof(uint64)];
// Endianness-dependent choice of where the element bytes start.
// NOTE(review): the assignment below is presumably the big-endian (#else)
// arm, where a narrower element occupies the trailing bytes of the uint64;
// the little-endian arm and the #else line are not visible here.
422 #ifndef WORDS_BIGENDIAN
425 p = u.b + sizeof(uint64) - t->__element_type->__size;
427 runtime_chansend(t, c, p, nil);
430 // The compiler generates a call to __go_send_big to send a value
431 // larger than 8 bytes.
// p points at the value to send; forwards to runtime_chansend with a nil
// pres pointer.
433 __go_send_big(ChanType *t, Hchan* c, byte* p)
435 runtime_chansend(t, c, p, nil);
438 // The compiler generates a call to __go_receive_small to receive a
439 // value 8 bytes or smaller.
// The received element is written into a uint64-sized byte array via
// runtime_chanrecv (nil selected and received pointers).
441 __go_receive_small(ChanType *t, Hchan* c)
444 byte b[sizeof(uint64)];
// Endianness-dependent choice of where the element bytes are stored.
// NOTE(review): the assignment below is presumably the big-endian (#else)
// arm, placing a narrower element in the trailing bytes; the other arm and
// the #else line are not visible here.
450 #ifndef WORDS_BIGENDIAN
453 p = u.b + sizeof(uint64) - t->__element_type->__size;
455 runtime_chanrecv(t, c, p, nil, nil);
459 // The compiler generates a call to __go_receive_big to receive a
460 // value larger than 8 bytes.
// p is the destination buffer; forwards to runtime_chanrecv with nil
// selected and received pointers.
462 __go_receive_big(ChanType *t, Hchan* c, byte* p)
464 runtime_chanrecv(t, c, p, nil, nil);
// The asm label makes the C function link under the symbol
// "runtime.chanrecv2".
467 _Bool runtime_chanrecv2(ChanType *t, Hchan* c, byte* p)
468 __asm__("runtime.chanrecv2");
// Receive into p with a nil selected pointer, collecting the received flag
// in a local. NOTE(review): the return statement is not visible; presumably
// the flag is the _Bool result -- confirm.
471 runtime_chanrecv2(ChanType *t, Hchan* c, byte* p)
475 runtime_chanrecv(t, c, p, nil, &received);
479 // func selectnbsend(c chan any, elem any) bool
481 // compiler implements
492 // if selectnbsend(c, v) {
// Attempts the send through runtime_chansend with a non-nil result pointer
// (&res), i.e. the non-sleeping protocol described above runtime_chansend.
// NOTE(review): the return statement is not visible; presumably res is the
// result -- confirm.
499 runtime_selectnbsend(ChanType *t, Hchan *c, byte *p)
503 runtime_chansend(t, c, p, &res);
507 // func selectnbrecv(elem *any, c chan any) bool
509 // compiler implements
520 // if selectnbrecv(&v, c) {
// Receives into v through runtime_chanrecv, passing &selected and a nil
// received pointer.
// NOTE(review): the return statement is not visible; presumably selected is
// the result -- confirm.
527 runtime_selectnbrecv(ChanType *t, byte *v, Hchan *c)
531 runtime_chanrecv(t, c, v, &selected, nil);
535 // func selectnbrecv2(elem *any, ok *bool, c chan any) bool
537 // compiler implements
548 // if c != nil && selectnbrecv2(&v, &ok, c) {
// Like the one-result variant, but also reports whether a value was received.
// runtime_chanrecv is given a local (r) rather than the caller's _Bool
// pointer, and only when the caller supplied one.
// NOTE(review): the copy from r back into *received is not visible here;
// presumably it follows the call -- confirm.
555 runtime_selectnbrecv2(ChanType *t, byte *v, _Bool *received, Hchan *c)
561 runtime_chanrecv(t, c, v, &selected, received == nil ? nil : &r);
568 // func chansend(c chan, val iword, nb bool) (selected bool)
569 // where an iword is the same word an interface value would use:
570 // the actual data if it fits, or else a pointer to the data.
// The asm label makes the C function link under the symbol "reflect.chansend".
572 _Bool reflect_chansend(ChanType *, Hchan *, uintptr, _Bool)
573 __asm__("reflect.chansend");
576 reflect_chansend(ChanType *t, Hchan *c, uintptr val, _Bool nb)
// sp is the result pointer handed to runtime_chansend.
// NOTE(review): presumably this assignment is taken only when nb is set and
// sp is nil otherwise; the surrounding condition is not visible -- confirm.
584 sp = (bool*)&selected;
// The value pointer vp is chosen based on whether the element type is a
// pointer type, matching the iword convention described above; the two
// assignments are on elided lines.
589 if(__go_is_pointer_type(t->__element_type))
593 runtime_chansend(t, c, vp, sp);
598 // func chanrecv(c chan, nb bool) (val iword, selected, received bool)
599 // where an iword is the same word an interface value would use:
600 // the actual data if it fits, or else a pointer to the data.
// The asm label makes the C function link under the symbol "reflect.chanrecv".
609 struct chanrecv_ret reflect_chanrecv(ChanType *, Hchan *, _Bool)
610 __asm__("reflect.chanrecv");
613 reflect_chanrecv(ChanType *t, Hchan *c, _Bool nb)
615 struct chanrecv_ret ret;
// Pointer-typed elements are received directly into ret.val; anything else
// is received into a freshly allocated element-sized buffer whose address is
// stored in ret.val.
629 if(__go_is_pointer_type(t->__element_type)) {
630 vp = (byte*)&ret.val;
632 vp = runtime_mal(t->__element_type->__size);
633 ret.val = (uintptr)vp;
635 runtime_chanrecv(t, c, vp, sp, &received);
// Copy both flags into the result struct.
637 ret.selected = selected;
638 ret.received = received;
// Worker that allocates the Select; the result is returned through its
// second argument.
642 static void newselect(int32, Select**);
644 // newselect(size uint32) (sel *byte);
// The asm label makes the C function link under the symbol
// "runtime.newselect".
646 void* runtime_newselect(int) __asm__("runtime.newselect");
649 runtime_newselect(int size)
653 newselect(size, &sel);
658 newselect(int32 size, Select **selp)
667 // allocate all the memory we need in a single allocation
668 // start with Select with size cases
669 // then lockorder with size entries
670 // then pollorder with size entries
// NOTE(review): n is set on a line not shown here; since Select already
// embeds scase[1], it is presumably the count of additional cases -- confirm.
671 sel = runtime_mal(sizeof(*sel) +
672 n*sizeof(sel->scase[0]) +
673 size*sizeof(sel->lockorder[0]) +
674 size*sizeof(sel->pollorder[0]));
// Carve the two side arrays out of the same allocation: lockorder starts
// right after scase[size-1], pollorder right after lockorder's size entries.
678 sel->lockorder = (void*)(sel->scase + size);
679 sel->pollorder = (void*)(sel->lockorder + size);
683 runtime_printf("newselect s=%p size=%d\n", sel, size);
686 // cut in half to give stack a chance to split
687 static void selectsend(Select *sel, Hchan *c, int index, void *elem);
689 // selectsend(sel *byte, hchan *chan any, elem *any) (selected bool);
// The asm label makes the C function link under the symbol
// "runtime.selectsend".
691 void runtime_selectsend(Select *, Hchan *, void *, int)
692 __asm__("runtime.selectsend");
// Public entry point: filters out nil channels (check elided) and delegates
// to the static worker. Note the worker takes index before elem.
695 runtime_selectsend(Select *sel, Hchan *c, void *elem, int index)
697 // nil cases do not compete
701 selectsend(sel, c, index, elem);
// Worker: fills in one scase slot as a send case.
705 selectsend(Select *sel, Hchan *c, int index, void *elem)
// Thrown when no slot is left; the bounds check itself is on an elided line.
712 runtime_throw("selectsend: too many cases");
714 cas = &sel->scase[i];
718 cas->kind = CaseSend;
722 runtime_printf("selectsend s=%p index=%d chan=%p\n",
723 sel, cas->index, cas->chan);
726 // cut in half to give stack a chance to split
727 static void selectrecv(Select *sel, Hchan *c, int index, void *elem, bool*);
729 // selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
// The asm label makes the C function link under the symbol
// "runtime.selectrecv".
731 void runtime_selectrecv(Select *, Hchan *, void *, int)
732 __asm__("runtime.selectrecv");
// One-result receive case: delegates with a nil received pointer.
735 runtime_selectrecv(Select *sel, Hchan *c, void *elem, int index)
737 // nil cases do not compete
741 selectrecv(sel, c, index, elem, nil);
744 // selectrecv2(sel *byte, hchan *chan any, elem *any, received *bool) (selected bool);
746 void runtime_selectrecv2(Select *, Hchan *, void *, bool *, int)
747 __asm__("runtime.selectrecv2");
// Two-result receive case: additionally forwards the caller's received
// pointer.
750 runtime_selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, int index)
752 // nil cases do not compete
756 selectrecv(sel, c, index, elem, received);
// Worker shared by both entry points: fills in one scase slot as a receive
// case and remembers where to report the received flag.
760 selectrecv(Select *sel, Hchan *c, int index, void *elem, bool *received)
// Thrown when no slot is left; the bounds check itself is on an elided line.
767 runtime_throw("selectrecv: too many cases");
769 cas = &sel->scase[i];
773 cas->kind = CaseRecv;
775 cas->receivedp = received;
778 runtime_printf("selectrecv s=%p index=%d chan=%p\n",
779 sel, cas->index, cas->chan);
782 // cut in half to give stack a chance to split
783 static void selectdefault(Select*, int);
785 // selectdefault(sel *byte) (selected bool);
// The asm label makes the C function link under the symbol
// "runtime.selectdefault".
787 void runtime_selectdefault(Select *, int) __asm__("runtime.selectdefault");
// Public entry point: delegates to the static worker.
790 runtime_selectdefault(Select *sel, int index)
792 selectdefault(sel, index);
// Worker: fills in one scase slot as the default case.
796 selectdefault(Select *sel, int index)
// Thrown when no slot is left; the bounds check itself is on an elided line.
803 runtime_throw("selectdefault: too many cases");
805 cas = &sel->scase[i];
809 cas->kind = CaseDefault;
812 runtime_printf("selectdefault s=%p index=%d\n",
823 for(i=0; i<sel->ncase; i++) {
824 c0 = sel->lockorder[i];
826 c = sel->lockorder[i];
// Walks sel->lockorder from the last entry down to index 0, i.e. in the
// reverse of the order used by the locking loop above. The per-channel
// action is on elided lines.
833 selunlock(Select *sel)
// "i-->0" tests i > 0 and then decrements, so the body sees ncase-1 .. 0.
839 for(i=sel->ncase; i-->0;) {
840 c0 = sel->lockorder[i];
854 g->status = Gwaiting; // forever
855 g->waitreason = "select (no cases)";
// Worker that runs the select; takes the Select by address.
859 static int selectgo(Select**);
861 // selectgo(sel *byte);
// The asm label makes the C function link under the symbol
// "runtime.selectgo".
863 int runtime_selectgo(Select *) __asm__("runtime.selectgo");
// Public entry point: returns whatever the worker returns.
866 runtime_selectgo(Select *sel)
868 return selectgo(&sel);
// Run a select over the cases registered in *selp and return an int
// identifying the chosen case (see the "return index" comment near the end).
// Visible structure: shuffle the poll order, sort channels into a lock
// order, then pass 1 (ready case), pass 2 (enqueue everywhere and sleep),
// pass 3 (dequeue from the cases that did not fire).
// Many statements are elided from this view; notes describe visible lines.
872 selectgo(Select **selp)
884 if(runtime_gcwaiting)
888 runtime_printf("select: sel=%p\n", sel);
892 // The compiler rewrites selects that statically have
893 // only 0 or 1 cases plus default into simpler constructs.
894 // The only way we can end up with such small sel->ncase
895 // values here is for a larger select in which most channels
896 // have been nilled out. The general code handles those
897 // cases correctly, and they are rare enough not to bother
898 // optimizing (and needing to test).
900 // generate permuted order
// Start from the identity order, then swap each position i with a random
// position j in [0, i].
901 for(i=0; i<sel->ncase; i++)
902 sel->pollorder[i] = i;
903 for(i=1; i<sel->ncase; i++) {
904 o = sel->pollorder[i];
905 j = runtime_fastrand1()%(i+1);
906 sel->pollorder[i] = sel->pollorder[j];
907 sel->pollorder[j] = o;
910 // sort the cases by Hchan address to get the locking order.
// Insertion sort: shift entries that compare >= c one slot up, then drop c
// into the gap.
911 for(i=0; i<sel->ncase; i++) {
912 c = sel->scase[i].chan;
913 for(j=i; j>0 && sel->lockorder[j-1] >= c; j--)
914 sel->lockorder[j] = sel->lockorder[j-1];
915 sel->lockorder[j] = c;
920 // pass 1 - look for something already waiting
// Cases are examined in the shuffled poll order.
922 for(i=0; i<sel->ncase; i++) {
923 o = sel->pollorder[i];
924 cas = &sel->scase[o];
// Buffered channels (dataqsiz > 0) are checked against the buffer state,
// and a queued counterpart is looked for via dequeue.
929 if(c->dataqsiz > 0) {
933 sg = dequeue(&c->sendq);
944 if(c->dataqsiz > 0) {
945 if(c->qcount < c->dataqsiz)
948 sg = dequeue(&c->recvq);
967 // pass 2 - enqueue on all chans
968 for(i=0; i<sel->ncase; i++) {
969 o = sel->pollorder[i];
970 cas = &sel->scase[o];
// Tag the SudoG with the goroutine's current select generation; dequeue uses
// this tag to recognise stale entries.
974 sg->selgen = g->selgen;
978 enqueue(&c->recvq, sg);
982 enqueue(&c->sendq, sg);
988 g->status = Gwaiting;
989 g->waitreason = "select";
996 // pass 3 - dequeue from unsuccessful chans
997 // otherwise they stack up on quiet channels
998 for(i=0; i<sel->ncase; i++) {
999 cas = &sel->scase[i];
// The SudoG is the first member of Scase, so the SudoG pointer can be
// compared against the case pointer to find the case that fired.
1000 if(cas != (Scase*)sg) {
1002 if(cas->kind == CaseSend)
1003 dequeueg(&c->sendq);
1005 dequeueg(&c->recvq);
1016 runtime_throw("selectgo: shouldnt happen");
1019 runtime_printf("wait-return: sel=%p c=%p cas=%p kind=%d\n",
1020 sel, c, cas, cas->kind);
// A receive case completed while sleeping: report received = true when the
// caller asked for the flag.
1022 if(cas->kind == CaseRecv) {
1023 if(cas->receivedp != nil)
1024 *cas->receivedp = true;
1031 // can receive from buffer
// The element copy is skipped when the case has no destination (elem nil);
// the slot is zeroed and recvx advanced either way.
1032 if(cas->receivedp != nil)
1033 *cas->receivedp = true;
1034 if(cas->sg.elem != nil)
1035 runtime_memmove(cas->sg.elem, chanbuf(c, c->recvx), c->elemsize);
1036 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
1037 if(++c->recvx == c->dataqsiz)
1040 sg = dequeue(&c->sendq);
1051 // can send to buffer
1052 runtime_memmove(chanbuf(c, c->sendx), cas->sg.elem, c->elemsize);
1053 if(++c->sendx == c->dataqsiz)
1056 sg = dequeue(&c->recvq);
1067 // can receive from sleeping sender (sg)
1070 runtime_printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
1071 if(cas->receivedp != nil)
1072 *cas->receivedp = true;
1073 if(cas->sg.elem != nil)
1074 runtime_memmove(cas->sg.elem, sg->elem, c->elemsize);
1081 // read at end of closed channel
// Reports received = false and zeroes the destination, when present.
1083 if(cas->receivedp != nil)
1084 *cas->receivedp = false;
1085 if(cas->sg.elem != nil)
1086 runtime_memclr(cas->sg.elem, c->elemsize);
1090 // can send to sleeping receiver (sg)
1093 runtime_printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
1095 runtime_memmove(sg->elem, cas->sg.elem, c->elemsize);
1101 // return index corresponding to chosen case
1107 // send on closed channel
1109 runtime_panicstring("send on closed channel");
1110 return 0; // not reached
1113 // closechan(hchan *chan any);
// Close channel c.
// Panics with "close of nil channel" or "close of closed channel" (the
// guarding conditions are on elided lines), then drains both wait queues.
1115 runtime_closechan(Hchan *c)
1121 runtime_panicstring("close of nil channel");
1123 if(runtime_gcwaiting)
1129 runtime_panicstring("close of closed channel");
1134 // release all readers
// Each dequeue call pulls one waiter off the queue; the enclosing loop and
// the wake-up of each waiter are on elided lines.
1136 sg = dequeue(&c->recvq);
1144 // release all writers
1146 sg = dequeue(&c->sendq);
// Forwards to runtime_closechan.
1158 __go_builtin_close(Hchan *c)
1160 runtime_closechan(c);
1164 // func chanclose(c chan)
// The asm label makes the C function link under the symbol
// "reflect.chanclose".
1166 void reflect_chanclose(uintptr) __asm__("reflect.chanclose");
// The channel arrives as a uintptr and is cast back to Hchan* before closing.
1169 reflect_chanclose(uintptr c)
1171 runtime_closechan((Hchan*)c);
1175 // func chanlen(c chan) (len int32)
// The asm label makes the C function link under the symbol "reflect.chanlen".
1177 int32 reflect_chanlen(uintptr) __asm__("reflect.chanlen");
// Body elided in this view.
1180 reflect_chanlen(uintptr ca)
// Forwards to reflect_chanlen, passing the channel pointer as a uintptr.
1194 __go_chan_len(Hchan *c)
1196 return reflect_chanlen((uintptr)c);
1200 // func chancap(c chan) (cap int32)
// The asm label makes the C function link under the symbol "reflect.chancap".
1202 int32 reflect_chancap(uintptr) __asm__("reflect.chancap");
// Body elided in this view.
1205 reflect_chancap(uintptr ca)
// Forwards to reflect_chancap, passing the channel pointer as a uintptr.
1219 __go_chan_cap(Hchan *c)
1221 return reflect_chancap((uintptr)c);
// Unlink the head entry from the queue.
1233 q->first = sgp->link;
1235 // if sgp is stale, ignore it
// Entries tagged NOSELGEN are never considered stale. Any other entry is
// stale when its generation no longer matches the goroutine's selgen, or
// when the compare-and-swap that advances g->selgen by 2 fails.
1236 if(sgp->selgen != NOSELGEN &&
1237 (sgp->selgen != sgp->g->selgen ||
1238 !runtime_cas(&sgp->g->selgen, sgp->selgen, sgp->selgen + 2))) {
1239 //prints("INVALID PSEUDOG POINTER\n");
1249 SudoG **l, *sgp, *prevsgp;
1254 for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
// Add sgp to wait queue q.
// An empty queue (first == nil) is handled separately on elided lines;
// otherwise sgp is linked after the current last entry.
1265 enqueue(WaitQ *q, SudoG *sgp)
1268 if(q->first == nil) {
1273 q->last->link = sgp;