Line data Source code
1 : /* t-poll.c - Check the poll function
2 : * Copyright (C) 2015 g10 Code GmbH
3 : *
4 : * This file is part of libgpg-error.
5 : *
6 : * libgpg-error is free software; you can redistribute it and/or
7 : * modify it under the terms of the GNU Lesser General Public License
8 : * as published by the Free Software Foundation; either version 2.1 of
9 : * the License, or (at your option) any later version.
10 : *
11 : * libgpg-error is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * Lesser General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public
17 : * License along with this program; if not, see <https://www.gnu.org/licenses/>.
18 : */
19 :
/* FIXME: We need much better tests than this very basic one. */
21 :
22 : #if HAVE_CONFIG_H
23 : # include <config.h>
24 : #endif
25 :
26 : #include <stdio.h>
27 : #include <stdlib.h>
28 : #include <string.h>
29 : #include <assert.h>
30 : #include <sys/types.h>
31 : #include <unistd.h>
32 : #include <errno.h>
33 : #ifdef _WIN32
34 : # include <windows.h>
35 : # include <time.h>
36 : #else
37 : # ifdef USE_POSIX_THREADS
38 : # include <pthread.h>
39 : # endif
40 : #endif
41 :
42 : #define PGM "t-poll"
43 :
44 : #include "t-common.h"
45 :
46 : #ifdef _WIN32
47 : # define THREAD_RET_TYPE DWORD WINAPI
48 : # define THREAD_RET_VALUE 0
49 : #else
50 : # define THREAD_RET_TYPE void *
51 : # define THREAD_RET_VALUE NULL
52 : #endif
53 :
54 :
55 : /* Object to convey data to a thread. */
56 : struct thread_arg
57 : {
58 : const char *name;
59 : estream_t stream;
60 : volatile int stop_me;
61 : #ifdef USE_POSIX_THREADS
62 : pthread_t thread;
63 : #elif _WIN32
64 : HANDLE thread;
65 : #endif
66 : };
67 :
68 :
69 : static struct thread_arg peer_stdin; /* Thread to feed the stdin. */
70 : static struct thread_arg peer_stdout; /* Thread to feed the stdout. */
71 : static struct thread_arg peer_stderr; /* Thread to feed the stderr. */
72 :
73 : static estream_t test_stdin;
74 : static estream_t test_stdout;
75 : static estream_t test_stderr;
76 :
77 : #if defined(_WIN32) || defined(USE_POSIX_THREADS)
78 :
79 : /* This thread feeds data to the given stream. */
80 : static THREAD_RET_TYPE
81 1 : producer_thread (void *argaddr)
82 : {
83 : struct thread_arg *arg = argaddr;
84 : int i = 0;
85 :
86 : (void)arg;
87 :
88 5 : while (!arg->stop_me && i++ < 3)
89 : {
90 3 : show ("thread '%s' about to write\n", arg->name);
91 3 : es_fprintf (arg->stream, "This is '%s' count=%d\n", arg->name, i);
92 3 : es_fflush (arg->stream);
93 : }
94 1 : es_fclose (arg->stream);
95 1 : return THREAD_RET_VALUE;
96 : }
97 :
98 : /* This thread eats data from the given stream. */
99 : static THREAD_RET_TYPE
100 2 : consumer_thread (void *argaddr)
101 : {
102 : struct thread_arg *arg = argaddr;
103 : char buf[15];
104 :
105 : (void)arg;
106 :
107 13 : while (!arg->stop_me)
108 : {
109 11 : show ("thread '%s' ready to read\n", arg->name);
110 11 : if (!es_fgets (buf, sizeof buf, arg->stream))
111 : {
112 2 : show ("Thread '%s' received EOF or error\n", arg->name);
113 2 : break;
114 : }
115 9 : show ("Thread '%s' got: '%s'\n", arg->name, buf);
116 : }
117 2 : es_fclose (arg->stream);
118 2 : return THREAD_RET_VALUE;
119 : }
120 :
121 : #endif /*_WIN32 || USE_POSIX_THREADS */
122 :
123 :
124 : static void
125 3 : launch_thread (THREAD_RET_TYPE (*fnc)(void *), struct thread_arg *th)
126 : {
127 : int fd;
128 :
129 3 : th->stop_me = 0;
130 3 : fd = es_fileno (th->stream);
131 : #ifdef _WIN32
132 :
133 : th->thread = CreateThread (NULL, 0, fnc, th, 0, NULL);
134 : if (!th->thread)
135 : die ("creating thread '%s' failed: rc=%d", th->name, (int)GetLastError ());
136 : show ("thread '%s' launched (fd=%d)\n", th->name, fd);
137 :
138 : #elif USE_POSIX_THREADS
139 :
140 3 : if (pthread_create (&th->thread, NULL, fnc, th))
141 0 : die ("creating thread '%s' failed: %s\n", th->name, strerror (errno));
142 3 : show ("thread '%s' launched (fd=%d)\n", th->name, fd);
143 :
144 : # else /* no thread support */
145 :
146 : verbose++;
147 : show ("no thread support - skipping test\n", PGM);
148 : verbose--;
149 :
150 : #endif /* no thread support */
151 3 : }
152 :
153 :
/* Wait until the thread described by TH, which must have been started
   by launch_thread, has terminated.  A failure to wait is recorded
   with fail.  On Windows the thread handle is closed afterwards.  In
   a build without thread support this function does nothing.  */
static void
join_thread (struct thread_arg *th)
{
#if defined (_WIN32)
  int rc;

  rc = WaitForSingleObject (th->thread, INFINITE);
  if (rc == WAIT_OBJECT_0)
    {
      show ("thread '%s' has terminated\n", th->name);
    }
  else
    {
      fail ("waiting for thread '%s' failed: %d",
            th->name, (int)GetLastError ());
    }
  CloseHandle (th->thread);

#elif defined (USE_POSIX_THREADS)
  int rc;

  /* pthread_join returns the error number and does not set errno.
     Do not claim that the thread has terminated if joining failed.  */
  rc = pthread_join (th->thread, NULL);
  if (!rc)
    {
      show ("thread '%s' has terminated\n", th->name);
    }
  else
    {
      fail ("waiting for thread '%s' failed: %s\n", th->name, strerror (rc));
    }

#else /* no thread support */

  (void)th;

#endif /* no thread support */
}
174 :
175 :
176 : static void
177 3 : create_pipe (estream_t *r_in, estream_t *r_out)
178 : {
179 : gpg_error_t err;
180 : int filedes[2];
181 :
182 : #ifdef _WIN32
183 : if (_pipe (filedes, 512, 0) == -1)
184 : #else
185 3 : if (pipe (filedes) == -1)
186 : #endif
187 : {
188 : err = gpg_error_from_syserror ();
189 0 : die ("error creating a pipe: %s\n", gpg_strerror (err));
190 : }
191 :
192 3 : show ("created pipe [%d, %d]\n", filedes[0], filedes[1]);
193 :
194 3 : *r_in = es_fdopen (filedes[0], "r,pollable");
195 3 : if (!*r_in)
196 : {
197 : err = gpg_error_from_syserror ();
198 0 : die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
199 : }
200 :
201 3 : *r_out = es_fdopen (filedes[1], "w,pollable");
202 3 : if (!*r_out)
203 : {
204 : err = gpg_error_from_syserror ();
205 0 : die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
206 : }
207 3 : }
208 :
209 :
210 : static void
211 1 : test_poll (void)
212 : {
213 : int ret;
214 : gpgrt_poll_t fds[3];
215 : char buffer[16];
216 : size_t used, nwritten;
217 : int c;
218 :
219 1 : memset (fds, 0, sizeof fds);
220 1 : fds[0].stream = test_stdin;
221 1 : fds[0].want_read = 1;
222 1 : fds[1].stream = test_stdout;
223 1 : fds[1].want_write = 1;
224 : /* FIXME: We don't use the next stream at all. */
225 1 : fds[2].stream = test_stderr;
226 1 : fds[2].want_write = 1;
227 1 : fds[2].ignore = 1;
228 :
229 :
230 : used = 0;
231 15 : while (used || !fds[0].ignore)
232 : {
233 13 : ret = gpgrt_poll (fds, DIM(fds), -1);
234 13 : if (ret == -1)
235 : {
236 0 : fail ("gpgrt_poll failed: %s\n", strerror (errno));
237 0 : continue;
238 : }
239 13 : if (!ret)
240 : {
241 0 : fail ("gpgrt_poll unexpectedly timed out\n");
242 0 : continue;
243 : }
244 :
245 13 : show ("gpgrt_poll detected %d events\n", ret);
246 13 : if (debug)
247 0 : show ("gpgrt_poll: r=%d"
248 : " 0:%c%c%c%c%c%c%c%c%c%c%c%c"
249 : " 1:%c%c%c%c%c%c%c%c%c%c%c%c"
250 : " 2:%c%c%c%c%c%c%c%c%c%c%c%c"
251 : "\n",
252 : ret,
253 0 : fds[0].want_read? 'r':'-',
254 0 : fds[0].want_write? 'w':'-',
255 0 : fds[0].want_oob? 'o':'-',
256 0 : fds[0].want_rdhup? 'h':'-',
257 : fds[0].ignore? '!':'=',
258 0 : fds[0].got_read? 'r':'-',
259 0 : fds[0].got_write? 'w':'-',
260 0 : fds[0].got_oob? 'o':'-',
261 0 : fds[0].got_rdhup? 'h':'-',
262 0 : fds[0].got_hup? 'H':' ',
263 0 : fds[0].got_err? 'e':' ',
264 0 : fds[0].got_nval? 'n':' ',
265 :
266 0 : fds[1].want_read? 'r':'-',
267 0 : fds[1].want_write? 'w':'-',
268 0 : fds[1].want_oob? 'o':'-',
269 0 : fds[1].want_rdhup? 'h':'-',
270 : fds[1].ignore? '!':'=',
271 0 : fds[1].got_read? 'r':'-',
272 0 : fds[1].got_write? 'w':'-',
273 0 : fds[1].got_oob? 'o':'-',
274 0 : fds[1].got_rdhup? 'h':'-',
275 0 : fds[1].got_hup? 'H':' ',
276 0 : fds[1].got_err? 'e':' ',
277 0 : fds[1].got_nval? 'n':' ',
278 :
279 0 : fds[2].want_read? 'r':'-',
280 0 : fds[2].want_write? 'w':'-',
281 0 : fds[2].want_oob? 'o':'-',
282 0 : fds[2].want_rdhup? 'h':'-',
283 : fds[2].ignore? '!':'=',
284 0 : fds[2].got_read? 'r':'-',
285 0 : fds[2].got_write? 'w':'-',
286 0 : fds[2].got_oob? 'o':'-',
287 0 : fds[2].got_rdhup? 'h':'-',
288 0 : fds[2].got_hup? 'H':' ',
289 0 : fds[2].got_err? 'e':' ',
290 0 : fds[2].got_nval? 'n':' '
291 : );
292 : else
293 13 : show ("gpgrt_poll detected %d events\n", ret);
294 :
295 13 : if (fds[0].got_read)
296 : {
297 : /* Read from the producer. */
298 : for (;;)
299 : {
300 100 : c = es_fgetc (fds[0].stream);
301 100 : if (c == EOF)
302 : {
303 1 : if (es_feof (fds[0].stream))
304 : {
305 1 : show ("reading '%s': EOF\n", peer_stdin.name);
306 1 : fds[0].ignore = 1; /* Not anymore needed. */
307 1 : peer_stdin.stop_me = 1; /* Tell the thread to stop. */
308 : }
309 0 : else if (es_ferror (fds[0].stream))
310 : {
311 0 : fail ("error reading '%s': %s\n",
312 0 : peer_stdin.name, strerror (errno));
313 0 : fds[0].ignore = 1; /* Disable. */
314 0 : peer_stdin.stop_me = 1; /* Tell the thread to stop. */
315 : }
316 : else
317 0 : show ("reading '%s': EAGAIN\n", peer_stdin.name);
318 : break;
319 : }
320 : else
321 : {
322 99 : if (used <= sizeof buffer -1)
323 99 : buffer[used++] = c;
324 99 : if (used == sizeof buffer)
325 : {
326 6 : show ("throttling reading from '%s'\n", peer_stdin.name);
327 6 : fds[0].ignore = 1;
328 6 : break;
329 : }
330 : }
331 : }
332 7 : show ("read from '%s': %zu bytes\n", peer_stdin.name, used);
333 7 : if (used)
334 7 : fds[1].ignore = 0; /* Data to send. */
335 : }
336 13 : if (fds[1].got_write)
337 : {
338 7 : if (used)
339 : {
340 7 : ret = es_write (fds[1].stream, buffer, used, &nwritten);
341 7 : show ("result for writing to '%s': ret=%d, n=%zu, nwritten=%zu\n",
342 : peer_stdout.name, ret, used, nwritten);
343 7 : if (!ret)
344 : {
345 7 : assert (nwritten <= used);
346 : /* Move the remaining data to the front of buffer. */
347 7 : memmove (buffer, buffer + nwritten,
348 : sizeof buffer - nwritten);
349 7 : used -= nwritten;
350 : }
351 7 : ret = es_fflush (fds[1].stream);
352 7 : if (ret)
353 0 : fail ("Flushing for '%s' failed: %s\n",
354 0 : peer_stdout.name, strerror (errno));
355 : }
356 7 : if (!used)
357 7 : fds[1].ignore = 1; /* No need to send data. */
358 : }
359 :
360 13 : if (used < sizeof buffer / 2 && !peer_stdin.stop_me && fds[0].ignore)
361 : {
362 6 : show ("accelerate reading from '%s'\n", peer_stdin.name);
363 6 : fds[0].ignore = 0;
364 : }
365 : }
366 1 : }
367 :
368 :
369 : int
370 1 : main (int argc, char **argv)
371 : {
372 : int last_argc = -1;
373 :
374 1 : if (argc)
375 : {
376 1 : argc--; argv++;
377 : }
378 1 : while (argc && last_argc != argc )
379 : {
380 : last_argc = argc;
381 0 : if (!strcmp (*argv, "--help"))
382 : {
383 0 : puts (
384 : "usage: ./t-poll [options]\n"
385 : "\n"
386 : "Options:\n"
387 : " --verbose Show what is going on\n"
388 : " --debug Flyswatter\n"
389 : );
390 0 : exit (0);
391 : }
392 0 : if (!strcmp (*argv, "--verbose"))
393 : {
394 0 : verbose = 1;
395 0 : argc--; argv++;
396 : }
397 0 : else if (!strcmp (*argv, "--debug"))
398 : {
399 0 : verbose = debug = 1;
400 0 : argc--; argv++;
401 : }
402 : }
403 :
404 1 : if (!gpg_error_check_version (GPG_ERROR_VERSION))
405 : {
406 0 : die ("gpg_error_check_version returned an error");
407 : errorcount++;
408 : }
409 :
410 1 : peer_stdin.name = "stdin producer";
411 1 : create_pipe (&test_stdin, &peer_stdin.stream);
412 1 : peer_stdout.name = "stdout consumer";
413 1 : create_pipe (&peer_stdout.stream, &test_stdout);
414 1 : peer_stderr.name = "stderr consumer";
415 1 : create_pipe (&peer_stderr.stream, &test_stderr);
416 :
417 1 : if (es_set_nonblock (test_stdin, 1))
418 0 : fail ("error setting test_stdin to nonblock: %s\n", strerror (errno));
419 1 : if (es_set_nonblock (test_stdout, 1))
420 0 : fail ("error setting test_stdout to nonblock: %s\n", strerror (errno));
421 1 : if (es_set_nonblock (test_stderr, 1))
422 0 : fail ("error setting test_stderr to nonblock: %s\n", strerror (errno));
423 :
424 1 : launch_thread (producer_thread, &peer_stdin );
425 1 : launch_thread (consumer_thread, &peer_stdout);
426 1 : launch_thread (consumer_thread, &peer_stderr);
427 1 : test_poll ();
428 1 : show ("Waiting for threads to terminate...\n");
429 1 : es_fclose (test_stdin);
430 1 : es_fclose (test_stdout);
431 1 : es_fclose (test_stderr);
432 1 : peer_stdin.stop_me = 1;
433 1 : peer_stdout.stop_me = 1;
434 1 : peer_stderr.stop_me = 1;
435 1 : join_thread (&peer_stdin);
436 1 : join_thread (&peer_stdout);
437 1 : join_thread (&peer_stderr);
438 :
439 1 : return errorcount ? 1 : 0;
440 : }
|