...
Source file
src/io/pipe.go
Documentation: io
1
2
3
4
5
6
7
8 package io
9
10 import (
11 "errors"
12 "sync"
13 )
14
15
16 type onceError struct {
17 sync.Mutex
18 err error
19 }
20
21 func (a *onceError) Store(err error) {
22 a.Lock()
23 defer a.Unlock()
24 if a.err != nil {
25 return
26 }
27 a.err = err
28 }
29 func (a *onceError) Load() error {
30 a.Lock()
31 defer a.Unlock()
32 return a.err
33 }
34
35
36 var ErrClosedPipe = errors.New("io: read/write on closed pipe")
37
38
39 type pipe struct {
40 wrMu sync.Mutex
41 wrCh chan []byte
42 rdCh chan int
43
44 once sync.Once
45 done chan struct{}
46 rerr onceError
47 werr onceError
48 }
49
50 func (p *pipe) Read(b []byte) (n int, err error) {
51 select {
52 case <-p.done:
53 return 0, p.readCloseError()
54 default:
55 }
56
57 select {
58 case bw := <-p.wrCh:
59 nr := copy(b, bw)
60 p.rdCh <- nr
61 return nr, nil
62 case <-p.done:
63 return 0, p.readCloseError()
64 }
65 }
66
67 func (p *pipe) readCloseError() error {
68 rerr := p.rerr.Load()
69 if werr := p.werr.Load(); rerr == nil && werr != nil {
70 return werr
71 }
72 return ErrClosedPipe
73 }
74
75 func (p *pipe) CloseRead(err error) error {
76 if err == nil {
77 err = ErrClosedPipe
78 }
79 p.rerr.Store(err)
80 p.once.Do(func() { close(p.done) })
81 return nil
82 }
83
84 func (p *pipe) Write(b []byte) (n int, err error) {
85 select {
86 case <-p.done:
87 return 0, p.writeCloseError()
88 default:
89 p.wrMu.Lock()
90 defer p.wrMu.Unlock()
91 }
92
93 for once := true; once || len(b) > 0; once = false {
94 select {
95 case p.wrCh <- b:
96 nw := <-p.rdCh
97 b = b[nw:]
98 n += nw
99 case <-p.done:
100 return n, p.writeCloseError()
101 }
102 }
103 return n, nil
104 }
105
106 func (p *pipe) writeCloseError() error {
107 werr := p.werr.Load()
108 if rerr := p.rerr.Load(); werr == nil && rerr != nil {
109 return rerr
110 }
111 return ErrClosedPipe
112 }
113
114 func (p *pipe) CloseWrite(err error) error {
115 if err == nil {
116 err = EOF
117 }
118 p.werr.Store(err)
119 p.once.Do(func() { close(p.done) })
120 return nil
121 }
122
123
124 type PipeReader struct {
125 p *pipe
126 }
127
128
129
130
131
132
133 func (r *PipeReader) Read(data []byte) (n int, err error) {
134 return r.p.Read(data)
135 }
136
137
138
139 func (r *PipeReader) Close() error {
140 return r.CloseWithError(nil)
141 }
142
143
144
145
146
147
148 func (r *PipeReader) CloseWithError(err error) error {
149 return r.p.CloseRead(err)
150 }
151
152
153 type PipeWriter struct {
154 p *pipe
155 }
156
157
158
159
160
161
162 func (w *PipeWriter) Write(data []byte) (n int, err error) {
163 return w.p.Write(data)
164 }
165
166
167
168 func (w *PipeWriter) Close() error {
169 return w.CloseWithError(nil)
170 }
171
172
173
174
175
176
177
178 func (w *PipeWriter) CloseWithError(err error) error {
179 return w.p.CloseWrite(err)
180 }
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197 func Pipe() (*PipeReader, *PipeWriter) {
198 p := &pipe{
199 wrCh: make(chan []byte),
200 rdCh: make(chan int),
201 done: make(chan struct{}),
202 }
203 return &PipeReader{p}, &PipeWriter{p}
204 }
205
View as plain text