2015-09-20 70 views
1

我写了下面的ETL工具,它从MySQL中获取数据,转换成JSON &打印在屏幕调试坚持走程序

package main 

import (
    "bytes" 
    "database/sql" 
    "encoding/json" 
    "fmt" 
    "log" 
    "strconv" 
    "strings" 
    "time" 

    _ "github.com/go-sql-driver/mysql" 
) 

const dbformat = "2006-01-02 15:04:05" 

type MysqlReceipt struct { 
    Id    int 
    Amount   sql.NullFloat64 
    Cc_last4   sql.NullString 
    Employee_id  sql.NullString 
    Employee_name sql.NullString 
    Is_test   byte 
    Menu_items  sql.NullString 
    Payable   sql.NullFloat64 
    Pos_type   sql.NullString 
    Pos_version  sql.NullString 
    Punchh_key  string 
    Receipt_datetime sql.NullString 
    Subtotal_amount sql.NullFloat64 
    Transaction_no sql.NullString 
    Business_id  int 
    Location_id  int 
    Created_at  string 
    Updated_at  sql.NullString 
    Revenue_code  sql.NullString 
    Revenue_id  sql.NullString 
    Status   sql.NullString 
    Ipv4_addr  sql.NullString 
} 

type Menu_item struct { 
    id, name, family, major_group, item_type string 
    qty          int 
    amount         float64 
} 

type BigReceipt struct { 
    Id      int 
    Amount     float64 
    Cc_last4     string 
    Employee_id    string `json:",omitempty"` 
    Employee_name   string `json:",omitempty"` 
    Is_test     byte 
    Menu_item_name   string 
    Menu_item_id    string 
    Menu_item_amount   float64 
    Menu_item_family   string 
    Menu_item_major_group string 
    Menu_item_type   string 
    Menu_item_qty   int 
    Payable     float64 
    Pos_type     string `json:",omitempty"` 
    Pos_version    string `json:",omitempty"` 
    Punchh_key    string 
    Receipt_datetime   string 
    Subtotal_amount   float64 
    Transaction_no   string `json:",omitempty"` 
    Business_id, Location_id int 
    Created_at    time.Time 
    Updated_at    time.Time `json:",omitempty"` 
    Revenue_code    string `json:",omitempty"` 
    Revenue_id    string `json:",omitempty"` 
    Status     string `json:",omitempty"` 
    Ipv4_addr    string `json:",omitempty"` 
    Stored_at    int64 
} 

func (m Menu_item) ValidItem() bool { 
    if m.item_type == "M" || m.item_type == "D" { 
     return true 
    } else { 
     return false 
    } 
} 

func main() { 
    db, err := sql.Open("mysql", "root:[email protected](xxxxxxx.us-east-1.rds.amazonaws.com:3306)/db_name_goes_here") 
    if err != nil { 
     log.Fatal(err) 
    } 
    err = db.Ping() 
    if err != nil { 
     log.Fatal(err) 
    } 
    defer db.Close() 
    rows, err := db.Query(`select id,amount,cc_last4,employee_id,employee_name,is_test,menu_items,payable,pos_type, 
    pos_version,punchh_key,receipt_datetime,subtotal_amount,transaction_no,business_id,location_id,created_at, 
    updated_at,revenue_code,revenue_id,status,ipv4_addr from receipts`) 
    if err != nil { 
     log.Fatal(err) 
    } 
    defer rows.Close() 
    for rows.Next() { 
     var mr MysqlReceipt 
     err = rows.Scan(&mr.Id, &mr.Amount, &mr.Cc_last4, &mr.Employee_id, &mr.Employee_name, &mr.Is_test, &mr.Menu_items, 
      &mr.Payable, &mr.Pos_type, &mr.Pos_version, &mr.Punchh_key, &mr.Receipt_datetime, &mr.Subtotal_amount, &mr.Transaction_no, 
      &mr.Business_id, &mr.Location_id, &mr.Created_at, &mr.Updated_at, &mr.Revenue_code, &mr.Revenue_id, &mr.Status, &mr.Ipv4_addr) 
     if err != nil { 
      log.Fatal(err) 
     } 
     if !mr.Menu_items.Valid { 
      continue 
     } 
     r := BigReceipt{Id: mr.Id, 
      Amount:   mr.Amount.Float64, 
      Cc_last4:   mr.Cc_last4.String, 
      Employee_id:  mr.Employee_id.String, 
      Employee_name: mr.Employee_name.String, 
      Is_test:   mr.Is_test, 
      Payable:   mr.Payable.Float64, 
      Pos_type:   mr.Pos_type.String, 
      Pos_version:  mr.Pos_version.String, 
      Punchh_key:  mr.Punchh_key, 
      Receipt_datetime: mr.Receipt_datetime.String, 
      Subtotal_amount: mr.Subtotal_amount.Float64, 
      Transaction_no: mr.Transaction_no.String, 
      Business_id:  mr.Business_id, 
      Location_id:  mr.Location_id, 
      Revenue_code:  mr.Revenue_code.String, 
      Revenue_id:  mr.Revenue_id.String, 
      Status:   mr.Status.String, 
      Ipv4_addr:  mr.Ipv4_addr.String, 
      Stored_at:  time.Now().Unix(), 
     } 
     r.Created_at = datetimeParse(mr.Created_at) 
     if mr.Updated_at.Valid { 
      r.Updated_at = datetimeParse(mr.Updated_at.String) 
     } 
     menuItems := strings.Split(mr.Menu_items.String, "^") 
     items := parseMenuItems(menuItems) 
     for _, v := range items { 
      r.Menu_item_name = v.name 
      r.Menu_item_id = v.id 
      r.Menu_item_amount = v.amount 
      r.Menu_item_family = v.family 
      r.Menu_item_major_group = v.major_group 
      r.Menu_item_type = v.item_type 
      r.Menu_item_qty = v.qty 
      b, err := json.Marshal(r) 
      if err != nil { 
       log.Fatal(err) 
      } 
      fmt.Println(r.Id) 
      var out bytes.Buffer 
      json.Compact(&out, b) 
      fmt.Println(string(b)) 
     } 
    } 
    err = rows.Err() 
    if err != nil { 
     log.Fatal(err) 
    } 
} 

func datetimeParse(dateStr string) time.Time { 
    datetime, err := time.Parse(dbformat, dateStr) 
    if err != nil { 
     log.Fatal(err) 
    } 
    return datetime 
} 

func parseMenuItems(menuItems []string) []Menu_item { 
    var items []Menu_item 
    var item Menu_item 
    for _, v := range menuItems { 
     itemParts := strings.Split(v, "|") 

     item.name = itemParts[0] 
     item.qty, _ = strconv.Atoi(itemParts[1]) 
     item.amount, _ = strconv.ParseFloat(itemParts[2], 64) 
     item.item_type = strings.ToUpper(itemParts[3]) 
     item.id = itemParts[4] 
     item.family = itemParts[5] 
     item.major_group = itemParts[6] 
     if item.ValidItem() { 
      items = append(items, item) 
     } else { 
      continue 
     } 
    } 
    return items 
} 

现在开始,这工作正常,测试数据库,但在生产数据库,在那里有数百万行,在获取1,000行后卡住&停止在屏幕上打印。我让它跑了一晚。

在早晨, 我把它QUIT信号&得到了下面的堆栈跟踪

SIGQUIT: quit 
PC=0x5fecb m=0 

goroutine 0 [idle]: 
runtime.mach_semaphore_wait(0xe03, 0x0, 0x0, 0x0, 0x0, 0x407520, 0x52db9, 0xffffffffffffffff, 0x0, 0x7fff5fbff0fc, ...) 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/sys_darwin_amd64.s:407 +0xb 
runtime.semasleep1(0xffffffffffffffff, 0x0) 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/os1_darwin.go:385 +0xe5 
runtime.semasleep.func1() 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/os1_darwin.go:401 +0x29 
runtime.systemstack(0x7fff5fbff100) 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/asm_amd64.s:278 +0xab 
runtime.semasleep(0xffffffffffffffff, 0x0) 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/os1_darwin.go:402 +0x36 
runtime.notesleep(0x407970) 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/lock_sema.go:169 +0x100 
runtime.stopm() 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/proc1.go:1128 +0x112 
runtime.findrunnable(0xc82001d500, 0x0) 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/proc1.go:1530 +0x69e 
runtime.schedule() 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/proc1.go:1639 +0x267 
runtime.park_m(0xc820000180) 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/proc1.go:1698 +0x18b 
runtime.mcall(0x7fff5fbff280) 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/asm_amd64.s:204 +0x5b 

goroutine 1 [IO wait]: 
net.runtime_pollWait(0x7a1000, 0x72, 0xc82000a2d0) 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/netpoll.go:157 +0x60 
net.(*pollDesc).Wait(0xc8200a4060, 0x72, 0x0, 0x0) 
    /usr/local/Cellar/go/1.5/libexec/src/net/fd_poll_runtime.go:73 +0x3a 
net.(*pollDesc).WaitRead(0xc8200a4060, 0x0, 0x0) 
    /usr/local/Cellar/go/1.5/libexec/src/net/fd_poll_runtime.go:78 +0x36 
net.(*netFD).Read(0xc8200a4000, 0xc820372077, 0x3f89, 0x3f89, 0x0, 0x760050, 0xc82000a2d0) 
    /usr/local/Cellar/go/1.5/libexec/src/net/fd_unix.go:232 +0x23a 
net.(*conn).Read(0xc8200a6000, 0xc820372077, 0x3f89, 0x3f89, 0x0, 0x0, 0x0) 
    /usr/local/Cellar/go/1.5/libexec/src/net/net.go:172 +0xe4 
github.com/go-sql-driver/mysql.(*buffer).fill(0xc820080080, 0x102, 0x0, 0x0) 
    /Users/gaurish/golang/src/github.com/go-sql-driver/mysql/buffer.go:57 +0x2b5 
github.com/go-sql-driver/mysql.(*buffer).readNext(0xc820080080, 0x102, 0x0, 0x0, 0x0, 0x0, 0x0) 
    /Users/gaurish/golang/src/github.com/go-sql-driver/mysql/buffer.go:86 +0x55 
github.com/go-sql-driver/mysql.(*mysqlConn).readPacket(0xc820080080, 0x0, 0x0, 0x0, 0x0, 0x0) 
    /Users/gaurish/golang/src/github.com/go-sql-driver/mysql/packets.go:57 +0x47a 
github.com/go-sql-driver/mysql.(*mysqlConn).readUntilEOF(0xc820080080, 0x0, 0x0) 
    /Users/gaurish/golang/src/github.com/go-sql-driver/mysql/packets.go:698 +0x2d 
github.com/go-sql-driver/mysql.(*mysqlRows).Close(0xc8200a0120, 0x0, 0x0) 
    /Users/gaurish/golang/src/github.com/go-sql-driver/mysql/rows.go:67 +0x73 
database/sql.(*Rows).Close(0xc8200aa060, 0x0, 0x0) 
    /usr/local/Cellar/go/1.5/libexec/src/database/sql/sql.go:1710 +0x92 
main.parseMenuItems(0xc82036e480, 0x44, 0x44, 0x0, 0x0, 0x0) 
    /Users/gaurish/code/practice/mysql2json/mysql2json.go:186 +0x468 
main.main() 
    /Users/gaurish/code/practice/mysql2json/mysql2json.go:142 +0xf2e 

goroutine 17 [syscall, locked to thread]: 
runtime.goexit() 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/asm_amd64.s:1696 +0x1 

goroutine 5 [chan receive]: 
database/sql.(*DB).connectionOpener(0xc820088960) 
    /usr/local/Cellar/go/1.5/libexec/src/database/sql/sql.go:634 +0x45 
created by database/sql.Open 
    /usr/local/Cellar/go/1.5/libexec/src/database/sql/sql.go:481 +0x336 

rax 0xe 
rbx 0xe03 
rcx 0x7fff5fbff088 
rdx 0x7fff5fbff100 
rdi 0xe03 
rsi 0x407520 
rbp 0x407860 
rsp 0x7fff5fbff088 
r8  0x407860 
r9  0x0 
r10 0x0 
r11 0x286 
r12 0x2c 
r13 0x4fc3ed4b8b0 
r14 0x14059837c8b46200 
r15 0x38 
rip 0x5fecb 
rflags 0x286 
cs  0x7 
fs  0x0 
gs  0x0 
exit status 2 

而且,我试图通过注释掉defer rows.Close()调用进一步调试它。现在经过1000行左右,而不是拖延,它会立即返回以下错误:

panic: runtime error: index out of range 

goroutine 1 [running]: 
main.parseMenuItems(0xc820366900, 0x44, 0x44, 0x0, 0x0, 0x0) 
    /Users/gaurish/code/practice/mysql2json/mysql2json.go:186 +0x468 
main.main() 
    /Users/gaurish/code/practice/mysql2json/mysql2json.go:142 +0xf03 

goroutine 17 [syscall, locked to thread]: 
runtime.goexit() 
    /usr/local/Cellar/go/1.5/libexec/src/runtime/asm_amd64.s:1696 +0x1 

goroutine 5 [runnable]: 
database/sql.(*DB).connectionOpener(0xc820088780) 
    /usr/local/Cellar/go/1.5/libexec/src/database/sql/sql.go:634 +0x45 
created by database/sql.Open 
    /usr/local/Cellar/go/1.5/libexec/src/database/sql/sql.go:481 +0x336 
exit status 2 

问题:

  1. 它为什么会得到立即取1000(appox)行后卡住?
  2. 如何评论出defer rows.Close()停止卡住&立即恐慌?
  3. 有没有更好的方法来编写这个程序?

最后,我很抱歉,如果上述任何这些都是愚蠢的问题或问题太长。我是新手,想去学。

+0

第二个错误似乎是因为数据库中的菜单项没有第六个''''-separated元素,所以'itemParts [5]'失败。您可以在使用硬编码索引访问它之前检查'itemParts'的长度。不知道悬挂。 – twotwotwo

+0

'rows.Close()'中的挂起看起来有点像[这个MySQL驱动程序错误](https://github.com/go-sql-driver/mysql/issues/285),但我不确定。 – twotwotwo

回答

0

这是司机的问题。通过删除延迟请求&检查数组边界来修复它。