=encoding utf-8
=head1 Name
C<ngx.pipe> - spawn and communicate with OS processes via stdin/stdout/stderr in
a non-blocking fashion.
=head1 Status
This Lua module is production ready.
=head1 Synopsis
    location = /t {
        content_by_lua_block {
            local ngx_pipe = require "ngx.pipe"
            local select = select
            local function count_char(...)
                local proc = ngx_pipe.spawn({'wc', '-c'})
                local n = select('#', ...)
                for i = 1, n do
                    local arg = select(i, ...)
                    local bytes, err = proc:write(arg)
                    if not bytes then
                        ngx.say(err)
                        return
                    end
                end
                local ok, err = proc:shutdown('stdin')
                if not ok then
                    ngx.say(err)
                    return
                end
                local data, err = proc:stdout_read_line()
                if not data then
                    ngx.say(err)
                    return
                end
                ngx.say(data)
            end
            count_char(("1234"):rep(2048))
        }
    }
This example counts characters (bytes) directly fed by OpenResty to the UNIX
command C<wc>.
You could not do this with either C<io.popen> or C<os.execute> because C<wc> will
not output the result until its stdin is closed.
=head1 Description
This module does not support non-POSIX operating systems like Windows yet.
If you are not using the Nginx core shipped with OpenResty, you will need to
apply the C<socket_cloexec> patch to the standard Nginx core.
Under the hood, this module uses C<fork> and C<execvp> with the user-specified
command, and communicates with the spawned processes via the POSIX C<pipe> API,
which is where the module gets its name.
A signal handler for C<SIGCHLD> is registered so that we receive a
notification once a spawned process has exited.
We combine the above implementation with Nginx's event mechanism and
OpenResty's Lua coroutine scheduler, in order to ensure communication with the
spawned processes is non-blocking.
The communication APIs do not work in phases which do not support yielding,
such as C<init_worker_by_lua*> or C<log_by_lua*>, because there is no way to
yield the current light thread to avoid blocking the OS thread when
communicating with processes in those phases.
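One common way around this restriction is to defer the communication to a
zero-delay timer, because timer handlers run in a context that supports
yielding. The following is a minimal sketch of that pattern, not part of
C<ngx.pipe> itself: C<run_in_timer> is a hypothetical helper, and it receives
C<ngx.timer.at> as a parameter only so that the function can also be exercised
outside OpenResty.

```lua
-- Hypothetical helper: run `job` from a zero-delay timer, where yielding is
-- allowed. `timer_at` is expected to be ngx.timer.at.
local function run_in_timer(timer_at, job)
    return timer_at(0, function(premature)
        if premature then
            -- The worker is shutting down; skip the job.
            return
        end
        job()
    end)
end

-- Usage inside init_worker_by_lua_block (shown as comments, not executed here):
--   local ngx_pipe = require "ngx.pipe"
--   local ok, err = run_in_timer(ngx.timer.at, function()
--       local proc, err = ngx_pipe.spawn({"uname", "-a"})
--       if not proc then
--           ngx.log(ngx.ERR, "spawn failed: ", err)
--           return
--       end
--       local line, err = proc:stdout_read_line()
--       ngx.log(ngx.NOTICE, line or err)
--   end)
```

Spawning itself is allowed in C<init_worker_by_lua*>; only the reads, writes and
waits need to move into the timer handler.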
=head1 Methods
=head2 spawn
B<syntax:> I<proc, err = pipe_module.spawn(args, opts?)>
B<context:> I<all phases except init_by_luaE<42>>
Creates and returns a new sub-process instance we can communicate with later.
For example:
    local ngx_pipe = require "ngx.pipe"
    local proc, err = ngx_pipe.spawn({"sh", "-c", "sleep 0.1 && exit 2"})
    if not proc then
        ngx.say(err)
        return
    end
In case of failure, this function returns C<nil> and a string describing the
error.
The sub-process will be killed via C<SIGKILL> if it is still alive when the
instance is collected by the garbage collector.
Note that C<args> should either be a single level array-like Lua table with
string values, or just a single string.
Some more examples:
    local proc, err = ngx_pipe.spawn({"ls", "-l"})
    local proc, err = ngx_pipe.spawn({"perl", "-e", "print 'hello, world'"})
If C<args> is specified as a string, it will be executed by the operating system
shell, just like C<os.execute>. The first example above could thus be rewritten as:
    local ngx_pipe = require "ngx.pipe"
    local proc, err = ngx_pipe.spawn("sleep 0.1 && exit 2")
    if not proc then
        ngx.say(err)
        return
    end
In shell mode, you should be very careful about shell injection attacks
when interpolating variables into the command string, especially variables from
untrusted sources. Please make sure that you escape those variables while
assembling the command string. For this reason, it is highly recommended to use
the multi-argument form (C<args> as a table), which specifies each command-line
argument explicitly.
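As an illustration of the table form, the sketch below builds an argument
vector from untrusted input. C<grep_args> is a hypothetical helper introduced
here for illustration; the choice of C<grep>, its C<-F> flag and the C<-->
separator are assumptions of this example rather than anything required by
C<ngx.pipe>.

```lua
-- Hypothetical helper: build an argument vector for `grep` from untrusted
-- input. Each value becomes exactly one argv entry, so shell metacharacters
-- in `pattern` or `path` are never interpreted by a shell.
local function grep_args(pattern, path)
    -- "-F" treats the pattern as a fixed string; "--" ends option parsing so
    -- that a pattern starting with "-" is not mistaken for an option.
    return {"grep", "-F", "--", pattern, path}
end

-- Usage inside OpenResty (shown as comments, not executed here):
--   local ngx_pipe = require "ngx.pipe"
--   local proc, err = ngx_pipe.spawn(grep_args(user_input, "/var/log/app.log"))
```

Here C<user_input> and the log path are placeholders. Because no shell is
involved, a value such as C<"; rm -rf /"> is passed to C<grep> as a literal
search string.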
Since Nginx does not pass along the C<PATH> system environment variable by
default, you will need to configure the C<env PATH> directive if you want it
to be respected when searching for sub-process executables:
    env PATH;
    ...
    content_by_lua_block {
        local ngx_pipe = require "ngx.pipe"
        local proc = ngx_pipe.spawn({'ls'})
    }
The optional table argument C<opts> can be used to control the behavior of
spawned processes. For instance:
    local opts = {
        merge_stderr = true,
        buffer_size = 256,
        environ = {"PATH=/tmp/bin", "CWD=/tmp/work"}
    }
    local proc, err = ngx_pipe.spawn({"sh", "-c", ">&2 echo data"}, opts)
    if not proc then
        ngx.say(err)
        return
    end
The following options are supported:
=over
=item *
C<merge_stderr>: when set to C<true>, the output to stderr will be redirected
to stdout in the spawned process. This is similar to doing C<< 2>&1 >> in a shell.
=item *
C<buffer_size>: specifies the buffer size used by reading operations, in
bytes. The default buffer size is C<4096>.
=item *
C<environ>: specifies environment variables for the spawned process. The value
must be a single-level, array-like Lua table with string values. If the
current platform does not support this option, C<nil> plus the string
C<"environ option not supported"> will be returned.
=item *
C<write_timeout>: specifies the write timeout threshold, in milliseconds. The
default threshold is C<10000>. If the threshold is C<0>, the write operation
will never time out.
=item *
C<stdout_read_timeout>: specifies the stdout read timeout threshold, in
milliseconds. The default threshold is C<10000>. If the threshold is C<0>, the
stdout read operation will never time out.
=item *
C<stderr_read_timeout>: specifies the stderr read timeout threshold, in
milliseconds. The default threshold is C<10000>. If the threshold is C<0>, the
stderr read operation will never time out.
=item *
C<wait_timeout>: specifies the wait timeout threshold, in milliseconds. The
default threshold is C<10000>. If the threshold is C<0>, the wait operation
will never time out.
=back
=head2 set_timeouts
B<syntax:> I<proc:set_timeouts(write_timeout?, stdout_read_timeout?, stderr_read_timeout?, wait_timeout?)>
Respectively sets: the write timeout threshold, stdout read timeout threshold,
stderr read timeout threshold, and wait timeout threshold. All timeouts are in
milliseconds.
The default threshold for each timeout is 10 seconds.
If the specified timeout argument is C<nil>, the corresponding timeout threshold
will not be changed. For example:
    local proc, err = ngx_pipe.spawn({"sleep", "10s"})
    -- only change the wait_timeout to 0.1 second.
    proc:set_timeouts(nil, nil, nil, 100)
    -- only change the write_timeout to 0.1 second.
    proc:set_timeouts(100)
If the specified timeout argument is C<0>, the corresponding operation will
never time out.
=head2 wait
B<syntax:> I<ok, reason, status = proc:wait()>
B<context:> I<phases that support yielding>
Waits until the current sub-process exits.
It is possible to control how long to wait via L<set_timeouts>.
The default timeout is 10 seconds.
If the process exited with status code zero, the C<ok> return value will be C<true>.
If the process exited abnormally, the C<ok> return value will be C<false>.
The second return value, C<reason>, will be a string. Its values may be:
=over
=item *
C<exit>: the process exited by calling C<exit(3)>, C<_exit(2)>, or by
returning from C<main()>. In this case, C<status> will be the exit code.
=item *
C<signal>: the process was terminated by a signal. In this case, C<status> will
be the signal number.
=back
Note that only one light thread can wait on a process at a time. If another
light thread tries to wait on a process, the return values will be C<nil> and
the error string C<"pipe busy waiting">.
If a thread tries to wait on an exited process, the return values will be C<nil>
and the error string C<"exited">.
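The three return values can be interpreted along the following lines. This is
only a sketch: C<describe_wait> is a hypothetical helper, and the wording of
the messages it produces is arbitrary.

```lua
-- Hypothetical helper: turn the return values of proc:wait() into a message.
local function describe_wait(ok, reason, status)
    if ok == nil then
        -- The wait itself failed; `reason` is an error string such as
        -- "exited" or "pipe busy waiting".
        return "wait failed: " .. tostring(reason)
    end
    if reason == "exit" then
        -- `ok` is true only when the exit code is zero.
        return "exited with code " .. status
    end
    if reason == "signal" then
        return "terminated by signal " .. status
    end
    return "unexpected reason: " .. tostring(reason)
end

-- Usage inside a phase that supports yielding (not executed here):
--   ngx.say(describe_wait(proc:wait()))
```

Checking C<ok == nil> first separates a failed wait from a process that exited
abnormally, for which C<ok> is C<false>.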
=head2 pid
B<syntax:> I<pid = proc:pid()>
Returns the pid number of the sub-process.
=head2 kill
B<syntax:> I<ok, err = proc:kill(signum)>
Sends a signal to the sub-process.
Note that the C<signum> argument should be the signal's numerical value. If the
specified C<signum> is not a number, an error will be thrown.
You should use L<lua-resty-signal's signum() function|https://github.com/openresty/lua-resty-signal#signum>
to convert signal names to signal numbers in order to ensure portability of
your application.
In case of success, this method returns C<true>. Otherwise, it returns C<nil> and
a string describing the error.
Killing an exited sub-process will return C<nil> and the error string
C<"exited">.
Sending an invalid signal to the process will return C<nil> and the error string
C<"invalid signal">.
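A typical use of C<kill> is to ask a process to stop and to force termination
only if it does not exit in time. The sketch below assumes that a failed
C<wait> after the first signal means the grace period ran out; C<stop_process>
is a hypothetical helper, and C<signum> is expected to be the C<signum>
function from lua-resty-signal.

```lua
-- Hypothetical helper: send SIGTERM, wait up to `grace_ms` milliseconds,
-- then fall back to SIGKILL.
-- `signum` is expected to be require("resty.signal").signum.
local function stop_process(proc, signum, grace_ms)
    local ok, err = proc:kill(signum("TERM"))
    if not ok then
        return nil, err              -- for example "exited"
    end

    -- Only change the wait timeout; the other timeouts stay untouched.
    proc:set_timeouts(nil, nil, nil, grace_ms)
    local done, reason, status = proc:wait()
    if done ~= nil then
        return reason, status
    end

    -- Assumption: the wait failed because the grace period ran out.
    ok, err = proc:kill(signum("KILL"))
    if not ok then
        return nil, err
    end
    done, reason, status = proc:wait()
    if done == nil then
        return nil, reason
    end
    return reason, status
end

-- Usage inside a phase that supports yielding (not executed here):
--   local resty_signal = require "resty.signal"
--   local reason, status = stop_process(proc, resty_signal.signum, 500)
```

On success the helper returns the C<reason> and C<status> values reported by
C<wait>; otherwise it returns C<nil> and the error string.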
=head2 shutdown
B<syntax:> I<ok, err = proc:shutdown(direction)>
Closes the specified direction of the current sub-process.
The C<direction> argument should be one of these three values: C<stdin>, C<stdout>
and C<stderr>.
In case of success, this method returns C<true>. Otherwise, it returns C<nil> and
a string describing the error.
If the C<merge_stderr> option is specified in L<spawn>, closing the
C<stderr> direction will return C<nil> and the error string C<"merged to stdout">.
Shutting down a direction when a light thread is waiting on it (such as during
reading or writing) will abort the light thread and return C<true>.
Shutting down directions of an exited process will return C<nil> and the error
string C<"closed">.
It is fine to shut down the same direction of the same stream multiple times;
no side effects are to be expected.
=head2 write
B<syntax:> I<nbytes, err = proc:write(data)>
B<context:> I<phases that support yielding>
Writes data to the current sub-process's stdin stream.
The C<data> argument can be a string or a single level array-like Lua table with
string values.
This method is a synchronous and non-blocking operation that will not return
until I<all> the data has been flushed to the sub-process's stdin buffer, or
an error occurs.
In case of success, it returns the total number of bytes that have been sent.
Otherwise, it returns C<nil> and a string describing the error.
The timeout threshold of this C<write> operation can be controlled by the
L<set_timeouts> method. The default timeout threshold is 10
seconds.
When a timeout occurs, the data may be partially written into the sub-process's
stdin buffer and read by the sub-process.
Only one light thread is allowed to write to the sub-process at a time. If
another light thread tries to write to it, this method will return C<nil> and
the error string C<"pipe busy writing">.
If the C<write> operation is aborted by the L<shutdown> method,
it will return C<nil> and the error string C<"aborted">.
Writing to an exited sub-process will return C<nil> and the error string
C<"closed">.
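Many commands only produce output once they see end-of-file on stdin, so a
write is often followed by a shutdown of that direction. A minimal sketch,
with C<feed_stdin> being a hypothetical helper:

```lua
-- Hypothetical helper: send all data to the sub-process's stdin, then close
-- stdin so that the sub-process sees end-of-file.
-- `data` may be a string or an array-like table of strings.
local function feed_stdin(proc, data)
    local nbytes, err = proc:write(data)
    if not nbytes then
        return nil, err              -- for example "closed" or "aborted"
    end
    local ok, shutdown_err = proc:shutdown("stdin")
    if not ok then
        return nil, shutdown_err
    end
    return nbytes
end

-- Usage inside a phase that supports yielding (not executed here):
--   local ngx_pipe = require "ngx.pipe"
--   local proc = assert(ngx_pipe.spawn({"wc", "-c"}))
--   local nbytes, err = feed_stdin(proc, {"hello, ", "world\n"})
```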
=head2 stderr_read_all
B<syntax:> I<data, err, partial = proc:stderr_read_all()>
B<context:> I<phases that support yielding>
Reads all data from the current sub-process's stderr stream until it is closed.
This method is a synchronous and non-blocking operation, just like the
L<write> method.
The timeout threshold of this reading operation can be controlled by
L<set_timeouts>. The default timeout is 10 seconds.
In case of success, it returns the data received. Otherwise, it returns three
values: C<nil>, a string describing the error, and, optionally, the partial data
received so far.
When C<merge_stderr> is specified in L<spawn>, calling C<stderr_read_all>
will return C<nil> and the error string C<"merged to stdout">.
Only one light thread is allowed to read from a sub-process's stderr or stdout
stream at a time. If another thread tries to read from the same stream, this
method will return C<nil> and the error string C<"pipe busy reading">.
If the reading operation is aborted by the L<shutdown> method,
it will return C<nil> and the error string C<"aborted">.
Streams for stdout and stderr are separated, so at most two light threads may
be reading from a sub-process at a time (one for each stream).
In the same way, a light thread may read from a stream while another light
thread is writing to the sub-process's stdin stream.
Reading from an exited process's stream will return C<nil> and the error string
C<"closed">.
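Since the two output streams are independent, both can be drained at the same
time with one light thread per stream. The sketch below assumes the
C<ngx.thread> API of ngx_lua; C<read_both> is a hypothetical helper that takes
the C<ngx.thread> table as a parameter only so that it can be exercised
outside OpenResty.

```lua
-- Hypothetical helper: read stdout and stderr concurrently, using one light
-- thread per stream. `thread` is expected to be ngx.thread.
local function read_both(thread, proc)
    local out_co = thread.spawn(function()
        return proc:stdout_read_all()
    end)
    local err_co = thread.spawn(function()
        return proc:stderr_read_all()
    end)

    -- thread.wait returns a success flag followed by the return values of
    -- the light thread's function.
    local ok, out, out_err = thread.wait(out_co)
    if not ok then
        return nil, out              -- the light thread raised an error
    end
    local ok2, errout, errout_err = thread.wait(err_co)
    if not ok2 then
        return nil, errout
    end

    return {
        stdout = out, stdout_err = out_err,
        stderr = errout, stderr_err = errout_err,
    }
end

-- Usage inside a phase that supports yielding (not executed here):
--   local res, err = read_both(ngx.thread, proc)
```

Reading the streams one after the other could stall if the sub-process fills
the pipe that is not being read yet, which is why this sketch uses two light
threads.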
=head2 stdout_read_all
B<syntax:> I<data, err, partial = proc:stdout_read_all()>
B<context:> I<phases that support yielding>
Similar to the L<stderr_read_all> method, but reading from the
stdout stream of the sub-process.
=head2 stderr_read_line
B<syntax:> I<data, err, partial = proc:stderr_read_line()>
B<context:> I<phases that support yielding>
Reads from stderr like L<stderr_read_all>, but only reads a
single line of data.
When C<merge_stderr> is specified in L<spawn>, calling C<stderr_read_line>
will return C<nil> plus the error string C<"merged to stdout">.
When the data stream is truncated without a new-line character, it returns 3
values: C<nil>, the error string C<"closed">, and the partial data received so
far.
The line should be terminated by a C<Line Feed> (LF) character (ASCII 10),
optionally preceded by a C<Carriage Return> (CR) character (ASCII 13). The CR
and LF characters are not included in the returned line data.
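The C<"closed"> error together with the partial data makes it possible to
collect every line, including a last line that lacks a trailing new-line
character. A minimal sketch, shown for the stdout variant of this method, with
C<read_lines> being a hypothetical helper:

```lua
-- Hypothetical helper: collect all lines from the sub-process's stdout.
local function read_lines(proc)
    local lines = {}
    while true do
        local line, err, partial = proc:stdout_read_line()
        if line then
            lines[#lines + 1] = line
        elseif err == "closed" then
            -- Keep a trailing line that had no final new-line character.
            if partial and partial ~= "" then
                lines[#lines + 1] = partial
            end
            return lines
        else
            -- Any other error: also hand back what was read so far.
            return nil, err, lines
        end
    end
end

-- Usage inside a phase that supports yielding (not executed here):
--   local lines, err = read_lines(proc)
```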
=head2 stdout_read_line
B<syntax:> I<data, err, partial = proc:stdout_read_line()>
B<context:> I<phases that support yielding>
Similar to L<stderr_read_line>, but reading from the
stdout stream of the sub-process.
=head2 stderr_read_bytes
B<syntax:> I<data, err, partial = proc:stderr_read_bytes(len)>
B<context:> I<phases that support yielding>
Reads from stderr like L<stderr_read_all>, but only reads the
specified number of bytes.
If C<merge_stderr> is specified in L<spawn>, calling C<stderr_read_bytes>
will return C<nil> plus the error string C<"merged to stdout">.
If the data stream is truncated (fewer bytes of data available than requested),
this method returns 3 values: C<nil>, the error string C<"closed">, and the
partial data string received so far.
=head2 stdout_read_bytes
B<syntax:> I<data, err, partial = proc:stdout_read_bytes(len)>
B<context:> I<phases that support yielding>
Similar to L<stderr_read_bytes>, but reading from the
stdout stream of the sub-process.
=head2 stderr_read_any
B<syntax:> I<data, err = proc:stderr_read_any(max)>
B<context:> I<phases that support yielding>
Reads from stderr like L<stderr_read_all>, but returns
immediately when any amount of data is received.
At most C<max> bytes are received.
If C<merge_stderr> is specified in L<spawn>, calling C<stderr_read_any>
will return C<nil> plus the error string C<"merged to stdout">.
If the received data is more than C<max> bytes, this method will return with
exactly C<max> bytes of data. The remaining data in the underlying receive
buffer can be fetched with a subsequent reading operation.
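This method suits streaming use cases in which output should be forwarded as
soon as it arrives. The sketch below assumes that the end of the stream is
reported as C<"closed">, as it is for the other reading methods;
C<pump_stderr> and its C<sink> callback are hypothetical names.

```lua
-- Hypothetical helper: forward stderr data to `sink` in chunks of at most
-- `max` bytes until the stream is closed. Returns the total byte count.
local function pump_stderr(proc, max, sink)
    local total = 0
    while true do
        local data, err = proc:stderr_read_any(max)
        if not data then
            if err == "closed" then
                return total
            end
            return nil, err
        end
        total = total + #data
        sink(data)
    end
end

-- Usage inside a phase that supports yielding (not executed here):
--   local total, err = pump_stderr(proc, 4096, function(chunk)
--       ngx.print(chunk)
--       ngx.flush(true)
--   end)
```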
=head2 stdout_read_any
B<syntax:> I<data, err = proc:stdout_read_any(max)>
B<context:> I<phases that support yielding>
Similar to L<stderr_read_any>, but reading from the stdout
stream of the sub-process.
=head1 Community
=head2 English Mailing List
The L<openresty-en|https://groups.google.com/group/openresty-en> mailing list
is for English speakers.
=head2 Chinese Mailing List
The L<openresty|https://groups.google.com/group/openresty> mailing list is for
Chinese speakers.
=head1 Bugs and Patches
Please report bugs or submit patches by
=over
=item 1.
creating a ticket on the L<GitHub Issue Tracker|https://github.com/openresty/lua-resty-core/issues>,
=item 2.
or posting to the L<OpenResty community|/Community>.
=back
=head1 Copyright and License
This module is licensed under the BSD license.
Copyright (C) 2018, by OpenResty Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
=over
=item *
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
=back
=over
=item *
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
=back
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
=head1 See Also
=over
=item *
the L<lua-resty-core|https://github.com/openresty/lua-resty-core> library.
=item *
the ngx_lua module: https://github.com/openresty/lua-nginx-module
=item *
OpenResty: https://openresty.org
=back