finally found that time.Sleep (100 * time.Millisecond)
will affect the database. After removing it, I didn"t encounter the problem of busy buffer
I have encountered some problems in converting a piece of data from Table 1 to Table 2 in the past two days.
error prompt:
post total: 2041781
bbs_post
bbs_post
2040031 / 2041781 post: 0errlongDataArr: []
[mysql] 2018/03/31 02:04:52 packets.go:72: read tcp 172.16.0.11:19472->123.207.235.24:3306: read: connection reset by peer
[mysql] 2018/03/31 02:04:52 packets.go:436: busy buffer
bbs_post (2040031)
the code is as follows:
GitHub: https://github.com/skiy/xiuno.
package xn3ToXn4
import (
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/skiy/xiuno-tools/lib"
"log"
"strings"
"time"
)
type post struct {
db3str,
db4str dbstr
fields postFields
waitFix,
count,
total int
}
type postFields struct {
tid, pid, uid, isfirst, create_date, userip, images, files, message, message_fmt string
}
func (this *post) update() {
if !lib.AutoUpdate(this.db4str.Auto, this.db4str.DBPre+"post") {
return
}
currentTime := time.Now()
err := this.toUpdate(this.waitFix)
if err != nil {
log.Fatalln(" " + this.db3str.DBPre + "post : " + err.Error())
}
this.toUpdate(this.waitFix)
fmt.Printf(" %spost (%d)\r\n", this.db3str.DBPre, this.count)
fmt.Println("\r\n post : ", time.Since(currentTime))
}
func (this *post) toUpdate(fixFlag int) (err error) {
xn3pre := this.db3str.DBPre
xn4pre := this.db4str.DBPre
xn3db, err := this.db3str.Connect()
query, err := xn3db.Query("select * from " + xn3pre + "post")
if err != nil {
fmt.Println("", err.Error())
return
}
defer query.Close()
oldField := "tid,pid,uid,isfirst,create_date,userip,images,files,message"
fields := oldField + ",message_fmt"
msgFmtExist := false
cols, _ := query.Columns()
for _, v := range cols {
if v == "message_fmt" {
oldField += ",message_fmt"
msgFmtExist = true
break
}
}
xn3 := fmt.Sprintf("SELECT %s FROM %spost", oldField, xn3pre)
xn5 := fmt.Sprintf("INSERT INTO %spost (%s) VALUES ", xn4pre, fields)
qmark := this.db3str.FieldMakeValue(fields)
qmark2 := this.db3str.FieldMakeQmark(fields, "?")
xn4 := fmt.Sprintf("INSERT INTO %spost (%s) VALUES (%s)", xn4pre, fields, qmark2)
//fmt.Println("Xiuno 3: " + xn3)
//fmt.Println("Xiuno 5: " + xn5)
if fixFlag == 1 {
return this.fixPost(oldField, xn4, msgFmtExist)
}
xn3count := fmt.Sprintf("SELECT COUNT(*) AS count FROM %spost", xn3pre)
rows := xn3db.QueryRow(xn3count)
rows.Scan(&this.total)
fmt.Printf("post total: %d \r\n", this.total)
data, err := xn3db.Query(xn3)
if err != nil {
log.Fatalln(xn3, err.Error())
}
defer data.Close()
xn4db, _ := this.db4str.Connect()
xn4db.SetMaxIdleConns(0)
xn4db.SetMaxOpenConns(100)
xn4db.SetConnMaxLifetime(time.Second * 10)
xn4Clear := "TRUNCATE `" + xn4pre + "post`"
_, err = xn4db.Exec(xn4Clear)
if err != nil {
log.Fatalf("::: %spost : "+err.Error(), xn4pre)
}
fmt.Printf(" %spost \r\n", xn4pre)
fmt.Printf(" %spost \r\n", xn4pre)
//dataArr := make([]postFields, ...)
var dataArr []postFields
var longDataArr, errLongDataArr [][]postFields
var sqlStr string
var sqlArr []string
start := 0
times := 0
offset := 50
maxTimes := 50
var field postFields
for data.Next() {
if msgFmtExist {
err = data.Scan(
&field.tid,
&field.pid,
&field.uid,
&field.isfirst,
&field.create_date,
&field.userip,
&field.images,
&field.files,
&field.message,
&field.message_fmt)
} else {
err = data.Scan(
&field.tid,
&field.pid,
&field.uid,
&field.isfirst,
&field.create_date,
&field.userip,
&field.images,
&field.files,
&field.message)
}
if err != nil {
fmt.Printf("(%s) \r\n", err.Error())
} else {
if field.message_fmt == "" {
field.message_fmt = field.message
}
dataArr = append(dataArr, field)
startPP
if start%offset == 0 {
timesPP
longDataArr = append(longDataArr, dataArr)
dataArr = nil
if times >= maxTimes {
for _, v := range longDataArr {
sqlArr = this.makeFileSql(qmark, v)
sqlStr = xn5 + strings.Join(sqlArr, ",")
_, err = xn4db.Exec(sqlStr)
if err != nil {
fmt.Printf("%d - v - (%s) \r\n", start, err.Error())
fmt.Println(v)
errLongDataArr = append(errLongDataArr, v)
} else {
this.count += len(v)
lib.UpdateProcess(fmt.Sprintf(" %d / %d post: %d", this.count, this.total, len(errLongDataArr)))
}
}
times = 0
longDataArr = nil
}
start = 0
}
}
}
if err = data.Err(); err != nil {
log.Fatalln("dataErr: " + err.Error())
}
if dataArr != nil {
sqlArr = this.makeFileSql(qmark, dataArr)
sqlStr = xn5 + strings.Join(sqlArr, ",")
_, err = xn4db.Exec(sqlStr)
if err != nil {
fmt.Printf("dataArr - (%s) \r\n", err.Error())
errLongDataArr = append(errLongDataArr, dataArr)
}
this.count += len(dataArr)
lib.UpdateProcess(fmt.Sprintf(" %d / %d post: %d", this.count, this.total, len(errLongDataArr)))
}
fmt.Println("errlongDataArr:", errLongDataArr)
//
if errLongDataArr != nil {
stmt, err := xn4db.Prepare(xn4)
if err != nil {
log.Fatalln(": " + err.Error())
}
start = 0
errCount := 0
for _, values := range errLongDataArr {
for _, value := range values {
startPP
fmt.Sprintf(": %d \r\n", start)
_, err = stmt.Exec(
&value.tid,
&value.pid,
&value.uid,
&value.isfirst,
&value.create_date,
&value.userip,
&value.images,
&value.files,
&value.message,
&value.message_fmt)
if err != nil {
fmt.Printf("(%s) \r\n", err.Error())
errCountPP
} else {
this.countPP
lib.UpdateProcess(fmt.Sprintf(" %d / %d post: %d", this.count, this.total, errCount))
}
}
}
}
if err != nil {
log.Fatalln("txErr: " + err.Error())
}
defer xn3db.Close()
defer xn4db.Close()
if this.total-this.count > 0 {
//,
this.waitFix = 1
}
return err
}
func (this *post) fixPost(oldField, xn4 string, msgFmtExist bool) (err error) {
sql := "SELECT " + oldField + " FROM %s WHERE pid NOT IN (SELECT pid FROM %s)"
xn3dbName := this.db3str.DBName + "." + this.db3str.DBPre + "post"
xn4dbName := this.db4str.DBName + "." + this.db4str.DBPre + "post"
xn3sql := fmt.Sprintf(sql, xn3dbName, xn4dbName)
xn3db, err := this.db3str.Connect()
data, err := xn3db.Query(xn3sql)
if err != nil {
fmt.Println("", err.Error())
return
}
defer xn3db.Close()
if data != nil {
xn4db, err := this.db4str.Connect()
stmt, err := xn4db.Prepare(xn4)
if err != nil {
log.Fatalln(": " + err.Error())
}
var field postFields
for data.Next() {
errCount := 0
if msgFmtExist {
err = data.Scan(
&field.tid,
&field.pid,
&field.uid,
&field.isfirst,
&field.create_date,
&field.userip,
&field.images,
&field.files,
&field.message,
&field.message_fmt)
} else {
err = data.Scan(
&field.tid,
&field.pid,
&field.uid,
&field.isfirst,
&field.create_date,
&field.userip,
&field.images,
&field.files,
&field.message)
}
if field.message_fmt == "" {
field.message_fmt = field.message
}
_, err = stmt.Exec(
&field.tid,
&field.pid,
&field.uid,
&field.isfirst,
&field.create_date,
&field.userip,
&field.images,
&field.files,
&field.message,
&field.message_fmt)
if err != nil {
fmt.Printf("PID (%s) (%s) \r\n", field.pid, err.Error())
errCountPP
} else {
this.countPP
lib.UpdateProcess(fmt.Sprintf(" %d / %d post: %d", this.count, this.total, errCount))
}
}
}
return
}
func (this *post) makeFileSql(qmark string, dataArr []postFields) (dataStr []string) {
for _, field := range dataArr {
dataStr = append(dataStr, "("+fmt.Sprintf(qmark,
field.tid,
field.pid,
field.uid,
field.isfirst,
field.create_date,
field.userip,
field.images,
field.files,
field.message,
field.message_fmt)+")")
}
return
}