Source file
src/net/rpc/client.go
Documentation: net/rpc
1
2
3
4
5 package rpc
6
7 import (
8 "bufio"
9 "encoding/gob"
10 "errors"
11 "io"
12 "log"
13 "net"
14 "net/http"
15 "sync"
16 )
17
18
19
20 type ServerError string
21
22 func (e ServerError) Error() string {
23 return string(e)
24 }
25
26 var ErrShutdown = errors.New("connection is shut down")
27
28
29 type Call struct {
30 ServiceMethod string
31 Args interface{}
32 Reply interface{}
33 Error error
34 Done chan *Call
35 }
36
37
38
39
40
41 type Client struct {
42 codec ClientCodec
43
44 reqMutex sync.Mutex
45 request Request
46
47 mutex sync.Mutex
48 seq uint64
49 pending map[uint64]*Call
50 closing bool
51 shutdown bool
52 }
53
54
55
56
57
58
59
60
61
62
63 type ClientCodec interface {
64 WriteRequest(*Request, interface{}) error
65 ReadResponseHeader(*Response) error
66 ReadResponseBody(interface{}) error
67
68 Close() error
69 }
70
71 func (client *Client) send(call *Call) {
72 client.reqMutex.Lock()
73 defer client.reqMutex.Unlock()
74
75
76 client.mutex.Lock()
77 if client.shutdown || client.closing {
78 client.mutex.Unlock()
79 call.Error = ErrShutdown
80 call.done()
81 return
82 }
83 seq := client.seq
84 client.seq++
85 client.pending[seq] = call
86 client.mutex.Unlock()
87
88
89 client.request.Seq = seq
90 client.request.ServiceMethod = call.ServiceMethod
91 err := client.codec.WriteRequest(&client.request, call.Args)
92 if err != nil {
93 client.mutex.Lock()
94 call = client.pending[seq]
95 delete(client.pending, seq)
96 client.mutex.Unlock()
97 if call != nil {
98 call.Error = err
99 call.done()
100 }
101 }
102 }
103
104 func (client *Client) input() {
105 var err error
106 var response Response
107 for err == nil {
108 response = Response{}
109 err = client.codec.ReadResponseHeader(&response)
110 if err != nil {
111 break
112 }
113 seq := response.Seq
114 client.mutex.Lock()
115 call := client.pending[seq]
116 delete(client.pending, seq)
117 client.mutex.Unlock()
118
119 switch {
120 case call == nil:
121
122
123
124
125
126 err = client.codec.ReadResponseBody(nil)
127 if err != nil {
128 err = errors.New("reading error body: " + err.Error())
129 }
130 case response.Error != "":
131
132
133
134 call.Error = ServerError(response.Error)
135 err = client.codec.ReadResponseBody(nil)
136 if err != nil {
137 err = errors.New("reading error body: " + err.Error())
138 }
139 call.done()
140 default:
141 err = client.codec.ReadResponseBody(call.Reply)
142 if err != nil {
143 call.Error = errors.New("reading body " + err.Error())
144 }
145 call.done()
146 }
147 }
148
149 client.reqMutex.Lock()
150 client.mutex.Lock()
151 client.shutdown = true
152 closing := client.closing
153 if err == io.EOF {
154 if closing {
155 err = ErrShutdown
156 } else {
157 err = io.ErrUnexpectedEOF
158 }
159 }
160 for _, call := range client.pending {
161 call.Error = err
162 call.done()
163 }
164 client.mutex.Unlock()
165 client.reqMutex.Unlock()
166 if debugLog && err != io.EOF && !closing {
167 log.Println("rpc: client protocol error:", err)
168 }
169 }
170
171 func (call *Call) done() {
172 select {
173 case call.Done <- call:
174
175 default:
176
177
178 if debugLog {
179 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
180 }
181 }
182 }
183
184
185
186
187
188
189
190
191
192
193 func NewClient(conn io.ReadWriteCloser) *Client {
194 encBuf := bufio.NewWriter(conn)
195 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
196 return NewClientWithCodec(client)
197 }
198
199
200
201 func NewClientWithCodec(codec ClientCodec) *Client {
202 client := &Client{
203 codec: codec,
204 pending: make(map[uint64]*Call),
205 }
206 go client.input()
207 return client
208 }
209
210 type gobClientCodec struct {
211 rwc io.ReadWriteCloser
212 dec *gob.Decoder
213 enc *gob.Encoder
214 encBuf *bufio.Writer
215 }
216
217 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
218 if err = c.enc.Encode(r); err != nil {
219 return
220 }
221 if err = c.enc.Encode(body); err != nil {
222 return
223 }
224 return c.encBuf.Flush()
225 }
226
227 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
228 return c.dec.Decode(r)
229 }
230
231 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
232 return c.dec.Decode(body)
233 }
234
235 func (c *gobClientCodec) Close() error {
236 return c.rwc.Close()
237 }
238
239
240
241 func DialHTTP(network, address string) (*Client, error) {
242 return DialHTTPPath(network, address, DefaultRPCPath)
243 }
244
245
246
247 func DialHTTPPath(network, address, path string) (*Client, error) {
248 conn, err := net.Dial(network, address)
249 if err != nil {
250 return nil, err
251 }
252 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
253
254
255
256 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
257 if err == nil && resp.Status == connected {
258 return NewClient(conn), nil
259 }
260 if err == nil {
261 err = errors.New("unexpected HTTP response: " + resp.Status)
262 }
263 conn.Close()
264 return nil, &net.OpError{
265 Op: "dial-http",
266 Net: network + " " + address,
267 Addr: nil,
268 Err: err,
269 }
270 }
271
272
273 func Dial(network, address string) (*Client, error) {
274 conn, err := net.Dial(network, address)
275 if err != nil {
276 return nil, err
277 }
278 return NewClient(conn), nil
279 }
280
281
282
283 func (client *Client) Close() error {
284 client.mutex.Lock()
285 if client.closing {
286 client.mutex.Unlock()
287 return ErrShutdown
288 }
289 client.closing = true
290 client.mutex.Unlock()
291 return client.codec.Close()
292 }
293
294
295
296
297
298 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
299 call := new(Call)
300 call.ServiceMethod = serviceMethod
301 call.Args = args
302 call.Reply = reply
303 if done == nil {
304 done = make(chan *Call, 10)
305 } else {
306
307
308
309
310 if cap(done) == 0 {
311 log.Panic("rpc: done channel is unbuffered")
312 }
313 }
314 call.Done = done
315 client.send(call)
316 return call
317 }
318
319
320 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
321 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
322 return call.Error
323 }
324
View as plain text