1 /**
2  * The semaphore module provides a general use semaphore for synchronization.
3  *
4  * Copyright: Copyright Sean Kelly 2005 - 2009.
5  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
6  * Authors:   Sean Kelly
7  * Source:    $(DRUNTIMESRC core/sync/_semaphore.d)
8  */
9 
10 /*          Copyright Sean Kelly 2005 - 2009.
11  * Distributed under the Boost Software License, Version 1.0.
12  *    (See accompanying file LICENSE or copy at
13  *          http://www.boost.org/LICENSE_1_0.txt)
14  */
15 module core.sync.semaphore;
16 
17 
18 public import core.sync.exception;
19 public import core.time;
20 
21 version (OSX)
22     version = Darwin;
23 else version (iOS)
24     version = Darwin;
25 else version (TVOS)
26     version = Darwin;
27 else version (WatchOS)
28     version = Darwin;
29 
30 version (Windows)
31 {
32     import core.sys.windows.basetsd /+: HANDLE+/;
33     import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, INFINITE,
34         ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
35     import core.sys.windows.windef /+: BOOL, DWORD+/;
36     import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
37 }
38 else version (Darwin)
39 {
40     import core.sync.config;
41     import core.stdc.errno;
42     import core.sys.posix.time;
43     import core.sys.darwin.mach.semaphore;
44 }
45 else version (Posix)
46 {
47     import core.sync.config;
48     import core.stdc.errno;
49     import core.sys.posix.pthread;
50     import core.sys.posix.semaphore;
51 }
52 else
53 {
54     static assert(false, "Platform not supported");
55 }
56 
57 
58 ////////////////////////////////////////////////////////////////////////////////
59 // Semaphore
60 //
61 // void wait();
62 // void notify();
63 // bool tryWait();
64 ////////////////////////////////////////////////////////////////////////////////
65 
66 
67 /**
68  * This class represents a general counting semaphore as concieved by Edsger
69  * Dijkstra.  As per Mesa type monitors however, "signal" has been replaced
70  * with "notify" to indicate that control is not transferred to the waiter when
71  * a notification is sent.
72  */
73 class Semaphore
74 {
75     ////////////////////////////////////////////////////////////////////////////
76     // Initialization
77     ////////////////////////////////////////////////////////////////////////////
78 
79 
80     /**
81      * Initializes a semaphore object with the specified initial count.
82      *
83      * Params:
84      *  count = The initial count for the semaphore.
85      *
86      * Throws:
87      *  SyncError on error.
88      */
89     this( uint count = 0 )
90     {
91         version (Windows)
92         {
93             m_hndl = CreateSemaphoreA( null, count, int.max, null );
94             if ( m_hndl == m_hndl.init )
95                 throw new SyncError( "Unable to create semaphore" );
96         }
97         else version (Darwin)
98         {
99             auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count );
100             if ( rc )
101                 throw new SyncError( "Unable to create semaphore" );
102         }
103         else version (Posix)
104         {
105             int rc = sem_init( &m_hndl, 0, count );
106             if ( rc )
107                 throw new SyncError( "Unable to create semaphore" );
108         }
109     }
110 
111 
112     ~this()
113     {
114         version (Windows)
115         {
116             BOOL rc = CloseHandle( m_hndl );
117             assert( rc, "Unable to destroy semaphore" );
118         }
119         else version (Darwin)
120         {
121             auto rc = semaphore_destroy( mach_task_self(), m_hndl );
122             assert( !rc, "Unable to destroy semaphore" );
123         }
124         else version (Posix)
125         {
126             int rc = sem_destroy( &m_hndl );
127             assert( !rc, "Unable to destroy semaphore" );
128         }
129     }
130 
131 
132     ////////////////////////////////////////////////////////////////////////////
133     // General Actions
134     ////////////////////////////////////////////////////////////////////////////
135 
136 
137     /**
138      * Wait until the current count is above zero, then atomically decrement
139      * the count by one and return.
140      *
141      * Throws:
142      *  SyncError on error.
143      */
144     void wait()
145     {
146         version (Windows)
147         {
148             DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
149             if ( rc != WAIT_OBJECT_0 )
150                 throw new SyncError( "Unable to wait for semaphore" );
151         }
152         else version (Darwin)
153         {
154             while ( true )
155             {
156                 auto rc = semaphore_wait( m_hndl );
157                 if ( !rc )
158                     return;
159                 if ( rc == KERN_ABORTED && errno == EINTR )
160                     continue;
161                 throw new SyncError( "Unable to wait for semaphore" );
162             }
163         }
164         else version (Posix)
165         {
166             while ( true )
167             {
168                 if ( !sem_wait( &m_hndl ) )
169                     return;
170                 if ( errno != EINTR )
171                     throw new SyncError( "Unable to wait for semaphore" );
172             }
173         }
174     }
175 
176 
177     /**
178      * Suspends the calling thread until the current count moves above zero or
179      * until the supplied time period has elapsed.  If the count moves above
180      * zero in this interval, then atomically decrement the count by one and
181      * return true.  Otherwise, return false.
182      *
183      * Params:
184      *  period = The time to wait.
185      *
186      * In:
187      *  period must be non-negative.
188      *
189      * Throws:
190      *  SyncError on error.
191      *
192      * Returns:
193      *  true if notified before the timeout and false if not.
194      */
195     bool wait( Duration period )
196     in
197     {
198         assert( !period.isNegative );
199     }
200     do
201     {
202         version (Windows)
203         {
204             auto maxWaitMillis = dur!("msecs")( uint.max - 1 );
205 
206             while ( period > maxWaitMillis )
207             {
208                 auto rc = WaitForSingleObject( m_hndl, cast(uint)
209                                                        maxWaitMillis.total!"msecs" );
210                 switch ( rc )
211                 {
212                 case WAIT_OBJECT_0:
213                     return true;
214                 case WAIT_TIMEOUT:
215                     period -= maxWaitMillis;
216                     continue;
217                 default:
218                     throw new SyncError( "Unable to wait for semaphore" );
219                 }
220             }
221             switch ( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) )
222             {
223             case WAIT_OBJECT_0:
224                 return true;
225             case WAIT_TIMEOUT:
226                 return false;
227             default:
228                 throw new SyncError( "Unable to wait for semaphore" );
229             }
230         }
231         else version (Darwin)
232         {
233             mach_timespec_t t = void;
234             (cast(byte*) &t)[0 .. t.sizeof] = 0;
235 
236             if ( period.total!"seconds" > t.tv_sec.max )
237             {
238                 t.tv_sec  = t.tv_sec.max;
239                 t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs;
240             }
241             else
242                 period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec);
243             while ( true )
244             {
245                 auto rc = semaphore_timedwait( m_hndl, t );
246                 if ( !rc )
247                     return true;
248                 if ( rc == KERN_OPERATION_TIMED_OUT )
249                     return false;
250                 if ( rc != KERN_ABORTED || errno != EINTR )
251                     throw new SyncError( "Unable to wait for semaphore" );
252             }
253         }
254         else version (Posix)
255         {
256             import core.sys.posix.time : clock_gettime, CLOCK_REALTIME;
257 
258             timespec t = void;
259             clock_gettime( CLOCK_REALTIME, &t );
260             mvtspec( t, period );
261 
262             while ( true )
263             {
264                 if ( !sem_timedwait( &m_hndl, &t ) )
265                     return true;
266                 if ( errno == ETIMEDOUT )
267                     return false;
268                 if ( errno != EINTR )
269                     throw new SyncError( "Unable to wait for semaphore" );
270             }
271         }
272     }
273 
274 
275     /**
276      * Atomically increment the current count by one.  This will notify one
277      * waiter, if there are any in the queue.
278      *
279      * Throws:
280      *  SyncError on error.
281      */
282     void notify()
283     {
284         version (Windows)
285         {
286             if ( !ReleaseSemaphore( m_hndl, 1, null ) )
287                 throw new SyncError( "Unable to notify semaphore" );
288         }
289         else version (Darwin)
290         {
291             auto rc = semaphore_signal( m_hndl );
292             if ( rc )
293                 throw new SyncError( "Unable to notify semaphore" );
294         }
295         else version (Posix)
296         {
297             int rc = sem_post( &m_hndl );
298             if ( rc )
299                 throw new SyncError( "Unable to notify semaphore" );
300         }
301     }
302 
303 
304     /**
305      * If the current count is equal to zero, return.  Otherwise, atomically
306      * decrement the count by one and return true.
307      *
308      * Throws:
309      *  SyncError on error.
310      *
311      * Returns:
312      *  true if the count was above zero and false if not.
313      */
314     bool tryWait()
315     {
316         version (Windows)
317         {
318             switch ( WaitForSingleObject( m_hndl, 0 ) )
319             {
320             case WAIT_OBJECT_0:
321                 return true;
322             case WAIT_TIMEOUT:
323                 return false;
324             default:
325                 throw new SyncError( "Unable to wait for semaphore" );
326             }
327         }
328         else version (Darwin)
329         {
330             return wait( dur!"hnsecs"(0) );
331         }
332         else version (Posix)
333         {
334             while ( true )
335             {
336                 if ( !sem_trywait( &m_hndl ) )
337                     return true;
338                 if ( errno == EAGAIN )
339                     return false;
340                 if ( errno != EINTR )
341                     throw new SyncError( "Unable to wait for semaphore" );
342             }
343         }
344     }
345 
346 
347 protected:
348 
349     /// Aliases the operating-system-specific semaphore type.
350     version (Windows)        alias Handle = HANDLE;
351     /// ditto
352     else version (Darwin)    alias Handle = semaphore_t;
353     /// ditto
354     else version (Posix)     alias Handle = sem_t;
355 
356     /// Handle to the system-specific semaphore.
357     Handle m_hndl;
358 }
359 
360 
361 ////////////////////////////////////////////////////////////////////////////////
362 // Unit Tests
363 ////////////////////////////////////////////////////////////////////////////////
364 
365 unittest
366 {
367     import core.thread, core.atomic;
368 
369     void testWait()
370     {
371         auto semaphore = new Semaphore;
372         shared bool stopConsumption = false;
373         immutable numToProduce = 20;
374         immutable numConsumers = 10;
375         shared size_t numConsumed;
376         shared size_t numComplete;
377 
378         void consumer()
379         {
380             while (true)
381             {
382                 semaphore.wait();
383 
384                 if (atomicLoad(stopConsumption))
385                     break;
386                 atomicOp!"+="(numConsumed, 1);
387             }
388             atomicOp!"+="(numComplete, 1);
389         }
390 
391         void producer()
392         {
393             assert(!semaphore.tryWait());
394 
395             foreach (_; 0 .. numToProduce)
396                 semaphore.notify();
397 
398             // wait until all items are consumed
399             while (atomicLoad(numConsumed) != numToProduce)
400                 Thread.yield();
401 
402             // mark consumption as finished
403             atomicStore(stopConsumption, true);
404 
405             // wake all consumers
406             foreach (_; 0 .. numConsumers)
407                 semaphore.notify();
408 
409             // wait until all consumers completed
410             while (atomicLoad(numComplete) != numConsumers)
411                 Thread.yield();
412 
413             assert(!semaphore.tryWait());
414             semaphore.notify();
415             assert(semaphore.tryWait());
416             assert(!semaphore.tryWait());
417         }
418 
419         auto group = new ThreadGroup;
420 
421         for ( int i = 0; i < numConsumers; ++i )
422             group.create(&consumer);
423         group.create(&producer);
424         group.joinAll();
425     }
426 
427 
428     void testWaitTimeout()
429     {
430         auto sem = new Semaphore;
431         shared bool semReady;
432         bool alertedOne, alertedTwo;
433 
434         void waiter()
435         {
436             while (!atomicLoad(semReady))
437                 Thread.yield();
438             alertedOne = sem.wait(dur!"msecs"(1));
439             alertedTwo = sem.wait(dur!"msecs"(1));
440             assert(alertedOne && !alertedTwo);
441         }
442 
443         auto thread = new Thread(&waiter);
444         thread.start();
445 
446         sem.notify();
447         atomicStore(semReady, true);
448         thread.join();
449         assert(alertedOne && !alertedTwo);
450     }
451 
452     testWait();
453     testWaitTimeout();
454 }