Source file
src/os/pipe_test.go
Documentation: os
1
2
3
4
5
6
7
8
9 package os_test
10
11 import (
12 "bufio"
13 "bytes"
14 "fmt"
15 "internal/testenv"
16 "io"
17 "io/fs"
18 "os"
19 osexec "os/exec"
20 "os/signal"
21 "runtime"
22 "strconv"
23 "strings"
24 "sync"
25 "syscall"
26 "testing"
27 "time"
28 )
29
30 func TestEPIPE(t *testing.T) {
31 r, w, err := os.Pipe()
32 if err != nil {
33 t.Fatal(err)
34 }
35 if err := r.Close(); err != nil {
36 t.Fatal(err)
37 }
38
39 expect := syscall.EPIPE
40 if runtime.GOOS == "windows" {
41
42 expect = syscall.Errno(232)
43 }
44
45 for i := 0; i < 20; i++ {
46 _, err = w.Write([]byte("hi"))
47 if err == nil {
48 t.Fatal("unexpected success of Write to broken pipe")
49 }
50 if pe, ok := err.(*fs.PathError); ok {
51 err = pe.Err
52 }
53 if se, ok := err.(*os.SyscallError); ok {
54 err = se.Err
55 }
56 if err != expect {
57 t.Errorf("iteration %d: got %v, expected %v", i, err, expect)
58 }
59 }
60 }
61
62 func TestStdPipe(t *testing.T) {
63 switch runtime.GOOS {
64 case "windows":
65 t.Skip("Windows doesn't support SIGPIPE")
66 }
67 testenv.MustHaveExec(t)
68 r, w, err := os.Pipe()
69 if err != nil {
70 t.Fatal(err)
71 }
72 if err := r.Close(); err != nil {
73 t.Fatal(err)
74 }
75
76
77
78
79
80
81 for _, sig := range []bool{false, true} {
82 for dest := 1; dest < 4; dest++ {
83 cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
84 cmd.Stdout = w
85 cmd.Stderr = w
86 cmd.ExtraFiles = []*os.File{w}
87 cmd.Env = append(os.Environ(), fmt.Sprintf("GO_TEST_STD_PIPE_HELPER=%d", dest))
88 if sig {
89 cmd.Env = append(cmd.Env, "GO_TEST_STD_PIPE_HELPER_SIGNAL=1")
90 }
91 if err := cmd.Run(); err == nil {
92 if !sig && dest < 3 {
93 t.Errorf("unexpected success of write to closed pipe %d sig %t in child", dest, sig)
94 }
95 } else if ee, ok := err.(*osexec.ExitError); !ok {
96 t.Errorf("unexpected exec error type %T: %v", err, err)
97 } else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
98 t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
99 } else if ws.Signaled() && ws.Signal() == syscall.SIGPIPE {
100 if sig || dest > 2 {
101 t.Errorf("unexpected SIGPIPE signal for descriptor %d sig %t", dest, sig)
102 }
103 } else {
104 t.Errorf("unexpected exit status %v for descriptor %d sig %t", err, dest, sig)
105 }
106 }
107 }
108
109
110 cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
111 cmd.Stdout = w
112 var stderr bytes.Buffer
113 cmd.Stderr = &stderr
114 cmd.Env = append(os.Environ(), "GO_TEST_STD_PIPE_HELPER=1")
115 if err := cmd.Run(); err == nil {
116 t.Errorf("unexpected success of write to closed stdout")
117 } else if ee, ok := err.(*osexec.ExitError); !ok {
118 t.Errorf("unexpected exec error type %T: %v", err, err)
119 } else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
120 t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
121 } else if !ws.Signaled() || ws.Signal() != syscall.SIGPIPE {
122 t.Errorf("unexpected exit status %v for write to closed stdout", err)
123 }
124 if output := stderr.Bytes(); len(output) > 0 {
125 t.Errorf("unexpected output on stderr: %s", output)
126 }
127 }
128
129
130 func TestStdPipeHelper(t *testing.T) {
131 if os.Getenv("GO_TEST_STD_PIPE_HELPER_SIGNAL") != "" {
132 signal.Notify(make(chan os.Signal, 1), syscall.SIGPIPE)
133 }
134 switch os.Getenv("GO_TEST_STD_PIPE_HELPER") {
135 case "1":
136 os.Stdout.Write([]byte("stdout"))
137 case "2":
138 os.Stderr.Write([]byte("stderr"))
139 case "3":
140 if _, err := os.NewFile(3, "3").Write([]byte("3")); err == nil {
141 os.Exit(3)
142 }
143 default:
144 t.Skip("skipping test helper")
145 }
146
147
148
149
150 os.Exit(0)
151 }
152
153 func testClosedPipeRace(t *testing.T, read bool) {
154 switch runtime.GOOS {
155 case "freebsd":
156 t.Skip("FreeBSD does not use the poller; issue 19093")
157 }
158
159 limit := 1
160 if !read {
161
162
163 limit = 131073
164 if b, err := os.ReadFile("/proc/sys/fs/pipe-max-size"); err == nil {
165 if i, err := strconv.Atoi(strings.TrimSpace(string(b))); err == nil {
166 limit = i + 1
167 }
168 }
169 t.Logf("using pipe write limit of %d", limit)
170 }
171
172 r, w, err := os.Pipe()
173 if err != nil {
174 t.Fatal(err)
175 }
176 defer r.Close()
177 defer w.Close()
178
179
180
181 go func() {
182
183
184
185 time.Sleep(20 * time.Millisecond)
186
187 var err error
188 if read {
189 err = r.Close()
190 } else {
191 err = w.Close()
192 }
193 if err != nil {
194 t.Error(err)
195 }
196 }()
197
198 b := make([]byte, limit)
199 if read {
200 _, err = r.Read(b[:])
201 } else {
202 _, err = w.Write(b[:])
203 }
204 if err == nil {
205 t.Error("I/O on closed pipe unexpectedly succeeded")
206 } else if pe, ok := err.(*fs.PathError); !ok {
207 t.Errorf("I/O on closed pipe returned unexpected error type %T; expected fs.PathError", pe)
208 } else if pe.Err != fs.ErrClosed {
209 t.Errorf("got error %q but expected %q", pe.Err, fs.ErrClosed)
210 } else {
211 t.Logf("I/O returned expected error %q", err)
212 }
213 }
214
215 func TestClosedPipeRaceRead(t *testing.T) {
216 testClosedPipeRace(t, true)
217 }
218
219 func TestClosedPipeRaceWrite(t *testing.T) {
220 testClosedPipeRace(t, false)
221 }
222
223
224
225
226 func TestReadNonblockingFd(t *testing.T) {
227 switch runtime.GOOS {
228 case "windows":
229 t.Skip("Windows doesn't support SetNonblock")
230 }
231 if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
232 fd := syscallDescriptor(os.Stdin.Fd())
233 syscall.SetNonblock(fd, true)
234 defer syscall.SetNonblock(fd, false)
235 _, err := os.Stdin.Read(make([]byte, 1))
236 if err != nil {
237 if perr, ok := err.(*fs.PathError); !ok || perr.Err != syscall.EAGAIN {
238 t.Fatalf("read on nonblocking stdin got %q, should have gotten EAGAIN", err)
239 }
240 }
241 os.Exit(0)
242 }
243
244 testenv.MustHaveExec(t)
245 r, w, err := os.Pipe()
246 if err != nil {
247 t.Fatal(err)
248 }
249 defer r.Close()
250 defer w.Close()
251 cmd := osexec.Command(os.Args[0], "-test.run="+t.Name())
252 cmd.Env = append(os.Environ(), "GO_WANT_READ_NONBLOCKING_FD=1")
253 cmd.Stdin = r
254 output, err := cmd.CombinedOutput()
255 t.Logf("%s", output)
256 if err != nil {
257 t.Errorf("child process failed: %v", err)
258 }
259 }
260
261 func TestCloseWithBlockingReadByNewFile(t *testing.T) {
262 var p [2]syscallDescriptor
263 err := syscall.Pipe(p[:])
264 if err != nil {
265 t.Fatal(err)
266 }
267
268 testCloseWithBlockingRead(t, os.NewFile(uintptr(p[0]), "reader"), os.NewFile(uintptr(p[1]), "writer"))
269 }
270
271 func TestCloseWithBlockingReadByFd(t *testing.T) {
272 r, w, err := os.Pipe()
273 if err != nil {
274 t.Fatal(err)
275 }
276
277 _ = r.Fd()
278 testCloseWithBlockingRead(t, r, w)
279 }
280
281
282 func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
283 defer r.Close()
284 defer w.Close()
285
286 c1, c2 := make(chan bool), make(chan bool)
287 var wg sync.WaitGroup
288
289 wg.Add(1)
290 go func(c chan bool) {
291 defer wg.Done()
292
293
294
295 time.Sleep(20 * time.Millisecond)
296
297 if err := r.Close(); err != nil {
298 t.Error(err)
299 }
300 close(c)
301 }(c1)
302
303 wg.Add(1)
304 go func(c chan bool) {
305 defer wg.Done()
306 var b [1]byte
307 _, err := r.Read(b[:])
308 close(c)
309 if err == nil {
310 t.Error("I/O on closed pipe unexpectedly succeeded")
311 }
312 if pe, ok := err.(*fs.PathError); ok {
313 err = pe.Err
314 }
315 if err != io.EOF && err != fs.ErrClosed {
316 t.Errorf("got %v, expected EOF or closed", err)
317 }
318 }(c2)
319
320 for c1 != nil || c2 != nil {
321 select {
322 case <-c1:
323 c1 = nil
324
325
326 w.Close()
327 case <-c2:
328 c2 = nil
329 case <-time.After(1 * time.Second):
330 switch {
331 case c1 != nil && c2 != nil:
332 t.Error("timed out waiting for Read and Close")
333 w.Close()
334 case c1 != nil:
335 t.Error("timed out waiting for Close")
336 case c2 != nil:
337 t.Error("timed out waiting for Read")
338 default:
339 t.Error("impossible case")
340 }
341 }
342 }
343
344 wg.Wait()
345 }
346
347
348 func TestPipeEOF(t *testing.T) {
349 r, w, err := os.Pipe()
350 if err != nil {
351 t.Fatal(err)
352 }
353
354 var wg sync.WaitGroup
355 wg.Add(1)
356 go func() {
357 defer wg.Done()
358
359 defer func() {
360 if err := w.Close(); err != nil {
361 t.Errorf("error closing writer: %v", err)
362 }
363 }()
364
365 for i := 0; i < 3; i++ {
366 time.Sleep(10 * time.Millisecond)
367 _, err := fmt.Fprintf(w, "line %d\n", i)
368 if err != nil {
369 t.Errorf("error writing to fifo: %v", err)
370 return
371 }
372 }
373 time.Sleep(10 * time.Millisecond)
374 }()
375
376 defer wg.Wait()
377
378 done := make(chan bool)
379 go func() {
380 defer close(done)
381
382 defer func() {
383 if err := r.Close(); err != nil {
384 t.Errorf("error closing reader: %v", err)
385 }
386 }()
387
388 rbuf := bufio.NewReader(r)
389 for {
390 b, err := rbuf.ReadBytes('\n')
391 if err == io.EOF {
392 break
393 }
394 if err != nil {
395 t.Error(err)
396 return
397 }
398 t.Logf("%s\n", bytes.TrimSpace(b))
399 }
400 }()
401
402 select {
403 case <-done:
404
405 case <-time.After(time.Second):
406 t.Error("timed out waiting for read")
407
408 r.Close()
409 }
410 }
411
412
413 func TestFdRace(t *testing.T) {
414 r, w, err := os.Pipe()
415 if err != nil {
416 t.Fatal(err)
417 }
418 defer r.Close()
419 defer w.Close()
420
421 var wg sync.WaitGroup
422 call := func() {
423 defer wg.Done()
424 w.Fd()
425 }
426
427 const tries = 100
428 for i := 0; i < tries; i++ {
429 wg.Add(1)
430 go call()
431 }
432 wg.Wait()
433 }
434
435 func TestFdReadRace(t *testing.T) {
436 t.Parallel()
437
438 r, w, err := os.Pipe()
439 if err != nil {
440 t.Fatal(err)
441 }
442 defer r.Close()
443 defer w.Close()
444
445 const count = 10
446
447 c := make(chan bool, 1)
448 var wg sync.WaitGroup
449 wg.Add(1)
450 go func() {
451 defer wg.Done()
452 var buf [count]byte
453 r.SetReadDeadline(time.Now().Add(time.Minute))
454 c <- true
455 if _, err := r.Read(buf[:]); os.IsTimeout(err) {
456 t.Error("read timed out")
457 }
458 }()
459
460 wg.Add(1)
461 go func() {
462 defer wg.Done()
463 <-c
464
465
466
467 time.Sleep(10 * time.Millisecond)
468 r.Fd()
469
470
471
472
473 w.Write(make([]byte, count))
474 r.Close()
475 }()
476
477 wg.Wait()
478 }
479
View as plain text