Back to index

plt-scheme  4.2.1
sema.c
Go to the documentation of this file.
00001 /*
00002   MzScheme
00003   Copyright (c) 2004-2009 PLT Scheme Inc.
00004   Copyright (c) 1995-2001 Matthew Flatt
00005  
00006     This library is free software; you can redistribute it and/or
00007     modify it under the terms of the GNU Library General Public
00008     License as published by the Free Software Foundation; either
00009     version 2 of the License, or (at your option) any later version.
00010 
00011     This library is distributed in the hope that it will be useful,
00012     but WITHOUT ANY WARRANTY; without even the implied warranty of
00013     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014     Library General Public License for more details.
00015 
00016     You should have received a copy of the GNU Library General Public
00017     License along with this library; if not, write to the Free
00018     Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
00019     Boston, MA 02110-1301 USA.
00020 */
00021 
00022 #include "schpriv.h"
00023 
00024 #ifndef NO_SCHEME_THREADS
00025 
00026 Scheme_Object *scheme_always_ready_evt;
00027 Scheme_Object *scheme_system_idle_channel;
00028 
00029 static Scheme_Object *make_sema(int n, Scheme_Object **p);
00030 static Scheme_Object *semap(int n, Scheme_Object **p);
00031 static Scheme_Object *hit_sema(int n, Scheme_Object **p);
00032 static Scheme_Object *block_sema_p(int n, Scheme_Object **p);
00033 static Scheme_Object *block_sema(int n, Scheme_Object **p);
00034 static Scheme_Object *block_sema_breakable(int n, Scheme_Object **p);
00035 static Scheme_Object *make_sema_repost(int n, Scheme_Object **p);
00036 
00037 static Scheme_Object *make_channel(int n, Scheme_Object **p);
00038 static Scheme_Object *make_channel_put(int n, Scheme_Object **p);
00039 static Scheme_Object *channel_p(int n, Scheme_Object **p);
00040 
00041 static Scheme_Object *thread_send(int n, Scheme_Object **p);
00042 static Scheme_Object *thread_receive(int n, Scheme_Object **p);
00043 static Scheme_Object *thread_try_receive(int n, Scheme_Object **p);
00044 static Scheme_Object *thread_receive_evt(int n, Scheme_Object **p);
00045 static Scheme_Object *thread_rewind_receive(int n, Scheme_Object **p);
00046 
00047 static Scheme_Object *make_alarm(int n, Scheme_Object **p);
00048 static Scheme_Object *make_sys_idle(int n, Scheme_Object **p);
00049 
00050 static int channel_get_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
00051 static int channel_put_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
00052 static int channel_syncer_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
00053 static int alarm_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
00054 static int always_ready(Scheme_Object *w);
00055 static int never_ready(Scheme_Object *w);
00056 static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
00057 
00058 static int pending_break(Scheme_Thread *p);
00059 
00060 int scheme_main_was_once_suspended;
00061 
00062 static Scheme_Object *system_idle_put_evt;
00063 static Scheme_Object *thread_recv_evt;
00064 
00065 #ifdef MZ_PRECISE_GC
00066 static void register_traversers(void);
00067 #endif
00068 
00069 typedef struct {
00070   Scheme_Object so;
00071   double sleep_end;
00072 } Scheme_Alarm;
00073 
00074 /* For object-sync: */
00075 static int sema_ready(Scheme_Object *s)
00076 {
00077   return scheme_wait_sema(s, 1);
00078 }
00079 
00080 static Scheme_Object *sema_for_repost(Scheme_Object *s, int *repost)
00081 {
00082   *repost = 1;
00083   return SCHEME_PTR_VAL(s);
00084 }
00085 
00086 void scheme_init_sema(Scheme_Env *env)
00087 {
00088   Scheme_Object *o;
00089 
00090 #ifdef MZ_PRECISE_GC
00091   register_traversers();
00092 #endif
00093 
00094   scheme_add_global_constant("make-semaphore", 
00095                           scheme_make_prim_w_arity(make_sema,
00096                                                 "make-semaphore", 
00097                                                 0, 1), 
00098                           env);
00099   scheme_add_global_constant("semaphore?", 
00100                           scheme_make_folding_prim(semap,
00101                                                 "semaphore?", 
00102                                                 1, 1, 1), 
00103                           env);
00104   scheme_add_global_constant("semaphore-post", 
00105                           scheme_make_prim_w_arity(hit_sema, 
00106                                                 "semaphore-post", 
00107                                                 1, 1), 
00108                           env);
00109   scheme_add_global_constant("semaphore-try-wait?", 
00110                           scheme_make_prim_w_arity(block_sema_p, 
00111                                                 "semaphore-try-wait?", 
00112                                                 1, 1), 
00113                           env);
00114   scheme_add_global_constant("semaphore-wait", 
00115                           scheme_make_prim_w_arity(block_sema, 
00116                                                 "semaphore-wait", 
00117                                                 1, 1), 
00118                           env);
00119   scheme_add_global_constant("semaphore-wait/enable-break", 
00120                           scheme_make_prim_w_arity(block_sema_breakable, 
00121                                                 "semaphore-wait/enable-break", 
00122                                                 1, 1), 
00123                           env);
00124 
00125   scheme_add_global_constant("semaphore-peek-evt", 
00126                           scheme_make_prim_w_arity(make_sema_repost,
00127                                                 "semaphore-peek-evt", 
00128                                                 1, 1), 
00129                           env);
00130 
00131   scheme_add_global_constant("make-channel", 
00132                           scheme_make_prim_w_arity(make_channel,
00133                                                 "make-channel",
00134                                                 0, 0), 
00135                           env);
00136   scheme_add_global_constant("channel-put-evt", 
00137                           scheme_make_prim_w_arity(make_channel_put,
00138                                                 "channel-put-evt",
00139                                                 2, 2), 
00140                           env);
00141   scheme_add_global_constant("channel?", 
00142                           scheme_make_folding_prim(channel_p,
00143                                                 "channel?",
00144                                                 1, 1, 1), 
00145                           env);  
00146 
00147   scheme_add_global_constant("thread-send", 
00148                           scheme_make_prim_w_arity(thread_send,
00149                                                 "thread-send", 
00150                                                 2, 3), 
00151                           env);
00152   scheme_add_global_constant("thread-receive", 
00153                           scheme_make_prim_w_arity(thread_receive,
00154                                                 "thread-receive", 
00155                                                 0, 0), 
00156                           env);
00157   scheme_add_global_constant("thread-try-receive", 
00158                           scheme_make_prim_w_arity(thread_try_receive,
00159                                                 "thread-try-receive", 
00160                                                 0, 0), 
00161                           env);
00162   scheme_add_global_constant("thread-receive-evt", 
00163                           scheme_make_prim_w_arity(thread_receive_evt,
00164                                                 "thread-receive-evt", 
00165                                                 0, 0), 
00166                           env);
00167   scheme_add_global_constant("thread-rewind-receive", 
00168                           scheme_make_prim_w_arity(thread_rewind_receive,
00169                                                 "thread-rewind-receive", 
00170                                                 1, 1), 
00171                           env);
00172 
00173   scheme_add_global_constant("alarm-evt", 
00174                           scheme_make_prim_w_arity(make_alarm,
00175                                                 "alarm-evt",
00176                                                 1, 1), 
00177                           env);
00178 
00179   scheme_add_global_constant("system-idle-evt", 
00180                           scheme_make_prim_w_arity(make_sys_idle,
00181                                                 "system-idle-evt",
00182                                                 0, 0), 
00183                           env);
00184 
00185   REGISTER_SO(scheme_always_ready_evt);
00186   scheme_always_ready_evt = scheme_alloc_small_object();
00187   scheme_always_ready_evt->type = scheme_always_evt_type;
00188   scheme_add_global_constant("always-evt", scheme_always_ready_evt, env);
00189 
00190   o = scheme_alloc_small_object();
00191   o->type = scheme_never_evt_type;
00192   scheme_add_global_constant("never-evt", o, env);
00193 
00194   REGISTER_SO(thread_recv_evt);
00195   o = scheme_alloc_small_object();
00196   o->type = scheme_thread_recv_evt_type;
00197   thread_recv_evt = o;
00198 
00199   REGISTER_SO(scheme_system_idle_channel);
00200   scheme_system_idle_channel = scheme_make_channel();
00201 
00202   scheme_add_evt(scheme_sema_type, sema_ready, NULL, NULL, 0);
00203   scheme_add_evt_through_sema(scheme_semaphore_repost_type, sema_for_repost, NULL);
00204   scheme_add_evt(scheme_channel_type, (Scheme_Ready_Fun)channel_get_ready, NULL, NULL, 1);
00205   scheme_add_evt(scheme_channel_put_type, (Scheme_Ready_Fun)channel_put_ready, NULL, NULL, 1);
00206   scheme_add_evt(scheme_channel_syncer_type, (Scheme_Ready_Fun)channel_syncer_ready, NULL, NULL, 0);
00207   scheme_add_evt(scheme_alarm_type, (Scheme_Ready_Fun)alarm_ready, NULL, NULL, 0);
00208   scheme_add_evt(scheme_always_evt_type, always_ready, NULL, NULL, 0);
00209   scheme_add_evt(scheme_never_evt_type, never_ready, NULL, NULL, 0);
00210   scheme_add_evt(scheme_thread_recv_evt_type, (Scheme_Ready_Fun)thread_recv_ready, NULL, NULL, 0);
00211 }
00212 
00213 Scheme_Object *scheme_make_sema(long v)
00214 {
00215   Scheme_Sema *sema;
00216 
00217   sema = MALLOC_ONE_TAGGED(Scheme_Sema);
00218   sema->value = v;
00219 
00220   sema->so.type = scheme_sema_type;
00221 
00222   return (Scheme_Object *)sema;
00223 }
00224 
00225 static Scheme_Object *make_sema(int n, Scheme_Object **p)
00226 {
00227   long v;
00228 
00229   if (n) {
00230     if (!SCHEME_INTP(p[0])) {
00231       if (!SCHEME_BIGNUMP(p[0]) || !SCHEME_BIGPOS(p[0]))
00232        scheme_wrong_type("make-semaphore", "non-negative exact integer", 0, n, p);
00233     }
00234 
00235     if (!scheme_get_int_val(p[0], &v)) {
00236       scheme_raise_exn(MZEXN_FAIL,
00237                      "make-semaphore: starting value %s is too large",
00238                      scheme_make_provided_string(p[0], 0, NULL));
00239     } else if (v < 0)
00240       scheme_wrong_type("make-semaphore", "non-negative exact integer", 0, n, p);
00241   } else
00242     v = 0;
00243 
00244   return scheme_make_sema(v);
00245 }
00246 
00247 static Scheme_Object *make_sema_repost(int n, Scheme_Object **p)
00248 {
00249   if (!SCHEME_SEMAP(p[0]))
00250     scheme_wrong_type("semaphore-peek-evt", "semaphore", 0, n, p);
00251  
00252   return scheme_make_sema_repost(p[0]);
00253 }
00254  
00255 Scheme_Object *scheme_make_sema_repost(Scheme_Object *sema)
00256 {
00257   Scheme_Object *o;
00258 
00259   o = scheme_alloc_small_object();
00260   o->type = scheme_semaphore_repost_type;
00261   SCHEME_PTR_VAL(o) = sema;
00262 
00263   return o;
00264 }
00265 
00266 static Scheme_Object *semap(int n, Scheme_Object **p)
00267 {
00268   return SCHEME_SEMAP(p[0]) ? scheme_true : scheme_false;
00269 }
00270 
00271 void scheme_post_sema(Scheme_Object *o)
00272 {
00273   Scheme_Sema *t = (Scheme_Sema *)o;
00274   int v, consumed;
00275 
00276   if (t->value < 0) return;
00277 
00278   v = t->value + 1;
00279   if (v > t->value) {
00280     t->value = v;
00281 
00282     while (t->first) {
00283       Scheme_Channel_Syncer *w;
00284 
00285       w = t->first;
00286 
00287       t->first = w->next;
00288       if (!w->next)
00289        t->last = NULL;
00290       else
00291        t->first->prev = NULL;
00292       
00293       if ((!w->syncing || !w->syncing->result) && !pending_break(w->p)) {
00294        if (w->syncing) {
00295          w->syncing->result = w->syncing_i + 1;
00296          if (w->syncing->disable_break)
00297            w->syncing->disable_break->suspend_break++;
00298          scheme_post_syncing_nacks(w->syncing);
00299          if (!w->syncing->reposts || !w->syncing->reposts[w->syncing_i]) {
00300            t->value -= 1;
00301            consumed = 1;
00302          } else
00303            consumed = 0;
00304           if (w->syncing->accepts && w->syncing->accepts[w->syncing_i])
00305             scheme_accept_sync(w->syncing, w->syncing_i);
00306        } else {
00307          /* In this case, we will remove the syncer from line, but
00308             someone else might grab the post. This is unfair, but it
00309             can help improve throughput when multiple threads synchronize
00310             on a lock. */
00311          consumed = 1;
00312        }
00313        w->picked = 1;
00314       } else
00315        consumed = 0;
00316 
00317       w->in_line = 0;
00318       w->prev = NULL;
00319       w->next = NULL;
00320 
00321       if (w->picked) {
00322        scheme_weak_resume_thread(w->p);
00323        if (consumed)
00324          break;
00325       }
00326       /* otherwise, loop to find one we can wake up */
00327     }
00328 
00329     return;
00330   }
00331 
00332   scheme_raise_exn(MZEXN_FAIL,
00333                  "semaphore-post: the maximum post count has already been reached");
00334 }
00335 
00336 void scheme_post_sema_all(Scheme_Object *o)
00337 {
00338   Scheme_Sema *t = (Scheme_Sema *)o;
00339 
00340   while (t->first) {
00341     scheme_post_sema(o);
00342   }
00343   t->value = -1;
00344 }
00345 
00346 static Scheme_Object *hit_sema(int n, Scheme_Object **p)
00347 {
00348   if (!SCHEME_SEMAP(p[0]))
00349     scheme_wrong_type("semaphore-post", "semaphore", 0, n, p);
00350 
00351   scheme_post_sema(p[0]);
00352 
00353   return scheme_void;
00354 }
00355 
00356 static int out_of_line(Scheme_Object *a)
00357 {
00358   Scheme_Thread *p;
00359   int n, i;
00360   Scheme_Channel_Syncer *w;
00361 
00362   /* Out of one line? */
00363   n = SCHEME_INT_VAL(((Scheme_Object **)a)[0]);
00364   for (i = 0; i < n; i++) {
00365     w = (((Scheme_Channel_Syncer ***)a)[1])[i];
00366     if (w->picked)
00367       return 1;
00368   }
00369 
00370   /* Suspended break? */
00371   p = ((Scheme_Thread **)a)[2];
00372   if (p->external_break) {
00373     int v;
00374     --p->suspend_break;
00375     v = scheme_can_break(p);
00376     p->suspend_break++;
00377     if (v)
00378       return 1; 
00379   }
00380 
00381   /* Suspended by user? */
00382   if ((p->running & MZTHREAD_USER_SUSPENDED)
00383       || scheme_main_was_once_suspended)
00384     return 1;
00385 
00386   return 0;
00387 }
00388 
00389 static void get_into_line(Scheme_Sema *sema, Scheme_Channel_Syncer *w)
00390   /* Can be called multiple times. */
00391 {
00392   Scheme_Channel_Syncer *last, *first;
00393   
00394   w->in_line = 1;
00395   w->picked = 0;
00396 
00397   if (SAME_TYPE(SCHEME_TYPE(sema), scheme_never_evt_type)) {
00398     return; /* !!!! skip everything else */
00399   } else if (SCHEME_SEMAP(sema)) {
00400     last = sema->last;
00401     first = sema->first;
00402   } else if (SCHEME_CHANNELP(sema)) {
00403     last = ((Scheme_Channel *)sema)->get_last;
00404     first = ((Scheme_Channel *)sema)->get_first;
00405   } else {
00406     last = ((Scheme_Channel_Put *)sema)->ch->put_last;
00407     first = ((Scheme_Channel_Put *)sema)->ch->put_first;
00408   }
00409 
00410   w->prev = last;
00411   if (last)
00412     last->next = w;
00413   else
00414     first = w;
00415   last = w;
00416   w->next = NULL;
00417 
00418   if (SCHEME_SEMAP(sema)) {
00419     sema->last = last;
00420     sema->first = first;
00421   } else if (SCHEME_CHANNELP(sema)) {
00422     ((Scheme_Channel *)sema)->get_last = last;
00423     ((Scheme_Channel *)sema)->get_first = first;
00424   } else {
00425     ((Scheme_Channel_Put *)sema)->ch->put_last = last;
00426     ((Scheme_Channel_Put *)sema)->ch->put_first = first;
00427   }
00428 }
00429 
00430 static void get_outof_line(Scheme_Sema *sema, Scheme_Channel_Syncer *w)
00431 {
00432   Scheme_Channel_Syncer *last, *first;
00433 
00434   if (!w->in_line)
00435     return;
00436   w->in_line = 0;
00437 
00438   if (SAME_TYPE(SCHEME_TYPE(sema), scheme_never_evt_type)) {
00439     return; /* !!!! skip everything else */
00440   } else if (SCHEME_SEMAP(sema)) {
00441     last = sema->last;
00442     first = sema->first;
00443   } else if (SCHEME_CHANNELP(sema)) {
00444     last = ((Scheme_Channel *)sema)->get_last;
00445     first = ((Scheme_Channel *)sema)->get_first;
00446   } else {
00447     last = ((Scheme_Channel_Put *)sema)->ch->put_last;
00448     first = ((Scheme_Channel_Put *)sema)->ch->put_first;
00449   }
00450 
00451   if (w->prev)
00452     w->prev->next = w->next;
00453   else
00454     first = w->next;
00455   if (w->next)
00456     w->next->prev = w->prev;
00457   else
00458     last = w->prev;
00459 
00460   if (SCHEME_SEMAP(sema)) {
00461     sema->last = last;
00462     sema->first = first;
00463   } else if (SCHEME_CHANNELP(sema)) {
00464     ((Scheme_Channel *)sema)->get_last = last;
00465     ((Scheme_Channel *)sema)->get_first = first;
00466   } else {
00467     ((Scheme_Channel_Put *)sema)->ch->put_last = last;
00468     ((Scheme_Channel_Put *)sema)->ch->put_first = first;
00469   }
00470 }
00471 
00472 static void ext_get_into_line(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
00473 {
00474   Scheme_Channel_Syncer *w;
00475 
00476   /* Get into line */
00477   w = MALLOC_ONE_RT(Scheme_Channel_Syncer);
00478   w->so.type = scheme_channel_syncer_type;
00479   if (sinfo->false_positive_ok)
00480     w->p = sinfo->false_positive_ok;
00481   else
00482     w->p = scheme_current_thread;
00483   w->syncing = (Syncing *)sinfo->current_syncing;
00484   w->obj = ch;
00485   w->syncing_i = sinfo->w_i;
00486 
00487   get_into_line((Scheme_Sema *)ch, w);
00488 
00489   scheme_set_sync_target(sinfo, (Scheme_Object *)w, NULL, NULL, 0, 0, NULL);
00490 }
00491 
00492 void scheme_get_outof_line(Scheme_Channel_Syncer *ch_w)
00493 {
00494   get_outof_line((Scheme_Sema *)ch_w->obj, ch_w);
00495 }
00496 
00497 static int try_channel(Scheme_Sema *sema, Syncing *syncing, int pos, Scheme_Object **result)
00498 {
00499   if (SCHEME_CHANNELP(sema)) {
00500     /* GET mode */
00501     Scheme_Channel *ch = (Scheme_Channel *)sema;
00502     Scheme_Channel_Syncer *w = ch->put_first, *next;
00503     int picked = 0;
00504 
00505     while (w) {
00506       if (w->syncing == syncing) {
00507        /* can't synchronize with self */
00508        w = w->next;
00509       } else {
00510        Scheme_Channel_Put *chp = (Scheme_Channel_Put *)w->obj;
00511 
00512         if (!w->syncing->result && !pending_break(w->p)) {
00513          w->picked = 1;
00514          w->syncing->result = w->syncing_i + 1;
00515          if (w->syncing->disable_break)
00516            w->syncing->disable_break->suspend_break++;
00517          scheme_post_syncing_nacks(w->syncing);
00518          if (result)
00519            *result = chp->val;
00520          if (syncing && (pos >= 0)) {
00521            syncing->result = pos + 1;
00522            if (syncing->disable_break)
00523              syncing->disable_break->suspend_break++;
00524            scheme_post_syncing_nacks(syncing);
00525            syncing->set->argv[pos] = chp->val;
00526          }
00527          picked = 1;
00528          scheme_weak_resume_thread(w->p);
00529        }
00530        
00531        next = w->next;
00532        get_outof_line((Scheme_Sema *)chp, w);
00533        w = next;
00534        
00535        if (picked)
00536          return 1;
00537       }
00538     }
00539 
00540     return 0;
00541   } else {
00542     /* PUT mode */
00543     Scheme_Channel_Put *chp = (Scheme_Channel_Put *)sema;
00544     Scheme_Channel_Syncer *w = chp->ch->get_first, *next;
00545     int picked = 0;
00546 
00547     while (w) {
00548       if (w->syncing == syncing) {
00549        /* can't synchronize with self */
00550        w = w->next;
00551       } else {
00552        if (!w->syncing->result && !pending_break(w->p)) {
00553          w->picked = 1;
00554          w->syncing->set->argv[w->syncing_i] = chp->val;
00555          w->syncing->result = w->syncing_i + 1;
00556          if (w->syncing->disable_break)
00557            w->syncing->disable_break->suspend_break++;
00558          scheme_post_syncing_nacks(w->syncing);
00559          if (syncing && (pos >= 0)) {
00560            syncing->result = pos + 1;
00561            if (syncing->disable_break)
00562              syncing->disable_break->suspend_break++;
00563            scheme_post_syncing_nacks(syncing);
00564          }
00565          picked = 1;
00566          scheme_weak_resume_thread(w->p);
00567        }
00568        
00569        next = w->next;
00570        get_outof_line((Scheme_Sema *)chp->ch, w);
00571        w = next;
00572        
00573        if (picked)
00574          return 1;
00575       }
00576     }
00577 
00578     return 0;    
00579   }
00580 }
00581 
00582 int scheme_try_plain_sema(Scheme_Object *o)
00583 {
00584   Scheme_Sema *sema = (Scheme_Sema *)o;
00585 
00586   if (sema->value) {
00587     if (sema->value > 0)
00588       --sema->value;
00589     return 1;
00590   } else
00591     return 0;
00592 }
00593 
00594 int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *syncing)
00595      /* When syncing is supplied, o can contain Scheme_Channel_Syncer
00596        and never-evt values, and just_try must be 0. */
00597 {
00598   Scheme_Sema **semas = (Scheme_Sema **)o;
00599   int v, i, ii;
00600 
00601   if (just_try) {
00602     /* assert: n == 1, !syncing */
00603     Scheme_Sema *sema = semas[0];
00604     if (just_try > 0) {
00605       if (sema->so.type == scheme_sema_type) {
00606         v = scheme_try_plain_sema((Scheme_Object *)sema);
00607       } else {
00608        v = try_channel(sema, syncing, 0, NULL);
00609       }
00610     } else {
00611       Scheme_Cont_Frame_Data cframe;
00612 
00613       scheme_push_break_enable(&cframe, 1, 1);
00614 
00615       scheme_wait_sema((Scheme_Object *)sema, 0);
00616 
00617       scheme_pop_break_enable(&cframe, 0);
00618 
00619       return 1;
00620     }
00621   } else {
00622     int start_pos;
00623 
00624     if (n > 1) {
00625       if (syncing)
00626        start_pos = syncing->start_pos;
00627       else {
00628        Scheme_Object *rand_state;
00629        rand_state = scheme_get_param(scheme_current_config(), MZCONFIG_SCHEDULER_RANDOM_STATE);
00630        start_pos = scheme_rand((Scheme_Random_State *)rand_state);
00631       }
00632     } else
00633       start_pos = 0;
00634 
00635     /* Initial poll */
00636     i = 0;
00637     for (ii = 0; ii < n; ii++) {
00638       /* Randomized start position for poll ensures fairness: */
00639       i = (start_pos + ii) % n;
00640 
00641       if (semas[i]->so.type == scheme_sema_type) {
00642        if (semas[i]->value) {
00643          if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i]))
00644            --semas[i]->value;
00645           if (syncing && syncing->accepts && syncing->accepts[i])
00646             scheme_accept_sync(syncing, i);
00647          break;
00648        }
00649       } else if (semas[i]->so.type == scheme_never_evt_type) {
00650        /* Never ready. */
00651       } else if (semas[i]->so.type == scheme_channel_syncer_type) {
00652        /* Probably no need to poll */
00653       } else if (try_channel(semas[i], syncing, i, NULL))
00654        break;
00655     }
00656 
00657     /* In the following, syncers get changed back to channels,
00658        and channel puts */
00659     if (ii >= n) {
00660       Scheme_Channel_Syncer **ws, *w;
00661 
00662       ws = MALLOC_N(Scheme_Channel_Syncer*, n);
00663       for (i = 0; i < n; i++) {
00664        if (semas[i]->so.type == scheme_channel_syncer_type) {
00665          ws[i] = (Scheme_Channel_Syncer *)semas[i];
00666          semas[i] = (Scheme_Sema *)ws[i]->obj;
00667        } else {
00668          w = MALLOC_ONE_RT(Scheme_Channel_Syncer);
00669          ws[i] = w;
00670          w->so.type = scheme_channel_syncer_type;
00671          w->p = scheme_current_thread;
00672          w->syncing = syncing;
00673          w->obj = (Scheme_Object *)semas[i];
00674          w->syncing_i = i;
00675        }
00676       }
00677       
00678       while (1) {
00679        int out_of_a_line;
00680 
00681        /* Get into line */
00682        for (i = 0; i < n; i++) {
00683          if (!ws[i]->in_line) {
00684            get_into_line(semas[i], ws[i]);
00685          }
00686        }
00687 
00688        if (!scheme_current_thread->next) {
00689          void **a;
00690 
00691          /* We're not allowed to suspend the main thread. Delay
00692             breaks so we get a chance to clean up. */
00693          scheme_current_thread->suspend_break++;
00694 
00695          a = MALLOC_N(void*, 3);
00696          a[0] = scheme_make_integer(n);
00697          a[1] = ws;
00698          a[2] = scheme_current_thread;
00699          
00700          scheme_main_was_once_suspended = 0;
00701 
00702          scheme_block_until(out_of_line, NULL, (Scheme_Object *)a, (float)0.0);
00703          
00704          --scheme_current_thread->suspend_break;
00705        } else {
00706          /* Mark the thread to indicate that we need to clean up
00707             if the thread is killed. */
00708          int old_nkc;
00709          old_nkc = (scheme_current_thread->running & MZTHREAD_NEED_KILL_CLEANUP);
00710          if (!old_nkc)
00711            scheme_current_thread->running += MZTHREAD_NEED_KILL_CLEANUP;
00712          scheme_weak_suspend_thread(scheme_current_thread);
00713          if (!old_nkc && (scheme_current_thread->running & MZTHREAD_NEED_KILL_CLEANUP))
00714            scheme_current_thread->running -= MZTHREAD_NEED_KILL_CLEANUP;
00715        }
00716 
00717        /* We've been resumed. But was it for the semaphore, or a signal? */
00718        out_of_a_line = 0;
00719        
00720        /* If we get the post, we must return WITHOUT BLOCKING. 
00721           MrEd, for example, depends on this special property, which ensures
00722           that the thread can't be broken or killed between
00723           receiving the post and returning. */
00724 
00725        if (!syncing) {
00726          /* Poster can't be sure that we really will get it,
00727             so we have to decrement the sema count here. */
00728          i = 0;
00729          for (ii = 0; ii < n; ii++) {
00730            i = (start_pos + ii) % n;
00731            if (ws[i]->picked) {
00732              out_of_a_line = 1;
00733              if (semas[i]->value) {
00734               if (semas[i]->value > 0)
00735                 --(semas[i]->value);
00736               break;
00737              }
00738            }
00739          }
00740          if (ii >= n)
00741            i = n;
00742        } else {
00743          if (syncing->result) {
00744            out_of_a_line = 1;
00745            i = syncing->result - 1;
00746          } else {
00747            out_of_a_line = 0;
00748            i = n;
00749          }
00750        }
00751 
00752        if (!out_of_a_line) {
00753          /* We weren't woken by any semaphore/channel. Get out of line, block once 
00754             (to handle breaks/kills) and then loop to get back into line. */
00755          for (i = 0; i < n; i++) {
00756            if (ws[i]->in_line)
00757              get_outof_line(semas[i], ws[i]);
00758          }
00759          
00760          scheme_thread_block(0); /* ok if it returns multiple times */ 
00761          scheme_current_thread->ran_some = 1;
00762          /* [but why would it return multiple times?! there must have been a reason...] */
00763        } else {
00764 
00765          if ((scheme_current_thread->running & MZTHREAD_KILLED)
00766              || ((scheme_current_thread->running & MZTHREAD_USER_SUSPENDED)
00767                 && !(scheme_current_thread->running & MZTHREAD_NEED_SUSPEND_CLEANUP))) {
00768            /* We've been killed or suspended! */
00769            i = -1;
00770          }
00771 
00772          /* We got a post from semas[i], or we were killed. 
00773             Did any (other) semaphore pick us?
00774             (This only happens when syncing == NULL.) */
00775          if (!syncing) {
00776            int j;
00777 
00778            for (j = 0; j < n; j++) {
00779              if (j != i) {
00780               if (ws[j]->picked) {
00781                 if (semas[j]->value) {
00782                   /* Consume the value and repost, because no one else
00783                      has been told to go, and we're accepting a different post. */
00784                   if (semas[j]->value > 0)
00785                     --semas[j]->value;
00786                   scheme_post_sema((Scheme_Object *)semas[j]);
00787                 }
00788               }
00789              }
00790            }
00791          }
00792 
00793          /* If we're done, get out of all lines that we're still in. */
00794          if (i < n) {
00795            int j;
00796            for (j = 0; j < n; j++) {
00797              if (ws[j]->in_line)
00798               get_outof_line(semas[j], ws[j]);
00799            }
00800          }
00801 
00802          if (i == -1) {
00803            scheme_thread_block(0); /* dies or suspends */
00804            scheme_current_thread->ran_some = 1;
00805          }
00806 
00807          if (i < n)
00808            break;
00809        }
00810 
00811        /* Otherwise: !syncing and someone stole the post, or we were
00812           suspended and we have to start over. Either way, poll then
00813           loop to get back in line an try again. */
00814        for (ii = 0; ii < n; ii++) {
00815          i = (start_pos + ii) % n;
00816 
00817          if (semas[i]->so.type == scheme_sema_type) {
00818            if (semas[i]->value) {
00819              if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i]))
00820               --semas[i]->value;
00821               if (syncing && syncing->accepts && syncing->accepts[i])
00822                 scheme_accept_sync(syncing, i);
00823              break;
00824            }
00825          }  else if (semas[i]->so.type == scheme_never_evt_type) {
00826            /* Never ready. */
00827          } else if (try_channel(semas[i], syncing, i, NULL))
00828            break;
00829        }
00830 
00831        if (ii < n) {
00832          /* Get out of any line that we still might be in: */
00833          int j;
00834          for (j = 0; j < n; j++) {
00835            if (ws[j]->in_line)
00836              get_outof_line(semas[j], ws[j]);
00837          }
00838 
00839          break;
00840        }
00841 
00842        if (!syncing) {
00843          /* Looks like this thread is a victim of unfair semaphore access.
00844             Go into fair mode by allocating a syncing: */
00845          syncing = MALLOC_ONE_RT(Syncing);
00846 #ifdef MZTAG_REQUIRED
00847          syncing->type = scheme_rt_syncing;
00848 #endif
00849          syncing->start_pos = start_pos;
00850 
00851          /* Get out of all lines, and set syncing field before we get back in line: */
00852          {
00853            int j;
00854            for (j = 0; j < n; j++) {
00855              if (ws[j]->in_line)
00856               get_outof_line(semas[j], ws[j]);
00857              ws[j]->syncing = syncing;
00858            }
00859          }
00860        }
00861        /* Back to top of loop to sync again */
00862       }
00863     }
00864     v = i + 1;
00865   }
00866 
00867   return v;
00868 }
00869 
00870 int scheme_wait_sema(Scheme_Object *o, int just_try)
00871 {
00872   Scheme_Object *a[1];
00873 
00874   a[0] = o;
00875 
00876   return scheme_wait_semas_chs(1, a, just_try, NULL);
00877 }
00878 
00879 static Scheme_Object *block_sema_p(int n, Scheme_Object **p)
00880 {
00881   if (!SCHEME_SEMAP(p[0]))
00882     scheme_wrong_type("semaphore-try-wait?", "sema", 0, n, p);
00883 
00884   return scheme_wait_sema(p[0], 1) ? scheme_true : scheme_false;
00885 }
00886 
00887 static Scheme_Object *block_sema(int n, Scheme_Object **p)
00888 {
00889   if (!SCHEME_SEMAP(p[0]))
00890     scheme_wrong_type("semaphore-wait", "sema", 0, n, p);
00891 
00892   scheme_wait_sema(p[0], 0);
00893 
00894   /* In case a break appeared after we received the post,
00895      check for a break, because scheme_wait_sema() won't: */
00896   scheme_check_break_now();
00897 
00898   return scheme_void;
00899 }
00900 
00901 static Scheme_Object *block_sema_breakable(int n, Scheme_Object **p)
00902 {
00903   if (!SCHEME_SEMAP(p[0]))
00904     scheme_wrong_type("semaphore-wait/enable-break", "sema", 0, n, p);
00905 
00906   scheme_wait_sema(p[0], -1);
00907 
00908   return scheme_void;
00909 }
00910 
00911 static int pending_break(Scheme_Thread *p)
00912 {
00913   if (p->running & (MZTHREAD_KILLED | MZTHREAD_USER_SUSPENDED))
00914     return 1;
00915 
00916   if (p->external_break) {
00917     int v;
00918 
00919     if (!p->next) {
00920       /* if p is the main thread, it must have a suspension
00921         to block on a channel or semaphore: */
00922       --p->suspend_break;
00923     }
00924 
00925     v = scheme_can_break(p);
00926 
00927     if (!p->next)
00928       p->suspend_break++;
00929 
00930     return v;
00931   }
00932 
00933   return 0;
00934 }
00935 
00936 /**********************************************************************/
00937 /*                            Channels                                */
00938 /**********************************************************************/
00939 
00940 Scheme_Object *scheme_make_channel()
00941 {
00942   Scheme_Channel *c;
00943 
00944   c = MALLOC_ONE_TAGGED(Scheme_Channel);
00945   c->so.type = scheme_channel_type;
00946   
00947   return (Scheme_Object *)c;
00948 }
00949 
00950 static Scheme_Object *make_channel(int n, Scheme_Object **p)
00951 {
00952   return scheme_make_channel();
00953 }
00954 
00955 Scheme_Object *scheme_make_channel_put_evt(Scheme_Object *ch, Scheme_Object *v)
00956 {
00957   Scheme_Channel_Put *cp;
00958 
00959   cp = MALLOC_ONE_TAGGED(Scheme_Channel_Put);
00960   cp->so.type = scheme_channel_put_type;
00961   cp->ch = (Scheme_Channel *)ch;
00962   cp->val = v;
00963 
00964   return (Scheme_Object *)cp;
00965 }
00966 
00967 int scheme_try_channel_put(Scheme_Object *ch, Scheme_Object *v)
00968 {
00969   if (((Scheme_Channel *)ch)->get_first) {
00970     Scheme_Object *a[2];
00971     v = scheme_make_channel_put_evt(ch, v);
00972     a[0] = scheme_make_integer(0);
00973     a[1] = v;
00974     v = scheme_sync_timeout(2, a);
00975     return SCHEME_TRUEP(v);
00976   } else
00977     return 0;
00978 }
00979 
00980 static Scheme_Object *make_channel_put(int argc, Scheme_Object **argv)
00981 {
00982   if (!SCHEME_CHANNELP(argv[0]))
00983     scheme_wrong_type("channel-put-evt", "channel", 0, argc, argv);
00984 
00985   return scheme_make_channel_put_evt(argv[0], argv[1]);
00986 }
00987 
00988 static Scheme_Object *channel_p(int n, Scheme_Object **p)
00989 {
00990   return (SCHEME_CHANNELP(p[0])
00991          ? scheme_true
00992          : scheme_false);
00993 }
00994 
00995 static int channel_get_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
00996 {
00997   Scheme_Object *result;
00998 
00999   if (try_channel((Scheme_Sema *)ch, (Syncing *)sinfo->current_syncing, -1, &result)) {
01000     scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 0, NULL);
01001     return 1;
01002   }
01003 
01004   ext_get_into_line(ch, sinfo);
01005   
01006   return 0;
01007 }
01008 
01009 static int channel_put_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
01010 {
01011   if (try_channel((Scheme_Sema *)ch, (Syncing *)sinfo->current_syncing, -1, NULL))
01012     return 1;
01013 
01014   ext_get_into_line(ch, sinfo);
01015   
01016   return 0;
01017 }
01018 
01019 static int channel_syncer_ready(Scheme_Object *ch_w, Scheme_Schedule_Info *sinfo)
01020 {
01021   Scheme_Channel_Syncer *w = (Scheme_Channel_Syncer *)ch_w;
01022 
01023   if (w->picked) {
01024     /* The value, if any, should have been tranferred already (in which
01025        case we would not have made it here, actually). */
01026     return 1;
01027   }
01028 
01029   return 0;
01030 }
01031 
01032 int scheme_try_channel_get(Scheme_Object *ch)
01033 {
01034   if (try_channel((Scheme_Sema *)ch, NULL, -1, NULL)) {
01035     return 1;
01036   }
01037   return 0;
01038 }
01039 
01040 /**********************************************************************/
01041 /*                           Thread mbox                              */
01042 /**********************************************************************/
01043 
01044 static void make_mbox_sema(Scheme_Thread *p)
01045 {
01046   if (!p->mbox_sema) {
01047     Scheme_Object *sema = NULL;
01048     sema = scheme_make_sema(0); 
01049     p->mbox_sema = sema;
01050   }
01051 }
01052 
01053 static void mbox_push(Scheme_Thread *p, Scheme_Object *o)
01054 {
01055   Scheme_Object *next;
01056 
01057   next = scheme_make_raw_pair(o, NULL);
01058   
01059   if (p->mbox_first) {
01060     SCHEME_CDR(p->mbox_last) = next;
01061     p->mbox_last = next;
01062   } else {
01063     p->mbox_first = next;
01064     p->mbox_last = next;
01065   }
01066 
01067   make_mbox_sema(p);
01068   scheme_post_sema(p->mbox_sema);
01069   /* Post can't overflow the semaphore, because we'd run out of
01070      memory for the queue, first. */
01071 }
01072 
01073 static void mbox_push_front(Scheme_Thread *p, Scheme_Object *lst) 
01074 {
01075   int cnt = -1;
01076   Scheme_Object *next, *hd;
01077 
01078   make_mbox_sema(p);
01079 
01080   next = lst;
01081   while (!SCHEME_NULLP(next)) {
01082     /* Push one: */
01083     hd = scheme_make_raw_pair(SCHEME_CAR(next), p->mbox_first);
01084     if (!p->mbox_first)
01085       p->mbox_last = hd;
01086     p->mbox_first = hd;
01087 
01088     ++cnt;
01089     next = SCHEME_CDR(next);
01090 
01091     if (SCHEME_NULLP(next) || (cnt == 256)) {
01092       /* Either done or need to pause to allow breaks/swaps; */
01093       /* do a single post for all messages so far. */
01094       ((Scheme_Sema*)p->mbox_sema)->value += cnt;
01095       scheme_post_sema(p->mbox_sema);
01096       SCHEME_USE_FUEL(cnt+1); /* might sleep */
01097       cnt = -1;
01098     }
01099   }
01100 }
01101 
01102 static Scheme_Object *mbox_pop( Scheme_Thread *p, int dec)
01103 {
01104   /* Assertion: mbox_first != NULL */
01105   Scheme_Object *r = NULL;
01106 
01107   r = SCHEME_CAR(p->mbox_first);
01108   p->mbox_first = SCHEME_CDR(p->mbox_first);
01109   if (!p->mbox_first)
01110     p->mbox_last = NULL;
01111   
01112   if (dec)
01113     scheme_try_plain_sema(p->mbox_sema);
01114 
01115   return r;
01116 }
01117 
01118 static Scheme_Object *thread_send(int argc, Scheme_Object **argv)
01119 {
01120   if (SCHEME_THREADP(argv[0])) {
01121     int running;
01122 
01123     if (argc > 2) {
01124       if (!SCHEME_FALSEP(argv[2])) /* redundant, but keeps it fast as possible */
01125         scheme_check_proc_arity2("thread-send", 0, 2, argc, argv, 1);
01126     }
01127 
01128     running = ((Scheme_Thread*)argv[0])->running;
01129     if (MZTHREAD_STILL_RUNNING(running)) {
01130       mbox_push((Scheme_Thread*)argv[0], argv[1]);
01131       return scheme_void;
01132     } else {
01133       if (argc > 2) {
01134         if (SCHEME_FALSEP(argv[2]))
01135           return scheme_false;
01136         else
01137           return _scheme_tail_apply(argv[2], 0, NULL);
01138       } else
01139         scheme_raise_exn(MZEXN_FAIL_CONTRACT, "thread-send: target thread is not running");
01140     }
01141   } else 
01142     scheme_wrong_type("thread-send", "thread", 0, argc, argv);
01143 
01144   return NULL;
01145 }
01146 
01147 static Scheme_Object *thread_receive(int argc, Scheme_Object **argv)
01148 {
01149   /* The mbox semaphore can only be downed by the current thread, so
01150      receive/try-receive can directly dec+pop without syncing 
01151      (by calling mbox_pop with dec=1). */
01152   if (scheme_current_thread->mbox_first) {
01153     return mbox_pop(scheme_current_thread, 1);
01154   } else {
01155     Scheme_Object *v;
01156     Scheme_Thread *p = scheme_current_thread;
01157 
01158     make_mbox_sema(p);
01159 
01160     scheme_wait_sema(p->mbox_sema, 0);
01161     /* We're relying on atomicity of return after wait succeeds to ensure
01162        that a semaphore wait guarantees a mailbox dequeue. */
01163     v = mbox_pop(p, 0);
01164     
01165     /* Due to that atomicity, though, we're obliged to check for
01166        a break at this point: */
01167     scheme_check_break_now();
01168     
01169     return v;
01170   }
01171 }
01172 
01173 static Scheme_Object *thread_try_receive(int argc, Scheme_Object **argv)
01174 {
01175   if (scheme_current_thread->mbox_first)
01176     return mbox_pop(scheme_current_thread, 1);
01177   else
01178     return scheme_false;
01179 }
01180 
01181 static Scheme_Object *thread_receive_evt(int argc, Scheme_Object **argv)
01182 {
01183   return thread_recv_evt;
01184 }
01185 
01186 static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
01187 {
01188   Scheme_Thread *p;
01189 
01190   p = sinfo->false_positive_ok;
01191   if (!p)
01192     p = scheme_current_thread;
01193 
01194   make_mbox_sema(p);
01195 
01196   scheme_set_sync_target(sinfo, p->mbox_sema, thread_recv_evt, NULL, 1, 1, NULL);
01197 
01198   return 0;
01199 }
01200 
01201 static Scheme_Object *thread_rewind_receive(int argc, Scheme_Object **argv)
01202 {
01203   if (scheme_is_list(argv[0])) {
01204     mbox_push_front(scheme_current_thread, argv[0]);
01205     return scheme_void;
01206   } else
01207     scheme_wrong_type("thread-rewind", "list", 0, argc, argv);
01208 
01209   return NULL;
01210 }
01211 
01212 /**********************************************************************/
01213 /*                             alarms                                 */
01214 /**********************************************************************/
01215 
01216 static Scheme_Object *make_alarm(int argc, Scheme_Object **argv)
01217 {
01218   Scheme_Alarm *a;
01219   double sleep_end;
01220 
01221   if (!SCHEME_REALP(argv[0])) {
01222     scheme_wrong_type("alarm-evt", "real number", 0, argc, argv);
01223   }
01224 
01225   sleep_end = scheme_get_val_as_double(argv[0]);
01226 
01227   a = MALLOC_ONE_TAGGED(Scheme_Alarm);
01228   a->so.type = scheme_alarm_type;
01229   a->sleep_end = sleep_end;
01230 
01231   return (Scheme_Object *)a;
01232 }
01233 
01234 static int alarm_ready(Scheme_Object *_a, Scheme_Schedule_Info *sinfo)
01235 {
01236   Scheme_Alarm *a = (Scheme_Alarm *)_a;
01237 
01238   if (!sinfo->sleep_end
01239       || (sinfo->sleep_end > a->sleep_end))
01240     sinfo->sleep_end = a->sleep_end;
01241 
01242   if (a->sleep_end <= scheme_get_inexact_milliseconds())
01243     return 1;
01244 
01245   return 0;
01246 }
01247 
01248 static int always_ready(Scheme_Object *w)
01249 {
01250   return 1;
01251 }
01252 
01253 static int never_ready(Scheme_Object *w)
01254 {
01255   return 0;
01256 }
01257 
01258 static Scheme_Object *make_sys_idle(int n, Scheme_Object **p)
01259 {
01260   if (!system_idle_put_evt) {
01261     Scheme_Object *a[2];
01262     REGISTER_SO(system_idle_put_evt);
01263     system_idle_put_evt = scheme_make_channel_put_evt(scheme_system_idle_channel,
01264                                                       scheme_void);
01265     a[0] = system_idle_put_evt;
01266     a[1] = scheme_void_proc;
01267     system_idle_put_evt = scheme_wrap_evt(2, a);
01268   }
01269 
01270   return system_idle_put_evt;
01271 }
01272 
01273 /**********************************************************************/
01274 /*                           Precise GC                               */
01275 /**********************************************************************/
01276 
01277 #ifdef MZ_PRECISE_GC
01278 
01279 START_XFORM_SKIP;
01280 
01281 #define MARKS_FOR_SEMA_C
01282 #include "mzmark.c"
01283 
01284 static void register_traversers(void)
01285 {
01286   GC_REG_TRAV(scheme_alarm_type, mark_alarm);
01287   GC_REG_TRAV(scheme_channel_syncer_type, mark_channel_syncer);
01288 }
01289 
01290 END_XFORM_SKIP;
01291 
01292 #endif
01293 
01294 #endif /* NO_SCHEME_THREADS */