/**
 * The event module provides a primitive for lightweight signaling of other threads
 * (emulating Windows events on Posix)
 *
 * Copyright: Copyright (c) 2019 D Language Foundation
 * License: Distributed under the
 *    $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
 *    (See accompanying file LICENSE)
 * Authors: Rainer Schuetze
 * Source:    $(DRUNTIMESRC core/sync/event.d)
 */
module core.sync.event;

version (Windows)
{
    import core.sys.windows.basetsd /+: HANDLE +/;
    import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
    import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
        WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
}
else version (Posix)
{
    import core.sys.posix.pthread;
    import core.sys.posix.sys.types;
    import core.sys.posix.time;
}
else
{
    static assert(false, "Platform not supported");
}

import core.time;
import core.internal.abort : abort;

/**
 * Represents an event. Clients of an event are suspended while waiting
 * for the event to be "signaled".
 *
 * A manual-reset event stays signaled until `reset` is called. An auto-reset
 * event is reset by the first successful `wait`, so each call to `set`
 * lets at most one waiting client continue.
 *
 * Implemented using `pthread_mutex` and `pthread_condition` on Posix and
 * `CreateEvent` and `SetEvent` on Windows.
---
import core.sync.event, core.thread, std.file;

struct ProcessFile
{
    ThreadGroup group;
    Event event;
    void[] buffer;

    void doProcess()
    {
        event.wait();
        // process buffer
    }

    void process(string filename)
    {
        event.initialize(true, false);
        group = new ThreadGroup;
        for (int i = 0; i < 10; ++i)
            group.create(&doProcess);

        buffer = std.file.read(filename);
        event.set();
        group.joinAll();
        event.terminate();
    }
}
---
 */
struct Event
{
nothrow @nogc:
    /**
     * Creates an event object.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    this(bool manualReset, bool initialState)
    {
        initialize(manualReset, initialState);
    }

    /**
     * Initializes an event object. Does nothing if the event is already initialized.
     * Aborts the process if the underlying OS primitives cannot be created.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    void initialize(bool manualReset, bool initialState)
    {
        version (Windows)
        {
            if (m_event)
                return;
            m_event = CreateEvent(null, manualReset, initialState, null);
            m_event || abort("Error: CreateEvent failed.");
        }
        else version (Posix)
        {
            if (m_initalized)
                return;
            pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
                abort("Error: pthread_mutex_init failed.");
            // Use the monotonic clock for timed waits where the platform
            // allows selecting the clock of a condition variable.
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                pthread_condattr_t attr = void;
                pthread_condattr_init(&attr) == 0 ||
                    abort("Error: pthread_condattr_init failed.");
                pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
                    abort("Error: pthread_condattr_setclock failed.");
                pthread_cond_init(&m_cond, &attr) == 0 ||
                    abort("Error: pthread_cond_init failed.");
                pthread_condattr_destroy(&attr) == 0 ||
                    abort("Error: pthread_condattr_destroy failed.");
            }
            else
            {
                pthread_cond_init(&m_cond, null) == 0 ||
                    abort("Error: pthread_cond_init failed.");
            }
            m_state = initialState;
            m_manualReset = manualReset;
            m_initalized = true;
        }
    }

    // copying not allowed, can produce resource leaks
    @disable this(this);
    @disable void opAssign(Event);

    ~this()
    {
        terminate();
    }

    /**
     * Deinitializes the event. Does nothing if the event is not initialized. There must not be
     * threads currently waiting for the event to be signaled.
     */
    void terminate()
    {
        version (Windows)
        {
            if (m_event)
                CloseHandle(m_event);
            m_event = null;
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_destroy(&m_mutex) == 0 ||
                    abort("Error: pthread_mutex_destroy failed.");
                pthread_cond_destroy(&m_cond) == 0 ||
                    abort("Error: pthread_cond_destroy failed.");
                m_initalized = false;
            }
        }
    }


    /// Set the event to "signaled", so that waiting clients are resumed.
    /// Does nothing if the event is not initialized.
    void set()
    {
        version (Windows)
        {
            if (m_event)
                SetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = true;
                // Wake every waiter; each one re-checks m_state under the
                // mutex in wait(), so for an auto-reset event only the first
                // one to run consumes the signal.
                pthread_cond_broadcast(&m_cond);
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /// Reset the event manually. Does nothing if the event is not initialized.
    /// On Posix, a waiter that has been woken by `set` but has not yet
    /// re-acquired the internal mutex when `reset` runs keeps waiting.
    void reset()
    {
        version (Windows)
        {
            if (m_event)
                ResetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = false;
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /**
     * Wait for the event to be signaled without timeout.
     *
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event is uninitialized or another error occurred
     */
    bool wait()
    {
        version (Windows)
        {
            return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            return wait(Duration.max);
        }
    }

    /**
     * Wait for the event to be signaled with timeout.
     *
     * Params:
     *  tmout = the maximum time to wait; a negative duration is treated as zero,
     *          `Duration.max` waits without timeout on Posix
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
     *  the event is uninitialized or another error occurred
     */
    bool wait(Duration tmout)
    {
        version (Windows)
        {
            if (!m_event)
                return false;

            // A negative duration would wrap around in the uint conversion
            // below (-1 ms becomes 0xFFFFFFFF, i.e. INFINITE) and block
            // forever instead of timing out immediately.
            if (tmout.isNegative)
                tmout = Duration.zero;

            // WaitForSingleObject takes a 32-bit millisecond count, with
            // uint.max reserved for INFINITE: wait in chunks for long timeouts.
            auto maxWaitMillis = dur!("msecs")(uint.max - 1);

            while (tmout > maxWaitMillis)
            {
                auto res = WaitForSingleObject(m_event, uint.max - 1);
                if (res != WAIT_TIMEOUT)
                    return res == WAIT_OBJECT_0;
                tmout -= maxWaitMillis;
            }
            auto ms = cast(uint)(tmout.total!"msecs");
            return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            if (!m_initalized)
                return false;

            pthread_mutex_lock(&m_mutex);

            // The waits below are repeated until the predicate m_state holds
            // or the wait fails or times out. This is required because
            //  - POSIX permits spurious wakeups of condition variables, and
            //  - set() broadcasts, so for an auto-reset event all waiters wake
            //    up but only the first one may consume the signal; the others
            //    find m_state reset again and must continue waiting.
            int result = 0;
            if (!m_state)
            {
                if (tmout == Duration.max)
                {
                    while (!m_state && result == 0)
                        result = pthread_cond_wait(&m_cond, &m_mutex);
                }
                else
                {
                    import core.sync.config;

                    // Compute the absolute deadline once so that repeated
                    // waits do not extend the total timeout.
                    timespec deadline = void;
                    mktspec(deadline, tmout);

                    while (!m_state && result == 0)
                        result = pthread_cond_timedwait(&m_cond, &m_mutex, &deadline);
                }
            }
            // result == 0 here implies m_state is set: consume the signal
            // for auto-reset events.
            if (result == 0 && !m_manualReset)
                m_state = false;

            pthread_mutex_unlock(&m_mutex);

            return result == 0;
        }
    }

private:
    version (Windows)
    {
        HANDLE m_event;
    }
    else version (Posix)
    {
        pthread_mutex_t m_mutex;   // protects m_state
        pthread_cond_t m_cond;     // signaled by set()
        bool m_initalized;         // true between initialize() and terminate()
        bool m_state;              // true while the event is signaled
        bool m_manualReset;        // false: a successful wait resets m_state
    }
}

// Test single-thread (non-shared) use.
@nogc nothrow unittest
{
    // auto-reset, initial state false
    Event ev1 = Event(false, false);
    assert(!ev1.wait(1.dur!"msecs"));
    ev1.set();
    assert(ev1.wait());
    assert(!ev1.wait(1.dur!"msecs"));

    // manual-reset, initial state true
    Event ev2 = Event(true, true);
    assert(ev2.wait());
    assert(ev2.wait());
    ev2.reset();
    assert(!ev2.wait(1.dur!"msecs"));

    // a negative timeout must time out immediately instead of blocking
    assert(!ev2.wait(dur!"msecs"(-1)));

    // waiting on an uninitialized event fails without blocking
    Event ev3;
    assert(!ev3.wait());
    assert(!ev3.wait(1.dur!"msecs"));
}

unittest
{
    import core.thread, core.atomic;

    scope event = new Event(true, false);
    int numThreads = 10;
    shared int numRunning = 0;

    void testFn()
    {
        event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
        numRunning.atomicOp!"+="(1);
    }

    auto group = new ThreadGroup;

    for (int i = 0; i < numThreads; ++i)
        group.create(&testFn);

    auto start = MonoTime.currTime;
    assert(numRunning == 0);

    event.set();
    group.joinAll();

    assert(numRunning == numThreads);

    assert(MonoTime.currTime - start < 5.dur!"seconds");
}