1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package sql
17
18 import (
19 "context"
20 "database/sql/driver"
21 "errors"
22 "fmt"
23 "io"
24 "reflect"
25 "runtime"
26 "sort"
27 "strconv"
28 "sync"
29 "sync/atomic"
30 "time"
31 )
32
// driversMu guards drivers, the registry that maps a driver name to the
// driver.Driver registered under that name.
var (
	driversMu sync.RWMutex
	drivers   = make(map[string]driver.Driver)
)
37
38
// nowFunc returns the current time and defaults to time.Now. It is a
// variable rather than a direct call, which allows the clock to be
// replaced (presumably by tests).
var nowFunc = time.Now
40
41
42
43
44 func Register(name string, driver driver.Driver) {
45 driversMu.Lock()
46 defer driversMu.Unlock()
47 if driver == nil {
48 panic("sql: Register driver is nil")
49 }
50 if _, dup := drivers[name]; dup {
51 panic("sql: Register called twice for driver " + name)
52 }
53 drivers[name] = driver
54 }
55
56 func unregisterAllDrivers() {
57 driversMu.Lock()
58 defer driversMu.Unlock()
59
60 drivers = make(map[string]driver.Driver)
61 }
62
63
64 func Drivers() []string {
65 driversMu.RLock()
66 defer driversMu.RUnlock()
67 list := make([]string, 0, len(drivers))
68 for name := range drivers {
69 list = append(list, name)
70 }
71 sort.Strings(list)
72 return list
73 }
74
75
76
77
78
79
80
// NamedArg is a named query argument: a value paired with the parameter
// name it should be bound to. Values of this type are built by Named.
type NamedArg struct {
	// _Named_Fields_Required is a zero-size unexported field; its presence
	// means composite literals of NamedArg must use field names.
	_Named_Fields_Required struct{}

	// Name is the name of the parameter placeholder.
	Name string

	// Value is the value bound to the parameter.
	Value interface{}
}
97
98
99
100
101
102
103
104
105
106
107
108
109
110 func Named(name string, value interface{}) NamedArg {
111
112
113
114
115 return NamedArg{Name: name, Value: value}
116 }
117
118
// IsolationLevel is the transaction isolation level carried in TxOptions.
type IsolationLevel int

// The isolation levels known to this package, numbered consecutively
// from LevelDefault (0).
const (
	LevelDefault IsolationLevel = iota
	LevelReadUncommitted
	LevelReadCommitted
	LevelWriteCommitted
	LevelRepeatableRead
	LevelSnapshot
	LevelSerializable
	LevelLinearizable
)

// String returns the name of the isolation level. Values outside the
// declared range are rendered as "IsolationLevel(N)".
func (i IsolationLevel) String() string {
	// Indexed by level; the constants are consecutive starting at zero.
	names := [...]string{
		LevelDefault:         "Default",
		LevelReadUncommitted: "Read Uncommitted",
		LevelReadCommitted:   "Read Committed",
		LevelWriteCommitted:  "Write Committed",
		LevelRepeatableRead:  "Repeatable Read",
		LevelSnapshot:        "Snapshot",
		LevelSerializable:    "Serializable",
		LevelLinearizable:    "Linearizable",
	}
	if i >= 0 && int(i) < len(names) {
		return names[i]
	}
	return "IsolationLevel(" + strconv.Itoa(int(i)) + ")"
}

// Compile-time check that IsolationLevel implements fmt.Stringer.
var _ fmt.Stringer = LevelDefault
161
162
// TxOptions holds the transaction options passed to DB.BeginTx.
type TxOptions struct {
	// Isolation is the requested transaction isolation level. Its zero
	// value is LevelDefault.
	Isolation IsolationLevel
	// ReadOnly requests a read-only transaction when true.
	ReadOnly bool
}
169
170
171
172
// RawBytes is a byte slice type used as a scan destination.
// NOTE(review): presumably it aliases driver-owned memory that is only
// valid until the next row operation; confirm against the Rows code.
type RawBytes []byte
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188 type NullString struct {
189 String string
190 Valid bool
191 }
192
193
194 func (ns *NullString) Scan(value interface{}) error {
195 if value == nil {
196 ns.String, ns.Valid = "", false
197 return nil
198 }
199 ns.Valid = true
200 return convertAssign(&ns.String, value)
201 }
202
203
204 func (ns NullString) Value() (driver.Value, error) {
205 if !ns.Valid {
206 return nil, nil
207 }
208 return ns.String, nil
209 }
210
211
212
213
214 type NullInt64 struct {
215 Int64 int64
216 Valid bool
217 }
218
219
220 func (n *NullInt64) Scan(value interface{}) error {
221 if value == nil {
222 n.Int64, n.Valid = 0, false
223 return nil
224 }
225 n.Valid = true
226 return convertAssign(&n.Int64, value)
227 }
228
229
230 func (n NullInt64) Value() (driver.Value, error) {
231 if !n.Valid {
232 return nil, nil
233 }
234 return n.Int64, nil
235 }
236
237
238
239
240 type NullInt32 struct {
241 Int32 int32
242 Valid bool
243 }
244
245
246 func (n *NullInt32) Scan(value interface{}) error {
247 if value == nil {
248 n.Int32, n.Valid = 0, false
249 return nil
250 }
251 n.Valid = true
252 return convertAssign(&n.Int32, value)
253 }
254
255
256 func (n NullInt32) Value() (driver.Value, error) {
257 if !n.Valid {
258 return nil, nil
259 }
260 return int64(n.Int32), nil
261 }
262
263
264
265
266 type NullInt16 struct {
267 Int16 int16
268 Valid bool
269 }
270
271
272 func (n *NullInt16) Scan(value interface{}) error {
273 if value == nil {
274 n.Int16, n.Valid = 0, false
275 return nil
276 }
277 err := convertAssign(&n.Int16, value)
278 n.Valid = err == nil
279 return err
280 }
281
282
283 func (n NullInt16) Value() (driver.Value, error) {
284 if !n.Valid {
285 return nil, nil
286 }
287 return int64(n.Int16), nil
288 }
289
290
291
292
293 type NullByte struct {
294 Byte byte
295 Valid bool
296 }
297
298
299 func (n *NullByte) Scan(value interface{}) error {
300 if value == nil {
301 n.Byte, n.Valid = 0, false
302 return nil
303 }
304 err := convertAssign(&n.Byte, value)
305 n.Valid = err == nil
306 return err
307 }
308
309
310 func (n NullByte) Value() (driver.Value, error) {
311 if !n.Valid {
312 return nil, nil
313 }
314 return int64(n.Byte), nil
315 }
316
317
318
319
320 type NullFloat64 struct {
321 Float64 float64
322 Valid bool
323 }
324
325
326 func (n *NullFloat64) Scan(value interface{}) error {
327 if value == nil {
328 n.Float64, n.Valid = 0, false
329 return nil
330 }
331 n.Valid = true
332 return convertAssign(&n.Float64, value)
333 }
334
335
336 func (n NullFloat64) Value() (driver.Value, error) {
337 if !n.Valid {
338 return nil, nil
339 }
340 return n.Float64, nil
341 }
342
343
344
345
346 type NullBool struct {
347 Bool bool
348 Valid bool
349 }
350
351
352 func (n *NullBool) Scan(value interface{}) error {
353 if value == nil {
354 n.Bool, n.Valid = false, false
355 return nil
356 }
357 n.Valid = true
358 return convertAssign(&n.Bool, value)
359 }
360
361
362 func (n NullBool) Value() (driver.Value, error) {
363 if !n.Valid {
364 return nil, nil
365 }
366 return n.Bool, nil
367 }
368
369
370
371
372 type NullTime struct {
373 Time time.Time
374 Valid bool
375 }
376
377
378 func (n *NullTime) Scan(value interface{}) error {
379 if value == nil {
380 n.Time, n.Valid = time.Time{}, false
381 return nil
382 }
383 n.Valid = true
384 return convertAssign(&n.Time, value)
385 }
386
387
388 func (n NullTime) Value() (driver.Value, error) {
389 if !n.Valid {
390 return nil, nil
391 }
392 return n.Time, nil
393 }
394
395
// Scanner is the interface implemented by types that can receive a
// database value themselves; the Null* types in this file are examples.
type Scanner interface {
	// Scan assigns a value coming from the database driver to the
	// receiver. A nil src stands for a NULL value (the Null* types in this
	// file treat it that way). An error should be returned when src
	// cannot be stored without loss of information.
	//
	// NOTE(review): the set of concrete types src may take is defined by
	// the driver layer and is not visible in this file.
	Scan(src interface{}) error
}
417
418
419
420
421
422
423
424
425
// Out wraps a destination for an output parameter.
// NOTE(review): presumably used for stored-procedure OUTPUT parameters;
// the argument conversion code that consumes it is not in this part of
// the file.
type Out struct {
	// _Named_Fields_Required is a zero-size unexported field; its presence
	// means composite literals of Out must use field names.
	_Named_Fields_Required struct{}

	// Dest is where the output value is stored; presumably it must be a
	// pointer (TODO confirm against the code that reads it).
	Dest interface{}

	// In marks the parameter as input as well as output, i.e. the current
	// value behind Dest is also sent to the database (assumed from the
	// name; confirm).
	In bool
}
438
439
440
441
// ErrNoRows is the sentinel error for a result set that contains no rows.
// NOTE(review): presumably returned by Row.Scan; the code that returns it
// is outside this part of the file.
var ErrNoRows = errors.New("sql: no rows in result set")
443
444
445
446
447
448
449
450
451
452
453
454
455
// DB is a database handle that owns a pool of driver connections. The
// pool hands out idle connections, opens new ones on demand (optionally
// bounded by maxOpen), and closes connections that exceed the configured
// idle count, idle time or lifetime.
type DB struct {
	// waitDuration is the total time spent waiting for a connection, in
	// nanoseconds (a time.Duration stored as int64). It is accessed only
	// through sync/atomic; it is kept first in the struct, presumably to
	// guarantee 64-bit alignment on 32-bit platforms.
	waitDuration int64

	connector driver.Connector

	// numClosed counts connections that have been fully closed. It is
	// updated atomically by driverConn.finalClose.
	numClosed uint64

	mu           sync.Mutex // protects the following fields
	freeConn     []*driverConn
	connRequests map[uint64]chan connRequest
	nextRequest  uint64 // next key to use in connRequests
	numOpen      int    // number of opened and pending-open connections

	// openerCh signals the connectionOpener goroutine that a connection is
	// needed; maybeOpenNewConnections sends one value per connection.
	openerCh          chan struct{}
	closed            bool
	dep               map[finalCloser]depSet
	lastPut           map[*driverConn]string // stack trace of the last put of a conn; only written when debugGetPut is set
	maxIdleCount      int                    // zero means defaultMaxIdleConns; negative means 0
	maxOpen           int                    // <= 0 means unlimited
	maxLifetime       time.Duration          // maximum time a connection may be reused; <= 0 means no limit
	maxIdleTime       time.Duration          // maximum time a connection may sit idle; <= 0 means no limit
	cleanerCh         chan struct{}          // wakes the connectionCleaner goroutine; nil when none is running
	waitCount         int64                  // total number of connection requests that had to wait
	maxIdleClosed     int64                  // connections closed because of the idle-count limit
	maxIdleTimeClosed int64                  // connections closed because of the idle-time limit
	maxLifetimeClosed int64                  // connections closed because of the lifetime limit

	stop func() // cancels the context passed to connectionOpener
}
493
494
// connReuseStrategy tells DB.conn whether an idle pooled connection may
// be returned.
type connReuseStrategy uint8

const (
	// alwaysNewConn makes DB.conn skip the free list, so the caller gets a
	// connection that was not taken from it.
	alwaysNewConn connReuseStrategy = iota

	// cachedOrNewConn lets DB.conn return an idle connection from the free
	// list when one is available; otherwise it waits for or opens one,
	// depending on the maxOpen limit.
	cachedOrNewConn
)
505
506
507
508
509
// driverConn wraps a driver.Conn together with the pool bookkeeping for
// it. The embedded mutex serializes use of the underlying connection.
type driverConn struct {
	db        *DB
	createdAt time.Time

	sync.Mutex  // guards the following fields
	ci          driver.Conn
	needReset   bool // the session should be reset before the next use
	closed      bool
	finalClosed bool // set by finalClose just before ci.Close is called
	openStmt    map[*driverStmt]bool

	// The remaining fields are guarded by db.mu.
	inUse      bool
	returnedAt time.Time // time the connection was created or last returned to the pool
	onPut      []func()  // callbacks run (with db.mu held) when the conn is next returned
	dbmuClosed bool      // set by Close while holding db.mu
}
527
528 func (dc *driverConn) releaseConn(err error) {
529 dc.db.putConn(dc, err, true)
530 }
531
532 func (dc *driverConn) removeOpenStmt(ds *driverStmt) {
533 dc.Lock()
534 defer dc.Unlock()
535 delete(dc.openStmt, ds)
536 }
537
538 func (dc *driverConn) expired(timeout time.Duration) bool {
539 if timeout <= 0 {
540 return false
541 }
542 return dc.createdAt.Add(timeout).Before(nowFunc())
543 }
544
545
546
547 func (dc *driverConn) resetSession(ctx context.Context) error {
548 dc.Lock()
549 defer dc.Unlock()
550
551 if !dc.needReset {
552 return nil
553 }
554 if cr, ok := dc.ci.(driver.SessionResetter); ok {
555 return cr.ResetSession(ctx)
556 }
557 return nil
558 }
559
560
561
562 func (dc *driverConn) validateConnection(needsReset bool) bool {
563 dc.Lock()
564 defer dc.Unlock()
565
566 if needsReset {
567 dc.needReset = true
568 }
569 if cv, ok := dc.ci.(driver.Validator); ok {
570 return cv.IsValid()
571 }
572 return true
573 }
574
575
576
577 func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) {
578 si, err := ctxDriverPrepare(ctx, dc.ci, query)
579 if err != nil {
580 return nil, err
581 }
582 ds := &driverStmt{Locker: dc, si: si}
583
584
585 if cg != nil {
586 return ds, nil
587 }
588
589
590
591
592
593 if dc.openStmt == nil {
594 dc.openStmt = make(map[*driverStmt]bool)
595 }
596 dc.openStmt[ds] = true
597 return ds, nil
598 }
599
600
601 func (dc *driverConn) closeDBLocked() func() error {
602 dc.Lock()
603 defer dc.Unlock()
604 if dc.closed {
605 return func() error { return errors.New("sql: duplicate driverConn close") }
606 }
607 dc.closed = true
608 return dc.db.removeDepLocked(dc, dc)
609 }
610
611 func (dc *driverConn) Close() error {
612 dc.Lock()
613 if dc.closed {
614 dc.Unlock()
615 return errors.New("sql: duplicate driverConn close")
616 }
617 dc.closed = true
618 dc.Unlock()
619
620
621 dc.db.mu.Lock()
622 dc.dbmuClosed = true
623 fn := dc.db.removeDepLocked(dc, dc)
624 dc.db.mu.Unlock()
625 return fn()
626 }
627
// finalClose closes every statement still tracked in dc.openStmt, closes
// the underlying driver connection and then updates the owning DB's
// counters. It returns the error from the driver connection's Close.
func (dc *driverConn) finalClose() error {
	var err error

	// Copy the tracked statements out while holding dc's lock.
	// driverStmt.Close locks through its Locker (which prepareLocked sets
	// to dc), so the statements can only be closed after this lock has
	// been released.
	var openStmt []*driverStmt
	withLock(dc, func() {
		openStmt = make([]*driverStmt, 0, len(dc.openStmt))
		for ds := range dc.openStmt {
			openStmt = append(openStmt, ds)
		}
		dc.openStmt = nil
	})
	for _, ds := range openStmt {
		ds.Close()
	}
	withLock(dc, func() {
		dc.finalClosed = true
		err = dc.ci.Close()
		dc.ci = nil
	})

	// One connection fewer is open; that may leave room to serve pending
	// connection requests.
	dc.db.mu.Lock()
	dc.db.numOpen--
	dc.db.maybeOpenNewConnections()
	dc.db.mu.Unlock()

	atomic.AddUint64(&dc.db.numClosed, 1)
	return err
}
658
659
660
661
// driverStmt pairs a driver.Stmt with the Locker that must be held while
// using it, and makes sure the underlying statement is closed at most
// once.
type driverStmt struct {
	sync.Locker // guards the fields below
	si          driver.Stmt
	closed      bool
	closeErr    error // result of the first Close call
}

// Close closes the underlying statement on the first call and remembers
// the result; every later call returns that same result without touching
// the driver again.
func (ds *driverStmt) Close() error {
	ds.Lock()
	defer ds.Unlock()

	if !ds.closed {
		ds.closed = true
		ds.closeErr = ds.si.Close()
	}
	return ds.closeErr
}
681
682
// depSet is a set of dependencies (the values passed as dep to addDep),
// represented as a map whose keys are the set members.
type depSet map[interface{}]bool
684
685
686
// finalCloser is implemented by resources tracked in DB.dep. Such a
// resource is really closed only once its last dependency is removed.
type finalCloser interface {
	// finalClose is returned by DB.removeDepLocked when the last
	// dependency of the value is removed, so that the caller can invoke
	// it; DB.removeDep calls it after releasing db.mu.
	finalClose() error
}
692
693
694
// addDep records that dep depends on x, so that x is not finally closed
// while dep is still registered. It takes db.mu and delegates to
// addDepLocked.
func (db *DB) addDep(x finalCloser, dep interface{}) {
	db.mu.Lock()
	defer db.mu.Unlock()
	db.addDepLocked(x, dep)
}
700
701 func (db *DB) addDepLocked(x finalCloser, dep interface{}) {
702 if db.dep == nil {
703 db.dep = make(map[finalCloser]depSet)
704 }
705 xdep := db.dep[x]
706 if xdep == nil {
707 xdep = make(depSet)
708 db.dep[x] = xdep
709 }
710 xdep[dep] = true
711 }
712
713
714
715
716
717 func (db *DB) removeDep(x finalCloser, dep interface{}) error {
718 db.mu.Lock()
719 fn := db.removeDepLocked(x, dep)
720 db.mu.Unlock()
721 return fn()
722 }
723
724 func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error {
725
726 xdep, ok := db.dep[x]
727 if !ok {
728 panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
729 }
730
731 l0 := len(xdep)
732 delete(xdep, dep)
733
734 switch len(xdep) {
735 case l0:
736
737 panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
738 case 0:
739
740 delete(db.dep, x)
741 return x.finalClose
742 default:
743
744 return func() error { return nil }
745 }
746 }
747
748
749
750
751
752
// connectionRequestQueueSize is the buffer size of DB.openerCh, i.e. how
// many "open a connection" signals can be queued before a send from
// maybeOpenNewConnections would block.
var connectionRequestQueueSize = 1000000
754
// dsnConnector is the connector Open builds for drivers that do not
// implement driver.DriverContext: it remembers a data source name and
// the driver to open it with.
type dsnConnector struct {
	dsn    string
	driver driver.Driver
}

// Connect opens a new connection by passing the stored data source name
// to the driver's Open method. The context is ignored.
func (t dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
	drv, name := t.driver, t.dsn
	return drv.Open(name)
}

// Driver returns the driver this connector opens connections with.
func (t dsnConnector) Driver() driver.Driver {
	return t.driver
}
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784 func OpenDB(c driver.Connector) *DB {
785 ctx, cancel := context.WithCancel(context.Background())
786 db := &DB{
787 connector: c,
788 openerCh: make(chan struct{}, connectionRequestQueueSize),
789 lastPut: make(map[*driverConn]string),
790 connRequests: make(map[uint64]chan connRequest),
791 stop: cancel,
792 }
793
794 go db.connectionOpener(ctx)
795
796 return db
797 }
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816 func Open(driverName, dataSourceName string) (*DB, error) {
817 driversMu.RLock()
818 driveri, ok := drivers[driverName]
819 driversMu.RUnlock()
820 if !ok {
821 return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
822 }
823
824 if driverCtx, ok := driveri.(driver.DriverContext); ok {
825 connector, err := driverCtx.OpenConnector(dataSourceName)
826 if err != nil {
827 return nil, err
828 }
829 return OpenDB(connector), nil
830 }
831
832 return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
833 }
834
835 func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
836 var err error
837 if pinger, ok := dc.ci.(driver.Pinger); ok {
838 withLock(dc, func() {
839 err = pinger.Ping(ctx)
840 })
841 }
842 release(err)
843 return err
844 }
845
846
847
848 func (db *DB) PingContext(ctx context.Context) error {
849 var dc *driverConn
850 var err error
851
852 for i := 0; i < maxBadConnRetries; i++ {
853 dc, err = db.conn(ctx, cachedOrNewConn)
854 if err != driver.ErrBadConn {
855 break
856 }
857 }
858 if err == driver.ErrBadConn {
859 dc, err = db.conn(ctx, alwaysNewConn)
860 }
861 if err != nil {
862 return err
863 }
864
865 return db.pingDC(ctx, dc, dc.releaseConn)
866 }
867
868
869
870
871
872
873 func (db *DB) Ping() error {
874 return db.PingContext(context.Background())
875 }
876
877
878
879
880
881
882
// Close closes the database: it stops the cleaner, closes all idle
// connections, fails every pending connection request and stops the
// connection opener. Calling Close on an already closed DB returns nil.
// The returned error is the last non-nil error seen while closing the
// idle connections or the connector.
func (db *DB) Close() error {
	db.mu.Lock()
	if db.closed { // make Close idempotent
		db.mu.Unlock()
		return nil
	}
	// Closing cleanerCh wakes the cleaner goroutine, which exits once it
	// observes db.closed.
	if db.cleanerCh != nil {
		close(db.cleanerCh)
	}
	var err error
	// Collect the close functions under the lock; they are run after it
	// has been released.
	fns := make([]func() error, 0, len(db.freeConn))
	for _, dc := range db.freeConn {
		fns = append(fns, dc.closeDBLocked())
	}
	db.freeConn = nil
	db.closed = true
	// A closed request channel is what DB.conn reports as errDBClosed.
	for _, req := range db.connRequests {
		close(req)
	}
	db.mu.Unlock()
	for _, fn := range fns {
		err1 := fn()
		if err1 != nil {
			err = err1
		}
	}
	// Cancel the context the connectionOpener goroutine runs with.
	db.stop()
	// Connectors that hold resources of their own may implement io.Closer.
	if c, ok := db.connector.(io.Closer); ok {
		err1 := c.Close()
		if err1 != nil {
			err = err1
		}
	}
	return err
}
918
919 const defaultMaxIdleConns = 2
920
921 func (db *DB) maxIdleConnsLocked() int {
922 n := db.maxIdleCount
923 switch {
924 case n == 0:
925
926 return defaultMaxIdleConns
927 case n < 0:
928 return 0
929 default:
930 return n
931 }
932 }
933
934 func (db *DB) shortestIdleTimeLocked() time.Duration {
935 if db.maxIdleTime <= 0 {
936 return db.maxLifetime
937 }
938 if db.maxLifetime <= 0 {
939 return db.maxIdleTime
940 }
941
942 min := db.maxIdleTime
943 if min > db.maxLifetime {
944 min = db.maxLifetime
945 }
946 return min
947 }
948
949
950
951
952
953
954
955
956
957
958
959 func (db *DB) SetMaxIdleConns(n int) {
960 db.mu.Lock()
961 if n > 0 {
962 db.maxIdleCount = n
963 } else {
964
965 db.maxIdleCount = -1
966 }
967
968 if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen {
969 db.maxIdleCount = db.maxOpen
970 }
971 var closing []*driverConn
972 idleCount := len(db.freeConn)
973 maxIdle := db.maxIdleConnsLocked()
974 if idleCount > maxIdle {
975 closing = db.freeConn[maxIdle:]
976 db.freeConn = db.freeConn[:maxIdle]
977 }
978 db.maxIdleClosed += int64(len(closing))
979 db.mu.Unlock()
980 for _, c := range closing {
981 c.Close()
982 }
983 }
984
985
986
987
988
989
990
991
992
993 func (db *DB) SetMaxOpenConns(n int) {
994 db.mu.Lock()
995 db.maxOpen = n
996 if n < 0 {
997 db.maxOpen = 0
998 }
999 syncMaxIdle := db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen
1000 db.mu.Unlock()
1001 if syncMaxIdle {
1002 db.SetMaxIdleConns(n)
1003 }
1004 }
1005
1006
1007
1008
1009
1010
1011 func (db *DB) SetConnMaxLifetime(d time.Duration) {
1012 if d < 0 {
1013 d = 0
1014 }
1015 db.mu.Lock()
1016
1017 if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
1018 select {
1019 case db.cleanerCh <- struct{}{}:
1020 default:
1021 }
1022 }
1023 db.maxLifetime = d
1024 db.startCleanerLocked()
1025 db.mu.Unlock()
1026 }
1027
1028
1029
1030
1031
1032
1033 func (db *DB) SetConnMaxIdleTime(d time.Duration) {
1034 if d < 0 {
1035 d = 0
1036 }
1037 db.mu.Lock()
1038 defer db.mu.Unlock()
1039
1040
1041 if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil {
1042 select {
1043 case db.cleanerCh <- struct{}{}:
1044 default:
1045 }
1046 }
1047 db.maxIdleTime = d
1048 db.startCleanerLocked()
1049 }
1050
1051
1052 func (db *DB) startCleanerLocked() {
1053 if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
1054 db.cleanerCh = make(chan struct{}, 1)
1055 go db.connectionCleaner(db.shortestIdleTimeLocked())
1056 }
1057 }
1058
1059 func (db *DB) connectionCleaner(d time.Duration) {
1060 const minInterval = time.Second
1061
1062 if d < minInterval {
1063 d = minInterval
1064 }
1065 t := time.NewTimer(d)
1066
1067 for {
1068 select {
1069 case <-t.C:
1070 case <-db.cleanerCh:
1071 }
1072
1073 db.mu.Lock()
1074
1075 d = db.shortestIdleTimeLocked()
1076 if db.closed || db.numOpen == 0 || d <= 0 {
1077 db.cleanerCh = nil
1078 db.mu.Unlock()
1079 return
1080 }
1081
1082 closing := db.connectionCleanerRunLocked()
1083 db.mu.Unlock()
1084 for _, c := range closing {
1085 c.Close()
1086 }
1087
1088 if d < minInterval {
1089 d = minInterval
1090 }
1091 t.Reset(d)
1092 }
1093 }
1094
1095 func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) {
1096 if db.maxLifetime > 0 {
1097 expiredSince := nowFunc().Add(-db.maxLifetime)
1098 for i := 0; i < len(db.freeConn); i++ {
1099 c := db.freeConn[i]
1100 if c.createdAt.Before(expiredSince) {
1101 closing = append(closing, c)
1102 last := len(db.freeConn) - 1
1103 db.freeConn[i] = db.freeConn[last]
1104 db.freeConn[last] = nil
1105 db.freeConn = db.freeConn[:last]
1106 i--
1107 }
1108 }
1109 db.maxLifetimeClosed += int64(len(closing))
1110 }
1111
1112 if db.maxIdleTime > 0 {
1113 expiredSince := nowFunc().Add(-db.maxIdleTime)
1114 var expiredCount int64
1115 for i := 0; i < len(db.freeConn); i++ {
1116 c := db.freeConn[i]
1117 if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) {
1118 closing = append(closing, c)
1119 expiredCount++
1120 last := len(db.freeConn) - 1
1121 db.freeConn[i] = db.freeConn[last]
1122 db.freeConn[last] = nil
1123 db.freeConn = db.freeConn[:last]
1124 i--
1125 }
1126 }
1127 db.maxIdleTimeClosed += expiredCount
1128 }
1129 return
1130 }
1131
1132
// DBStats is a snapshot of connection pool statistics, as returned by
// DB.Stats.
type DBStats struct {
	MaxOpenConnections int // configured limit on open connections (db.maxOpen)

	// Pool status.
	OpenConnections int // connections counted as open, including ones still being opened
	InUse           int // OpenConnections minus Idle
	Idle            int // connections currently in the free list

	// Counters.
	WaitCount         int64         // total number of connection requests that had to wait
	WaitDuration      time.Duration // total time spent waiting for a connection
	MaxIdleClosed     int64         // connections closed because of the idle-count limit
	MaxIdleTimeClosed int64         // connections closed because of the idle-time limit
	MaxLifetimeClosed int64         // connections closed because of the lifetime limit
}
1148
1149
1150 func (db *DB) Stats() DBStats {
1151 wait := atomic.LoadInt64(&db.waitDuration)
1152
1153 db.mu.Lock()
1154 defer db.mu.Unlock()
1155
1156 stats := DBStats{
1157 MaxOpenConnections: db.maxOpen,
1158
1159 Idle: len(db.freeConn),
1160 OpenConnections: db.numOpen,
1161 InUse: db.numOpen - len(db.freeConn),
1162
1163 WaitCount: db.waitCount,
1164 WaitDuration: time.Duration(wait),
1165 MaxIdleClosed: db.maxIdleClosed,
1166 MaxIdleTimeClosed: db.maxIdleTimeClosed,
1167 MaxLifetimeClosed: db.maxLifetimeClosed,
1168 }
1169 return stats
1170 }
1171
1172
1173
1174
1175 func (db *DB) maybeOpenNewConnections() {
1176 numRequests := len(db.connRequests)
1177 if db.maxOpen > 0 {
1178 numCanOpen := db.maxOpen - db.numOpen
1179 if numRequests > numCanOpen {
1180 numRequests = numCanOpen
1181 }
1182 }
1183 for numRequests > 0 {
1184 db.numOpen++
1185 numRequests--
1186 if db.closed {
1187 return
1188 }
1189 db.openerCh <- struct{}{}
1190 }
1191 }
1192
1193
1194 func (db *DB) connectionOpener(ctx context.Context) {
1195 for {
1196 select {
1197 case <-ctx.Done():
1198 return
1199 case <-db.openerCh:
1200 db.openNewConnection(ctx)
1201 }
1202 }
1203 }
1204
1205
1206 func (db *DB) openNewConnection(ctx context.Context) {
1207
1208
1209
1210 ci, err := db.connector.Connect(ctx)
1211 db.mu.Lock()
1212 defer db.mu.Unlock()
1213 if db.closed {
1214 if err == nil {
1215 ci.Close()
1216 }
1217 db.numOpen--
1218 return
1219 }
1220 if err != nil {
1221 db.numOpen--
1222 db.putConnDBLocked(nil, err)
1223 db.maybeOpenNewConnections()
1224 return
1225 }
1226 dc := &driverConn{
1227 db: db,
1228 createdAt: nowFunc(),
1229 returnedAt: nowFunc(),
1230 ci: ci,
1231 }
1232 if db.putConnDBLocked(dc, err) {
1233 db.addDepLocked(dc, dc)
1234 } else {
1235 db.numOpen--
1236 ci.Close()
1237 }
1238 }
1239
1240
1241
1242
// connRequest is the answer delivered on a connection request channel:
// the connection handed to the waiting caller, or the error that came up
// while opening one. putConnDBLocked produces these values.
type connRequest struct {
	conn *driverConn
	err  error
}
1247
// errDBClosed is returned by DB.conn when the DB has been closed.
var errDBClosed = errors.New("sql: database is closed")
1249
1250
1251
1252 func (db *DB) nextRequestKeyLocked() uint64 {
1253 next := db.nextRequest
1254 db.nextRequest++
1255 return next
1256 }
1257
1258
// conn returns a connection that is marked in use: an idle one from the
// free list (only with cachedOrNewConn), one handed over by another
// goroutine after waiting (when maxOpen is reached), or a newly opened
// one. It returns errDBClosed when the DB is closed, ctx.Err() when the
// context is done, and driver.ErrBadConn when the connection obtained
// turned out to be expired or failed its session reset, in which case
// the caller is expected to retry.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil, errDBClosed
	}
	// Check whether the context is already done (non-blocking).
	select {
	default:
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
	lifetime := db.maxLifetime

	// Prefer a free connection, if the strategy allows it.
	numFree := len(db.freeConn)
	if strategy == cachedOrNewConn && numFree > 0 {
		// Take the connection at the front of the free list and shift
		// the rest down.
		conn := db.freeConn[0]
		copy(db.freeConn, db.freeConn[1:])
		db.freeConn = db.freeConn[:numFree-1]
		conn.inUse = true
		if conn.expired(lifetime) {
			db.maxLifetimeClosed++
			db.mu.Unlock()
			conn.Close()
			return nil, driver.ErrBadConn
		}
		db.mu.Unlock()

		// Reset the session if required. Only ErrBadConn is treated as
		// fatal here; any other reset result is ignored.
		if err := conn.resetSession(ctx); err == driver.ErrBadConn {
			conn.Close()
			return nil, driver.ErrBadConn
		}

		return conn, nil
	}

	// No usable free connection. If the open-connection limit has been
	// reached, queue a request and wait for a connection to be handed
	// over.
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// The channel is buffered so that the sender does not block while
		// waiting for the request to be read.
		req := make(chan connRequest, 1)
		reqKey := db.nextRequestKeyLocked()
		db.connRequests[reqKey] = req
		db.waitCount++
		db.mu.Unlock()

		waitStart := nowFunc()

		// Wait for a connection or for the context to end.
		select {
		case <-ctx.Done():
			// Withdraw the request, then make sure no value was sent on
			// the channel before it was withdrawn.
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			select {
			default:
			case ret, ok := <-req:
				// A connection was delivered anyway; give it back to the
				// pool.
				if ok && ret.conn != nil {
					db.putConn(ret.conn, ret.err, false)
				}
			}
			return nil, ctx.Err()
		case ret, ok := <-req:
			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			// DB.Close closes pending request channels.
			if !ok {
				return nil, errDBClosed
			}
			// The expiry check applies only to cachedOrNewConn. With
			// alwaysNewConn the connection is returned as delivered, so
			// the caller is not sent into another retry.
			if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
				db.mu.Lock()
				db.maxLifetimeClosed++
				db.mu.Unlock()
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			if ret.conn == nil {
				return nil, ret.err
			}

			// Reset the session if required.
			if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			return ret.conn, ret.err
		}
	}

	// Open a new connection. numOpen is incremented before connecting so
	// that concurrent callers see the slot as taken; it is corrected
	// below if the connect fails.
	db.numOpen++ // optimistically
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // correct for earlier optimism
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
	dc := &driverConn{
		db:         db,
		createdAt:  nowFunc(),
		returnedAt: nowFunc(),
		ci:         ci,
		inUse:      true,
	}
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}
1384
1385
// putConnHook, when non-nil, is called by putConn (with db.mu held) just
// before a healthy connection is offered back to the pool. It is nil by
// default; presumably it is set only by tests.
var putConnHook func(*DB, *driverConn)
1387
1388
1389
1390
1391 func (db *DB) noteUnusedDriverStatement(c *driverConn, ds *driverStmt) {
1392 db.mu.Lock()
1393 defer db.mu.Unlock()
1394 if c.inUse {
1395 c.onPut = append(c.onPut, func() {
1396 ds.Close()
1397 })
1398 } else {
1399 c.Lock()
1400 fc := c.finalClosed
1401 c.Unlock()
1402 if !fc {
1403 ds.Close()
1404 }
1405 }
1406 }
1407
1408
1409
// debugGetPut enables extra bookkeeping in putConn: the stack of each
// put is recorded in DB.lastPut and printed when a connection is returned
// twice. It is off by default.
const debugGetPut = false
1411
1412
1413
// putConn returns dc to the pool after use. err is the outcome of that
// use; resetSession asks for the session to be reset before the
// connection is used again. Connections that are bad, invalid or
// expired, or that the pool has no room for, are closed instead of being
// kept. putConn panics if dc is not marked as in use.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
	// Unless the connection is already known to be bad, ask the driver
	// whether it is still valid (this also records the reset request).
	if err != driver.ErrBadConn {
		if !dc.validateConnection(resetSession) {
			err = driver.ErrBadConn
		}
	}
	db.mu.Lock()
	if !dc.inUse {
		db.mu.Unlock()
		if debugGetPut {
			fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
		}
		panic("sql: connection returned that was never out")
	}

	// A connection past its lifetime is treated like a bad connection.
	if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {
		db.maxLifetimeClosed++
		err = driver.ErrBadConn
	}
	if debugGetPut {
		db.lastPut[dc] = stack()
	}
	dc.inUse = false
	dc.returnedAt = nowFunc()

	// Run the callbacks registered while the connection was in use
	// (see noteUnusedDriverStatement); db.mu is held.
	for _, fn := range dc.onPut {
		fn()
	}
	dc.onPut = nil

	if err == driver.ErrBadConn {
		// The connection is being dropped rather than pooled, so give
		// pending requests the chance to have a replacement opened.
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		dc.Close()
		return
	}
	if putConnHook != nil {
		putConnHook(db, dc)
	}
	added := db.putConnDBLocked(dc, nil)
	db.mu.Unlock()

	// Neither a waiting request nor the free list took the connection.
	if !added {
		dc.Close()
		return
	}
}
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475 func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
1476 if db.closed {
1477 return false
1478 }
1479 if db.maxOpen > 0 && db.numOpen > db.maxOpen {
1480 return false
1481 }
1482 if c := len(db.connRequests); c > 0 {
1483 var req chan connRequest
1484 var reqKey uint64
1485 for reqKey, req = range db.connRequests {
1486 break
1487 }
1488 delete(db.connRequests, reqKey)
1489 if err == nil {
1490 dc.inUse = true
1491 }
1492 req <- connRequest{
1493 conn: dc,
1494 err: err,
1495 }
1496 return true
1497 } else if err == nil && !db.closed {
1498 if db.maxIdleConnsLocked() > len(db.freeConn) {
1499 db.freeConn = append(db.freeConn, dc)
1500 db.startCleanerLocked()
1501 return true
1502 }
1503 db.maxIdleClosed++
1504 }
1505 return false
1506 }
1507
1508
1509
1510
// maxBadConnRetries is how many times an operation is attempted with a
// possibly pooled connection (cachedOrNewConn) while it keeps failing
// with driver.ErrBadConn, before one final attempt with alwaysNewConn.
const maxBadConnRetries = 2
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521 func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1522 var stmt *Stmt
1523 var err error
1524 for i := 0; i < maxBadConnRetries; i++ {
1525 stmt, err = db.prepare(ctx, query, cachedOrNewConn)
1526 if err != driver.ErrBadConn {
1527 break
1528 }
1529 }
1530 if err == driver.ErrBadConn {
1531 return db.prepare(ctx, query, alwaysNewConn)
1532 }
1533 return stmt, err
1534 }
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544 func (db *DB) Prepare(query string) (*Stmt, error) {
1545 return db.PrepareContext(context.Background(), query)
1546 }
1547
1548 func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
1549
1550
1551
1552
1553
1554
1555 dc, err := db.conn(ctx, strategy)
1556 if err != nil {
1557 return nil, err
1558 }
1559 return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)
1560 }
1561
1562
1563
1564
1565 func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
1566 var ds *driverStmt
1567 var err error
1568 defer func() {
1569 release(err)
1570 }()
1571 withLock(dc, func() {
1572 ds, err = dc.prepareLocked(ctx, cg, query)
1573 })
1574 if err != nil {
1575 return nil, err
1576 }
1577 stmt := &Stmt{
1578 db: db,
1579 query: query,
1580 cg: cg,
1581 cgds: ds,
1582 }
1583
1584
1585
1586
1587 if cg == nil {
1588 stmt.css = []connStmt{{dc, ds}}
1589 stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed)
1590 db.addDep(stmt, stmt)
1591 }
1592 return stmt, nil
1593 }
1594
1595
1596
1597 func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
1598 var res Result
1599 var err error
1600 for i := 0; i < maxBadConnRetries; i++ {
1601 res, err = db.exec(ctx, query, args, cachedOrNewConn)
1602 if err != driver.ErrBadConn {
1603 break
1604 }
1605 }
1606 if err == driver.ErrBadConn {
1607 return db.exec(ctx, query, args, alwaysNewConn)
1608 }
1609 return res, err
1610 }
1611
1612
1613
1614
1615
1616
1617 func (db *DB) Exec(query string, args ...interface{}) (Result, error) {
1618 return db.ExecContext(context.Background(), query, args...)
1619 }
1620
1621 func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) {
1622 dc, err := db.conn(ctx, strategy)
1623 if err != nil {
1624 return nil, err
1625 }
1626 return db.execDC(ctx, dc, dc.releaseConn, query, args)
1627 }
1628
// execDC executes query with args on the given connection and always
// calls release with the final error before returning. If the driver
// connection implements driver.ExecerContext or driver.Execer the query
// is executed directly; when that path is unavailable or answers
// driver.ErrSkip, the query is prepared, executed and the statement
// closed again.
func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) {
	// err is a named result so the deferred release observes the value
	// that is actually returned.
	defer func() {
		release(err)
	}()
	execerCtx, ok := dc.ci.(driver.ExecerContext)
	var execer driver.Execer
	if !ok {
		execer, ok = dc.ci.(driver.Execer)
	}
	if ok {
		var nvdargs []driver.NamedValue
		var resi driver.Result
		// Argument conversion and execution both happen under dc's lock.
		withLock(dc, func() {
			nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
			if err != nil {
				return
			}
			resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
		})
		// driver.ErrSkip means the driver declined the direct path; fall
		// through to the prepared-statement path below.
		if err != driver.ErrSkip {
			if err != nil {
				return nil, err
			}
			return driverResult{dc, resi}, nil
		}
	}

	var si driver.Stmt
	withLock(dc, func() {
		si, err = ctxDriverPrepare(ctx, dc.ci, query)
	})
	if err != nil {
		return nil, err
	}
	ds := &driverStmt{Locker: dc, si: si}
	// Deferred calls run last-in first-out, so the statement is closed
	// before release runs.
	defer ds.Close()
	return resultFromStatement(ctx, dc.ci, ds, args...)
}
1667
1668
1669
1670 func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
1671 var rows *Rows
1672 var err error
1673 for i := 0; i < maxBadConnRetries; i++ {
1674 rows, err = db.query(ctx, query, args, cachedOrNewConn)
1675 if err != driver.ErrBadConn {
1676 break
1677 }
1678 }
1679 if err == driver.ErrBadConn {
1680 return db.query(ctx, query, args, alwaysNewConn)
1681 }
1682 return rows, err
1683 }
1684
1685
1686
1687
1688
1689
1690 func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
1691 return db.QueryContext(context.Background(), query, args...)
1692 }
1693
1694 func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
1695 dc, err := db.conn(ctx, strategy)
1696 if err != nil {
1697 return nil, err
1698 }
1699
1700 return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
1701 }
1702
1703
1704
1705
1706
// queryDC runs query with args on the given connection and returns the
// resulting Rows. On error it calls releaseConn itself; on success the
// returned Rows carries releaseConn and takes over responsibility for the
// connection. ctx is the context of the query; txctx is the context of an
// enclosing transaction, or nil when there is none. Both are passed to
// Rows.initContextClose.
func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
	queryerCtx, ok := dc.ci.(driver.QueryerContext)
	var queryer driver.Queryer
	if !ok {
		queryer, ok = dc.ci.(driver.Queryer)
	}
	if ok {
		// The driver connection can run the query directly, without a
		// prepared statement.
		var nvdargs []driver.NamedValue
		var rowsi driver.Rows
		var err error
		withLock(dc, func() {
			nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
			if err != nil {
				return
			}
			rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
		})
		// driver.ErrSkip means the driver declined the direct path; fall
		// through to the prepared-statement path below.
		if err != driver.ErrSkip {
			if err != nil {
				releaseConn(err)
				return nil, err
			}
			// Ownership of dc passes to the Rows, which releases it
			// through releaseConn.
			rows := &Rows{
				dc:          dc,
				releaseConn: releaseConn,
				rowsi:       rowsi,
			}
			rows.initContextClose(ctx, txctx)
			return rows, nil
		}
	}

	var si driver.Stmt
	var err error
	withLock(dc, func() {
		si, err = ctxDriverPrepare(ctx, dc.ci, query)
	})
	if err != nil {
		releaseConn(err)
		return nil, err
	}

	ds := &driverStmt{Locker: dc, si: si}
	rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
	if err != nil {
		ds.Close()
		releaseConn(err)
		return nil, err
	}

	// Ownership of dc and of the statement passes to the Rows; ds is
	// stored as closeStmt so the Rows can close it.
	rows := &Rows{
		dc:          dc,
		releaseConn: releaseConn,
		rowsi:       rowsi,
		closeStmt:   ds,
	}
	rows.initContextClose(ctx, txctx)
	return rows, nil
}
1770
1771
1772
1773
1774
1775
1776
1777 func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
1778 rows, err := db.QueryContext(ctx, query, args...)
1779 return &Row{rows: rows, err: err}
1780 }
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791 func (db *DB) QueryRow(query string, args ...interface{}) *Row {
1792 return db.QueryRowContext(context.Background(), query, args...)
1793 }
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805 func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
1806 var tx *Tx
1807 var err error
1808 for i := 0; i < maxBadConnRetries; i++ {
1809 tx, err = db.begin(ctx, opts, cachedOrNewConn)
1810 if err != driver.ErrBadConn {
1811 break
1812 }
1813 }
1814 if err == driver.ErrBadConn {
1815 return db.begin(ctx, opts, alwaysNewConn)
1816 }
1817 return tx, err
1818 }
1819
1820
1821
1822
1823
1824
1825 func (db *DB) Begin() (*Tx, error) {
1826 return db.BeginTx(context.Background(), nil)
1827 }
1828
1829 func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
1830 dc, err := db.conn(ctx, strategy)
1831 if err != nil {
1832 return nil, err
1833 }
1834 return db.beginDC(ctx, dc, dc.releaseConn, opts)
1835 }
1836
1837
1838 func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
1839 var txi driver.Tx
1840 keepConnOnRollback := false
1841 withLock(dc, func() {
1842 _, hasSessionResetter := dc.ci.(driver.SessionResetter)
1843 _, hasConnectionValidator := dc.ci.(driver.Validator)
1844 keepConnOnRollback = hasSessionResetter && hasConnectionValidator
1845 txi, err = ctxDriverBegin(ctx, opts, dc.ci)
1846 })
1847 if err != nil {
1848 release(err)
1849 return nil, err
1850 }
1851
1852
1853
1854 ctx, cancel := context.WithCancel(ctx)
1855 tx = &Tx{
1856 db: db,
1857 dc: dc,
1858 releaseConn: release,
1859 txi: txi,
1860 cancel: cancel,
1861 keepConnOnRollback: keepConnOnRollback,
1862 ctx: ctx,
1863 }
1864 go tx.awaitDone()
1865 return tx, nil
1866 }
1867
1868
1869 func (db *DB) Driver() driver.Driver {
1870 return db.connector.Driver()
1871 }
1872
1873
1874
// ErrConnDone is returned by Conn operations once the Conn has been closed:
// grabConn returns it when the done flag is set, and close returns it when
// called a second time.
var ErrConnDone = errors.New("sql: connection is already closed")
1876
1877
1878
1879
1880
1881
1882
1883
1884 func (db *DB) Conn(ctx context.Context) (*Conn, error) {
1885 var dc *driverConn
1886 var err error
1887 for i := 0; i < maxBadConnRetries; i++ {
1888 dc, err = db.conn(ctx, cachedOrNewConn)
1889 if err != driver.ErrBadConn {
1890 break
1891 }
1892 }
1893 if err == driver.ErrBadConn {
1894 dc, err = db.conn(ctx, alwaysNewConn)
1895 }
1896 if err != nil {
1897 return nil, err
1898 }
1899
1900 conn := &Conn{
1901 db: db,
1902 dc: dc,
1903 }
1904 return conn, nil
1905 }
1906
// releaseConn is the type of the release callback returned by grabConn; it is
// called with the error (possibly nil) produced by the operation that used
// the connection.
type releaseConn func(error)
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
// Conn wraps a single driverConn obtained from a DB (see DB.Conn). Its
// methods run their operations on that one connection.
type Conn struct {
	// db is the owning DB; close sets it to nil.
	db *DB

	// closemu is read-locked by grabConn for the duration of each operation
	// and write-locked by close, so close waits for in-flight operations
	// before releasing the connection.
	closemu sync.RWMutex

	// dc is the underlying connection. close releases it and sets it to nil.
	dc *driverConn

	// done is set to 1 (atomically) by the first call to close; grabConn
	// returns ErrConnDone once it is non-zero.
	done int32
}
1935
1936
1937
1938 func (c *Conn) grabConn(context.Context) (*driverConn, releaseConn, error) {
1939 if atomic.LoadInt32(&c.done) != 0 {
1940 return nil, nil, ErrConnDone
1941 }
1942 c.closemu.RLock()
1943 return c.dc, c.closemuRUnlockCondReleaseConn, nil
1944 }
1945
1946
1947 func (c *Conn) PingContext(ctx context.Context) error {
1948 dc, release, err := c.grabConn(ctx)
1949 if err != nil {
1950 return err
1951 }
1952 return c.db.pingDC(ctx, dc, release)
1953 }
1954
1955
1956
1957 func (c *Conn) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
1958 dc, release, err := c.grabConn(ctx)
1959 if err != nil {
1960 return nil, err
1961 }
1962 return c.db.execDC(ctx, dc, release, query, args)
1963 }
1964
1965
1966
1967 func (c *Conn) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
1968 dc, release, err := c.grabConn(ctx)
1969 if err != nil {
1970 return nil, err
1971 }
1972 return c.db.queryDC(ctx, nil, dc, release, query, args)
1973 }
1974
1975
1976
1977
1978
1979
1980
1981 func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
1982 rows, err := c.QueryContext(ctx, query, args...)
1983 return &Row{rows: rows, err: err}
1984 }
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994 func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1995 dc, release, err := c.grabConn(ctx)
1996 if err != nil {
1997 return nil, err
1998 }
1999 return c.db.prepareDC(ctx, dc, release, c, query)
2000 }
2001
2002
2003
2004
2005
2006
2007 func (c *Conn) Raw(f func(driverConn interface{}) error) (err error) {
2008 var dc *driverConn
2009 var release releaseConn
2010
2011
2012 dc, release, err = c.grabConn(nil)
2013 if err != nil {
2014 return
2015 }
2016 fPanic := true
2017 dc.Mutex.Lock()
2018 defer func() {
2019 dc.Mutex.Unlock()
2020
2021
2022
2023
2024 if fPanic {
2025 err = driver.ErrBadConn
2026 }
2027 release(err)
2028 }()
2029 err = f(dc.ci)
2030 fPanic = false
2031
2032 return
2033 }
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045 func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
2046 dc, release, err := c.grabConn(ctx)
2047 if err != nil {
2048 return nil, err
2049 }
2050 return c.db.beginDC(ctx, dc, release, opts)
2051 }
2052
2053
2054
2055 func (c *Conn) closemuRUnlockCondReleaseConn(err error) {
2056 c.closemu.RUnlock()
2057 if err == driver.ErrBadConn {
2058 c.close(err)
2059 }
2060 }
2061
// txCtx implements stmtConnGrabber. A Conn has no transaction context, so it
// always returns nil.
func (c *Conn) txCtx() context.Context {
	return nil
}
2065
2066 func (c *Conn) close(err error) error {
2067 if !atomic.CompareAndSwapInt32(&c.done, 0, 1) {
2068 return ErrConnDone
2069 }
2070
2071
2072
2073 c.closemu.Lock()
2074 defer c.closemu.Unlock()
2075
2076 c.dc.releaseConn(err)
2077 c.dc = nil
2078 c.db = nil
2079 return err
2080 }
2081
2082
2083
2084
2085
2086
// Close releases the Conn's underlying connection (with a nil error) via
// close. Calling Close again returns ErrConnDone.
func (c *Conn) Close() error {
	return c.close(nil)
}
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
// Tx is an in-progress database transaction bound to a single driverConn.
// It ends with Commit or Rollback, or when its context is done (see
// awaitDone); after that, operations on it fail with ErrTxDone.
type Tx struct {
	// db is the DB the transaction was started on.
	db *DB

	// closemu is read-locked by grabConn for the duration of each operation.
	// Commit and rollback acquire and immediately drop the write lock, which
	// makes them wait for in-flight operations to finish.
	closemu sync.RWMutex

	// dc is the connection the transaction runs on and txi is the driver's
	// transaction handle; both are set to nil by close.
	dc  *driverConn
	txi driver.Tx

	// releaseConn is called by close with the final error to give up the
	// connection.
	releaseConn func(error)

	// done is set to 1 (atomically) by the first Commit or rollback.
	done int32

	// keepConnOnRollback is true when the driver connection implements both
	// driver.SessionResetter and driver.Validator (see beginDC). awaitDone
	// discards the connection after its rollback only when this is false.
	keepConnOnRollback bool

	// stmts holds the statements created through this transaction
	// (PrepareContext, StmtContext); closePrepared closes all of them.
	stmts struct {
		sync.Mutex
		v []*Stmt
	}

	// cancel cancels ctx; Commit and rollback call it.
	cancel func()

	// ctx is the transaction's own context, derived in beginDC from the
	// context passed to BeginTx.
	ctx context.Context
}
2142
2143
2144
2145 func (tx *Tx) awaitDone() {
2146
2147
2148 <-tx.ctx.Done()
2149
2150
2151
2152
2153
2154
2155
2156 discardConnection := !tx.keepConnOnRollback
2157 tx.rollback(discardConnection)
2158 }
2159
2160 func (tx *Tx) isDone() bool {
2161 return atomic.LoadInt32(&tx.done) != 0
2162 }
2163
2164
2165
// ErrTxDone is returned by any operation performed on a transaction that has
// already been committed or rolled back.
var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
2167
2168
2169
2170
2171 func (tx *Tx) close(err error) {
2172 tx.releaseConn(err)
2173 tx.dc = nil
2174 tx.txi = nil
2175 }
2176
2177
2178
// hookTxGrabConn, when non-nil, is called by Tx.grabConn after the closemu
// read lock is held and the done check has passed. It is nil by default
// (presumably set only by tests).
var hookTxGrabConn func()
2180
2181 func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
2182 select {
2183 default:
2184 case <-ctx.Done():
2185 return nil, nil, ctx.Err()
2186 }
2187
2188
2189
2190 tx.closemu.RLock()
2191 if tx.isDone() {
2192 tx.closemu.RUnlock()
2193 return nil, nil, ErrTxDone
2194 }
2195 if hookTxGrabConn != nil {
2196 hookTxGrabConn()
2197 }
2198 return tx.dc, tx.closemuRUnlockRelease, nil
2199 }
2200
// txCtx implements stmtConnGrabber and returns the transaction's context.
func (tx *Tx) txCtx() context.Context {
	return tx.ctx
}
2204
2205
2206
2207
2208
// closemuRUnlockRelease is the release function handed out by grabConn. It
// only drops the read lock taken by grabConn; the error argument is ignored
// and the connection stays with the transaction.
func (tx *Tx) closemuRUnlockRelease(error) {
	tx.closemu.RUnlock()
}
2212
2213
2214 func (tx *Tx) closePrepared() {
2215 tx.stmts.Lock()
2216 defer tx.stmts.Unlock()
2217 for _, stmt := range tx.stmts.v {
2218 stmt.Close()
2219 }
2220 }
2221
2222
2223 func (tx *Tx) Commit() error {
2224
2225
2226
2227 select {
2228 default:
2229 case <-tx.ctx.Done():
2230 if atomic.LoadInt32(&tx.done) == 1 {
2231 return ErrTxDone
2232 }
2233 return tx.ctx.Err()
2234 }
2235 if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
2236 return ErrTxDone
2237 }
2238
2239
2240
2241
2242
2243 tx.cancel()
2244 tx.closemu.Lock()
2245 tx.closemu.Unlock()
2246
2247 var err error
2248 withLock(tx.dc, func() {
2249 err = tx.txi.Commit()
2250 })
2251 if err != driver.ErrBadConn {
2252 tx.closePrepared()
2253 }
2254 tx.close(err)
2255 return err
2256 }
2257
// rollbackHook, when non-nil, is called by Tx.rollback right after the done
// flag has been claimed. It is nil by default (presumably set only by tests).
var rollbackHook func()
2259
2260
2261
2262 func (tx *Tx) rollback(discardConn bool) error {
2263 if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
2264 return ErrTxDone
2265 }
2266
2267 if rollbackHook != nil {
2268 rollbackHook()
2269 }
2270
2271
2272
2273
2274
2275 tx.cancel()
2276 tx.closemu.Lock()
2277 tx.closemu.Unlock()
2278
2279 var err error
2280 withLock(tx.dc, func() {
2281 err = tx.txi.Rollback()
2282 })
2283 if err != driver.ErrBadConn {
2284 tx.closePrepared()
2285 }
2286 if discardConn {
2287 err = driver.ErrBadConn
2288 }
2289 tx.close(err)
2290 return err
2291 }
2292
2293
// Rollback aborts the transaction, keeping the connection (discardConn is
// false). It returns ErrTxDone if the transaction was already committed or
// rolled back.
func (tx *Tx) Rollback() error {
	return tx.rollback(false)
}
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308 func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
2309 dc, release, err := tx.grabConn(ctx)
2310 if err != nil {
2311 return nil, err
2312 }
2313
2314 stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query)
2315 if err != nil {
2316 return nil, err
2317 }
2318 tx.stmts.Lock()
2319 tx.stmts.v = append(tx.stmts.v, stmt)
2320 tx.stmts.Unlock()
2321 return stmt, nil
2322 }
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333 func (tx *Tx) Prepare(query string) (*Stmt, error) {
2334 return tx.PrepareContext(context.Background(), query)
2335 }
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352 func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
2353 dc, release, err := tx.grabConn(ctx)
2354 if err != nil {
2355 return &Stmt{stickyErr: err}
2356 }
2357 defer release(nil)
2358
2359 if tx.db != stmt.db {
2360 return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
2361 }
2362 var si driver.Stmt
2363 var parentStmt *Stmt
2364 stmt.mu.Lock()
2365 if stmt.closed || stmt.cg != nil {
2366
2367
2368
2369
2370
2371
2372 stmt.mu.Unlock()
2373 withLock(dc, func() {
2374 si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query)
2375 })
2376 if err != nil {
2377 return &Stmt{stickyErr: err}
2378 }
2379 } else {
2380 stmt.removeClosedStmtLocked()
2381
2382
2383 for _, v := range stmt.css {
2384 if v.dc == dc {
2385 si = v.ds.si
2386 break
2387 }
2388 }
2389
2390 stmt.mu.Unlock()
2391
2392 if si == nil {
2393 var ds *driverStmt
2394 withLock(dc, func() {
2395 ds, err = stmt.prepareOnConnLocked(ctx, dc)
2396 })
2397 if err != nil {
2398 return &Stmt{stickyErr: err}
2399 }
2400 si = ds.si
2401 }
2402 parentStmt = stmt
2403 }
2404
2405 txs := &Stmt{
2406 db: tx.db,
2407 cg: tx,
2408 cgds: &driverStmt{
2409 Locker: dc,
2410 si: si,
2411 },
2412 parentStmt: parentStmt,
2413 query: stmt.query,
2414 }
2415 if parentStmt != nil {
2416 tx.db.addDep(parentStmt, txs)
2417 }
2418 tx.stmts.Lock()
2419 tx.stmts.v = append(tx.stmts.v, txs)
2420 tx.stmts.Unlock()
2421 return txs
2422 }
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439 func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
2440 return tx.StmtContext(context.Background(), stmt)
2441 }
2442
2443
2444
2445 func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
2446 dc, release, err := tx.grabConn(ctx)
2447 if err != nil {
2448 return nil, err
2449 }
2450 return tx.db.execDC(ctx, dc, release, query, args)
2451 }
2452
2453
2454
2455
2456
2457
2458 func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) {
2459 return tx.ExecContext(context.Background(), query, args...)
2460 }
2461
2462
2463 func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
2464 dc, release, err := tx.grabConn(ctx)
2465 if err != nil {
2466 return nil, err
2467 }
2468
2469 return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args)
2470 }
2471
2472
2473
2474
2475
2476 func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) {
2477 return tx.QueryContext(context.Background(), query, args...)
2478 }
2479
2480
2481
2482
2483
2484
2485
2486 func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
2487 rows, err := tx.QueryContext(ctx, query, args...)
2488 return &Row{rows: rows, err: err}
2489 }
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500 func (tx *Tx) QueryRow(query string, args ...interface{}) *Row {
2501 return tx.QueryRowContext(context.Background(), query, args...)
2502 }
2503
2504
// connStmt pairs a connection with the driver statement that was prepared on
// it. Stmt keeps a slice of these (Stmt.css).
type connStmt struct {
	dc *driverConn // connection the statement was prepared on
	ds *driverStmt // the prepared driver statement
}
2509
2510
2511
// stmtConnGrabber is what a Stmt uses to obtain its connection when it is
// bound to a Tx or a Conn (Stmt.cg). Both *Tx and *Conn implement it.
type stmtConnGrabber interface {
	// grabConn returns the connection to use and a release function that
	// must be called when the caller is done with it.
	grabConn(context.Context) (*driverConn, releaseConn, error)

	// txCtx returns the transaction context if there is one, or nil
	// (Conn.txCtx always returns nil).
	txCtx() context.Context
}
2522
// Compile-time checks that *Tx and *Conn implement stmtConnGrabber.
var (
	_ stmtConnGrabber = &Tx{}
	_ stmtConnGrabber = &Conn{}
)
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
// Stmt is a prepared statement. A Stmt is either bound to a Tx or Conn
// (cg != nil, using the single driver statement cgds), or it belongs to a DB
// and is prepared lazily on whatever connection it is given (css).
type Stmt struct {
	// Set at creation time and not modified by the methods in this section.
	db        *DB    // owning database
	query     string // the SQL text that was prepared
	stickyErr error  // if non-nil, returned by connStmt and by Close

	// closemu is read-locked by ExecContext and QueryContext and write-locked
	// by Close, so Close waits for running executions.
	closemu sync.RWMutex

	// cg is non-nil when the statement is bound to a Tx or Conn; connStmt
	// then obtains the connection from it and uses cgds as the driver
	// statement. Close sets cgds to nil.
	cg   stmtConnGrabber
	cgds *driverStmt

	// parentStmt is set by Tx.StmtContext when this statement was derived
	// from an existing, shareable Stmt. Close then removes the dependency on
	// the parent instead of closing cgds directly.
	parentStmt *Stmt

	// mu is held while reading or writing the fields below.
	mu     sync.Mutex
	closed bool

	// css holds the driver statements prepared for this Stmt, one entry per
	// connection. Entries whose connection has been closed are pruned by
	// removeClosedStmtLocked.
	css []connStmt

	// lastNumClosed is the value of db.numClosed observed the last time
	// removeClosedStmtLocked pruned css.
	lastNumClosed uint64
}
2574
2575
2576
2577 func (s *Stmt) ExecContext(ctx context.Context, args ...interface{}) (Result, error) {
2578 s.closemu.RLock()
2579 defer s.closemu.RUnlock()
2580
2581 var res Result
2582 strategy := cachedOrNewConn
2583 for i := 0; i < maxBadConnRetries+1; i++ {
2584 if i == maxBadConnRetries {
2585 strategy = alwaysNewConn
2586 }
2587 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2588 if err != nil {
2589 if err == driver.ErrBadConn {
2590 continue
2591 }
2592 return nil, err
2593 }
2594
2595 res, err = resultFromStatement(ctx, dc.ci, ds, args...)
2596 releaseConn(err)
2597 if err != driver.ErrBadConn {
2598 return res, err
2599 }
2600 }
2601 return nil, driver.ErrBadConn
2602 }
2603
2604
2605
2606
2607
2608
2609 func (s *Stmt) Exec(args ...interface{}) (Result, error) {
2610 return s.ExecContext(context.Background(), args...)
2611 }
2612
2613 func resultFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...interface{}) (Result, error) {
2614 ds.Lock()
2615 defer ds.Unlock()
2616
2617 dargs, err := driverArgsConnLocked(ci, ds, args)
2618 if err != nil {
2619 return nil, err
2620 }
2621
2622 resi, err := ctxDriverStmtExec(ctx, ds.si, dargs)
2623 if err != nil {
2624 return nil, err
2625 }
2626 return driverResult{ds.Locker, resi}, nil
2627 }
2628
2629
2630
2631
2632
2633 func (s *Stmt) removeClosedStmtLocked() {
2634 t := len(s.css)/2 + 1
2635 if t > 10 {
2636 t = 10
2637 }
2638 dbClosed := atomic.LoadUint64(&s.db.numClosed)
2639 if dbClosed-s.lastNumClosed < uint64(t) {
2640 return
2641 }
2642
2643 s.db.mu.Lock()
2644 for i := 0; i < len(s.css); i++ {
2645 if s.css[i].dc.dbmuClosed {
2646 s.css[i] = s.css[len(s.css)-1]
2647 s.css = s.css[:len(s.css)-1]
2648 i--
2649 }
2650 }
2651 s.db.mu.Unlock()
2652 s.lastNumClosed = dbClosed
2653 }
2654
2655
2656
2657
2658 func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
2659 if err = s.stickyErr; err != nil {
2660 return
2661 }
2662 s.mu.Lock()
2663 if s.closed {
2664 s.mu.Unlock()
2665 err = errors.New("sql: statement is closed")
2666 return
2667 }
2668
2669
2670
2671 if s.cg != nil {
2672 s.mu.Unlock()
2673 dc, releaseConn, err = s.cg.grabConn(ctx)
2674 if err != nil {
2675 return
2676 }
2677 return dc, releaseConn, s.cgds, nil
2678 }
2679
2680 s.removeClosedStmtLocked()
2681 s.mu.Unlock()
2682
2683 dc, err = s.db.conn(ctx, strategy)
2684 if err != nil {
2685 return nil, nil, nil, err
2686 }
2687
2688 s.mu.Lock()
2689 for _, v := range s.css {
2690 if v.dc == dc {
2691 s.mu.Unlock()
2692 return dc, dc.releaseConn, v.ds, nil
2693 }
2694 }
2695 s.mu.Unlock()
2696
2697
2698 withLock(dc, func() {
2699 ds, err = s.prepareOnConnLocked(ctx, dc)
2700 })
2701 if err != nil {
2702 dc.releaseConn(err)
2703 return nil, nil, nil, err
2704 }
2705
2706 return dc, dc.releaseConn, ds, nil
2707 }
2708
2709
2710
2711 func (s *Stmt) prepareOnConnLocked(ctx context.Context, dc *driverConn) (*driverStmt, error) {
2712 si, err := dc.prepareLocked(ctx, s.cg, s.query)
2713 if err != nil {
2714 return nil, err
2715 }
2716 cs := connStmt{dc, si}
2717 s.mu.Lock()
2718 s.css = append(s.css, cs)
2719 s.mu.Unlock()
2720 return cs.ds, nil
2721 }
2722
2723
2724
2725 func (s *Stmt) QueryContext(ctx context.Context, args ...interface{}) (*Rows, error) {
2726 s.closemu.RLock()
2727 defer s.closemu.RUnlock()
2728
2729 var rowsi driver.Rows
2730 strategy := cachedOrNewConn
2731 for i := 0; i < maxBadConnRetries+1; i++ {
2732 if i == maxBadConnRetries {
2733 strategy = alwaysNewConn
2734 }
2735 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2736 if err != nil {
2737 if err == driver.ErrBadConn {
2738 continue
2739 }
2740 return nil, err
2741 }
2742
2743 rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...)
2744 if err == nil {
2745
2746
2747 rows := &Rows{
2748 dc: dc,
2749 rowsi: rowsi,
2750
2751 }
2752
2753
2754 s.db.addDep(s, rows)
2755
2756
2757
2758 rows.releaseConn = func(err error) {
2759 releaseConn(err)
2760 s.db.removeDep(s, rows)
2761 }
2762 var txctx context.Context
2763 if s.cg != nil {
2764 txctx = s.cg.txCtx()
2765 }
2766 rows.initContextClose(ctx, txctx)
2767 return rows, nil
2768 }
2769
2770 releaseConn(err)
2771 if err != driver.ErrBadConn {
2772 return nil, err
2773 }
2774 }
2775 return nil, driver.ErrBadConn
2776 }
2777
2778
2779
2780
2781
2782
2783 func (s *Stmt) Query(args ...interface{}) (*Rows, error) {
2784 return s.QueryContext(context.Background(), args...)
2785 }
2786
2787 func rowsiFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...interface{}) (driver.Rows, error) {
2788 ds.Lock()
2789 defer ds.Unlock()
2790 dargs, err := driverArgsConnLocked(ci, ds, args)
2791 if err != nil {
2792 return nil, err
2793 }
2794 return ctxDriverStmtQuery(ctx, ds.si, dargs)
2795 }
2796
2797
2798
2799
2800
2801
2802
2803 func (s *Stmt) QueryRowContext(ctx context.Context, args ...interface{}) *Row {
2804 rows, err := s.QueryContext(ctx, args...)
2805 if err != nil {
2806 return &Row{err: err}
2807 }
2808 return &Row{rows: rows}
2809 }
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825 func (s *Stmt) QueryRow(args ...interface{}) *Row {
2826 return s.QueryRowContext(context.Background(), args...)
2827 }
2828
2829
2830 func (s *Stmt) Close() error {
2831 s.closemu.Lock()
2832 defer s.closemu.Unlock()
2833
2834 if s.stickyErr != nil {
2835 return s.stickyErr
2836 }
2837 s.mu.Lock()
2838 if s.closed {
2839 s.mu.Unlock()
2840 return nil
2841 }
2842 s.closed = true
2843 txds := s.cgds
2844 s.cgds = nil
2845
2846 s.mu.Unlock()
2847
2848 if s.cg == nil {
2849 return s.db.removeDep(s, s)
2850 }
2851
2852 if s.parentStmt != nil {
2853
2854
2855 return s.db.removeDep(s.parentStmt, s)
2856 }
2857 return txds.Close()
2858 }
2859
2860 func (s *Stmt) finalClose() error {
2861 s.mu.Lock()
2862 defer s.mu.Unlock()
2863 if s.css != nil {
2864 for _, v := range s.css {
2865 s.db.noteUnusedDriverStatement(v.dc, v.ds)
2866 v.dc.removeOpenStmt(v.ds)
2867 }
2868 s.css = nil
2869 }
2870 return nil
2871 }
2872
2873
2874
// Rows is the result of a query. Next advances through the rows, Scan reads
// the current row, and Close releases the underlying connection.
type Rows struct {
	dc          *driverConn // connection the rows are read from; locked around driver calls
	releaseConn func(error) // called once by close with the close error
	rowsi       driver.Rows // driver-level rows
	cancel      func()      // set by initContextClose; called by close (may be nil)
	closeStmt   *driverStmt // if non-nil, statement to close when Rows closes

	// closemu guards closed and lasterr: readers (Next, Err, Columns, ...)
	// take the read lock, close takes the write lock.
	closemu sync.RWMutex
	closed  bool
	lasterr error // last error from the driver, or the error passed to close

	// lastcols is the buffer the driver fills with the current row's values.
	// It is nil until Next allocates it, and is reset by NextResultSet.
	lastcols []driver.Value
}
2895
2896
2897
2898 func (rs *Rows) lasterrOrErrLocked(err error) error {
2899 if rs.lasterr != nil && rs.lasterr != io.EOF {
2900 return rs.lasterr
2901 }
2902 return err
2903 }
2904
2905
2906
// bypassRowsAwaitDone, when true, makes Rows.initContextClose skip starting
// the awaitDone goroutine. It is false by default (presumably flipped only
// by tests).
var bypassRowsAwaitDone = false
2908
2909 func (rs *Rows) initContextClose(ctx, txctx context.Context) {
2910 if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) {
2911 return
2912 }
2913 if bypassRowsAwaitDone {
2914 return
2915 }
2916 ctx, rs.cancel = context.WithCancel(ctx)
2917 go rs.awaitDone(ctx, txctx)
2918 }
2919
2920
2921
2922
2923
2924 func (rs *Rows) awaitDone(ctx, txctx context.Context) {
2925 var txctxDone <-chan struct{}
2926 if txctx != nil {
2927 txctxDone = txctx.Done()
2928 }
2929 select {
2930 case <-ctx.Done():
2931 case <-txctxDone:
2932 }
2933 rs.close(ctx.Err())
2934 }
2935
2936
2937
2938
2939
2940
2941
2942 func (rs *Rows) Next() bool {
2943 var doClose, ok bool
2944 withLock(rs.closemu.RLocker(), func() {
2945 doClose, ok = rs.nextLocked()
2946 })
2947 if doClose {
2948 rs.Close()
2949 }
2950 return ok
2951 }
2952
2953 func (rs *Rows) nextLocked() (doClose, ok bool) {
2954 if rs.closed {
2955 return false, false
2956 }
2957
2958
2959
2960 rs.dc.Lock()
2961 defer rs.dc.Unlock()
2962
2963 if rs.lastcols == nil {
2964 rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))
2965 }
2966
2967 rs.lasterr = rs.rowsi.Next(rs.lastcols)
2968 if rs.lasterr != nil {
2969
2970 if rs.lasterr != io.EOF {
2971 return true, false
2972 }
2973 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
2974 if !ok {
2975 return true, false
2976 }
2977
2978
2979
2980 if !nextResultSet.HasNextResultSet() {
2981 doClose = true
2982 }
2983 return doClose, false
2984 }
2985 return false, true
2986 }
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996 func (rs *Rows) NextResultSet() bool {
2997 var doClose bool
2998 defer func() {
2999 if doClose {
3000 rs.Close()
3001 }
3002 }()
3003 rs.closemu.RLock()
3004 defer rs.closemu.RUnlock()
3005
3006 if rs.closed {
3007 return false
3008 }
3009
3010 rs.lastcols = nil
3011 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
3012 if !ok {
3013 doClose = true
3014 return false
3015 }
3016
3017
3018
3019 rs.dc.Lock()
3020 defer rs.dc.Unlock()
3021
3022 rs.lasterr = nextResultSet.NextResultSet()
3023 if rs.lasterr != nil {
3024 doClose = true
3025 return false
3026 }
3027 return true
3028 }
3029
3030
3031
3032 func (rs *Rows) Err() error {
3033 rs.closemu.RLock()
3034 defer rs.closemu.RUnlock()
3035 return rs.lasterrOrErrLocked(nil)
3036 }
3037
// errRowsClosed is the fallback error for operations on Rows that have been
// closed; errNoRows is the fallback error for Columns and ColumnTypes when
// there are no driver rows. Both are only used when no other error was
// recorded on the Rows (see lasterrOrErrLocked).
var errRowsClosed = errors.New("sql: Rows are closed")
var errNoRows = errors.New("sql: no Rows available")
3040
3041
3042
3043 func (rs *Rows) Columns() ([]string, error) {
3044 rs.closemu.RLock()
3045 defer rs.closemu.RUnlock()
3046 if rs.closed {
3047 return nil, rs.lasterrOrErrLocked(errRowsClosed)
3048 }
3049 if rs.rowsi == nil {
3050 return nil, rs.lasterrOrErrLocked(errNoRows)
3051 }
3052 rs.dc.Lock()
3053 defer rs.dc.Unlock()
3054
3055 return rs.rowsi.Columns(), nil
3056 }
3057
3058
3059
3060 func (rs *Rows) ColumnTypes() ([]*ColumnType, error) {
3061 rs.closemu.RLock()
3062 defer rs.closemu.RUnlock()
3063 if rs.closed {
3064 return nil, rs.lasterrOrErrLocked(errRowsClosed)
3065 }
3066 if rs.rowsi == nil {
3067 return nil, rs.lasterrOrErrLocked(errNoRows)
3068 }
3069 rs.dc.Lock()
3070 defer rs.dc.Unlock()
3071
3072 return rowsColumnInfoSetupConnLocked(rs.rowsi), nil
3073 }
3074
3075
// ColumnType holds the name and type metadata of one result column. The
// has* flags record whether the corresponding optional value was supplied.
type ColumnType struct {
	name string

	hasNullable       bool
	hasLength         bool
	hasPrecisionScale bool

	nullable     bool
	length       int64
	databaseType string
	precision    int64
	scale        int64
	scanType     reflect.Type
}

// Name returns the column's name.
func (ci *ColumnType) Name() string {
	name := ci.name
	return name
}

// Length returns the stored column length. ok is false when no length was
// supplied for the column.
func (ci *ColumnType) Length() (length int64, ok bool) {
	length, ok = ci.length, ci.hasLength
	return length, ok
}

// DecimalSize returns the stored precision and scale. ok is false when they
// were not supplied for the column.
func (ci *ColumnType) DecimalSize() (precision, scale int64, ok bool) {
	precision, scale = ci.precision, ci.scale
	ok = ci.hasPrecisionScale
	return precision, scale, ok
}

// ScanType returns the Go type recorded for scanning values of this column.
func (ci *ColumnType) ScanType() reflect.Type {
	t := ci.scanType
	return t
}

// Nullable returns the stored nullability of the column. ok is false when
// nullability was not supplied.
func (ci *ColumnType) Nullable() (nullable, ok bool) {
	nullable, ok = ci.nullable, ci.hasNullable
	return nullable, ok
}

// DatabaseTypeName returns the stored database type name of the column; it
// is empty when none was supplied.
func (ci *ColumnType) DatabaseTypeName() string {
	typeName := ci.databaseType
	return typeName
}
3133
3134 func rowsColumnInfoSetupConnLocked(rowsi driver.Rows) []*ColumnType {
3135 names := rowsi.Columns()
3136
3137 list := make([]*ColumnType, len(names))
3138 for i := range list {
3139 ci := &ColumnType{
3140 name: names[i],
3141 }
3142 list[i] = ci
3143
3144 if prop, ok := rowsi.(driver.RowsColumnTypeScanType); ok {
3145 ci.scanType = prop.ColumnTypeScanType(i)
3146 } else {
3147 ci.scanType = reflect.TypeOf(new(interface{})).Elem()
3148 }
3149 if prop, ok := rowsi.(driver.RowsColumnTypeDatabaseTypeName); ok {
3150 ci.databaseType = prop.ColumnTypeDatabaseTypeName(i)
3151 }
3152 if prop, ok := rowsi.(driver.RowsColumnTypeLength); ok {
3153 ci.length, ci.hasLength = prop.ColumnTypeLength(i)
3154 }
3155 if prop, ok := rowsi.(driver.RowsColumnTypeNullable); ok {
3156 ci.nullable, ci.hasNullable = prop.ColumnTypeNullable(i)
3157 }
3158 if prop, ok := rowsi.(driver.RowsColumnTypePrecisionScale); ok {
3159 ci.precision, ci.scale, ci.hasPrecisionScale = prop.ColumnTypePrecisionScale(i)
3160 }
3161 }
3162 return list
3163 }
3164
3165
3166
3167
3168
3169
3170
3171
3172
3173
3174
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185
3186
3187
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
3204
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225 func (rs *Rows) Scan(dest ...interface{}) error {
3226 rs.closemu.RLock()
3227
3228 if rs.lasterr != nil && rs.lasterr != io.EOF {
3229 rs.closemu.RUnlock()
3230 return rs.lasterr
3231 }
3232 if rs.closed {
3233 err := rs.lasterrOrErrLocked(errRowsClosed)
3234 rs.closemu.RUnlock()
3235 return err
3236 }
3237 rs.closemu.RUnlock()
3238
3239 if rs.lastcols == nil {
3240 return errors.New("sql: Scan called without calling Next")
3241 }
3242 if len(dest) != len(rs.lastcols) {
3243 return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest))
3244 }
3245 for i, sv := range rs.lastcols {
3246 err := convertAssignRows(dest[i], sv, rs)
3247 if err != nil {
3248 return fmt.Errorf(`sql: Scan error on column index %d, name %q: %w`, i, rs.rowsi.Columns()[i], err)
3249 }
3250 }
3251 return nil
3252 }
3253
3254
3255
// rowsCloseHook returns a function that Rows.close calls (when non-nil) with
// the Rows and a pointer to the close error, after the driver rows have been
// closed. The default implementation returns nil, i.e. no hook (presumably
// replaced only by tests).
var rowsCloseHook = func() func(*Rows, *error) { return nil }
3257
3258
3259
3260
3261
// Close closes the Rows and releases the underlying connection. Closing
// Rows that are already closed is a no-op that returns nil.
func (rs *Rows) Close() error {
	return rs.close(nil)
}
3265
3266 func (rs *Rows) close(err error) error {
3267 rs.closemu.Lock()
3268 defer rs.closemu.Unlock()
3269
3270 if rs.closed {
3271 return nil
3272 }
3273 rs.closed = true
3274
3275 if rs.lasterr == nil {
3276 rs.lasterr = err
3277 }
3278
3279 withLock(rs.dc, func() {
3280 err = rs.rowsi.Close()
3281 })
3282 if fn := rowsCloseHook(); fn != nil {
3283 fn(rs, &err)
3284 }
3285 if rs.cancel != nil {
3286 rs.cancel()
3287 }
3288
3289 if rs.closeStmt != nil {
3290 rs.closeStmt.Close()
3291 }
3292 rs.releaseConn(err)
3293 return err
3294 }
3295
3296
// Row is the result of a QueryRow-style call: it carries either the error
// from running the query or the Rows to read the single row from.
type Row struct {
	// One of these two will be non-nil:
	err  error // deferred error from the query, reported by Scan and Err
	rows *Rows
}
3302
3303
3304
3305
3306
3307
3308 func (r *Row) Scan(dest ...interface{}) error {
3309 if r.err != nil {
3310 return r.err
3311 }
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326 defer r.rows.Close()
3327 for _, dp := range dest {
3328 if _, ok := dp.(*RawBytes); ok {
3329 return errors.New("sql: RawBytes isn't allowed on Row.Scan")
3330 }
3331 }
3332
3333 if !r.rows.Next() {
3334 if err := r.rows.Err(); err != nil {
3335 return err
3336 }
3337 return ErrNoRows
3338 }
3339 err := r.rows.Scan(dest...)
3340 if err != nil {
3341 return err
3342 }
3343
3344 return r.rows.Close()
3345 }
3346
3347
3348
3349
3350
// Err returns the error stored in the Row when the query was run, without
// calling Scan. It is nil if the query itself succeeded.
func (r *Row) Err() error {
	return r.err
}
3354
3355
// Result is returned by the Exec family of methods and summarizes the
// executed command. The implementation in this file (driverResult) forwards
// both methods to the driver's driver.Result.
type Result interface {
	// LastInsertId returns the last insert ID as reported by the driver, or
	// an error.
	// NOTE(review): presumably not every database/driver supports this —
	// confirm against the driver documentation.
	LastInsertId() (int64, error)

	// RowsAffected returns the number of rows affected as reported by the
	// driver, or an error.
	// NOTE(review): presumably not every database/driver supports this —
	// confirm against the driver documentation.
	RowsAffected() (int64, error)
}
3369
// driverResult adapts a driver.Result to the package's Result interface,
// taking the embedded Locker around every call into the driver.
type driverResult struct {
	sync.Locker // the lock to hold while calling resi
	resi        driver.Result
}

// LastInsertId forwards to the driver result while holding the lock.
func (dr driverResult) LastInsertId() (int64, error) {
	dr.Locker.Lock()
	defer dr.Locker.Unlock()
	return dr.resi.LastInsertId()
}

// RowsAffected forwards to the driver result while holding the lock.
func (dr driverResult) RowsAffected() (int64, error) {
	dr.Locker.Lock()
	defer dr.Locker.Unlock()
	return dr.resi.RowsAffected()
}
3386
// stack returns the calling goroutine's stack trace as a string, truncated
// to at most 2 KiB.
func stack() string {
	const size = 2 << 10
	var buf [size]byte
	n := runtime.Stack(buf[:], false)
	return string(buf[:n])
}
3391
3392
// withLock runs fn while holding lk. The lock is released via defer, so it is
// dropped even if fn panics.
func withLock(lk sync.Locker, fn func()) {
	lk.Lock()
	defer lk.Unlock() // in case fn panics
	fn()
}
3398
View as plain text