大数据量excel或csv对比数据库实现方案

您所在的位置:网站首页 excel数据写入access数据库 大数据量excel或csv对比数据库实现方案

大数据量excel或csv对比数据库实现方案

2023-07-01 04:08| 来源: 网络整理| 查看: 265

需求

两个系统之间同步数据,需要最后校验一下数据的差异,给一个excel或csv文件,与数据库的数据进行对比。

实现方案对比

1.数据库进行一次查询查出所有数据,按标识字段组装成map,逐条遍历csv数据与其进行比对

2.逐条遍历csv数据,每次去查询一下数据库,获取到数据进行对比

痛点

当数据库有千万条数据,csv里面也有百万条数据,这个时候第一种方案会大量占用内存,而且其中部分数据csv里面没有完全是不用查出来的,第二种方案每次一个查询导致查询过久

解决方案

解决的思路是中和两种方案,每次去数据库拿取csv中的部分数据,将这部分查询完毕再拿去下一部分。

所以必须要求csv是有序的,如果使用的关键字段(即用来查询数据库的那个字段,如contnetId)是从小到大排列的,取csv中的1-10000行,数据库通过where条件使用第一行和第10000行卡出一批数据组装map,对比完这10000行,第10001-20000行在进行一次查询数据库,这样即可取时间与空间的一个平衡点

csv排序方案

csv排序的时候可以手动排序使用排好序的csv对比数据,也可以自动排序,这里有注意事项需要说一下

1.手动排序

excel打开的csv如果你的contnetId行是纯数字(数据库是字符串)这个时候直接排序是不可行的,数字与字符串的排序逻辑是不同的,需要增加一个辅助列,然后使用="m"&B1 excel公式填充排序后再整列删除,其目的是将这列数据都变为字母开头,从而排序绝对是按照字符串排序的。

2.使用代码自动排序

这种方案需要将csv一次性读取到内存中作为一个二维数组进行排序,需要根据csv的大小灵活选择,而且排序时也应该注意是按照字符串或者按照数字方式排序的。

3.如果csv大于excel文件的最大行数,则无法使用手动排序,同样csv过大也无法一次加入内存,这个时候必须使用代码自动排序并优化排序算法,这里可以使用归并排序

思路如下:

逐行读取csv文件以一批数据(比如10000)行为标准,读取前10000行并进行排序,之后写入临时文件;重复向下读取,直到读取完毕。

第一步完成之后会生成一堆临时文件,我们接着就需要将这些临时文件进行合并,读取每个文件的第一行数据,进行比较,将最小的存入最终文件,然后将这个临时文件继续向下读取一行填补空缺,再次比较,最终将临时文件的数据全部存入最终文件中。

golang代码大概实现如下,只做思路展示,细节部分还需修改:

package main import ( "encoding/csv" "fmt" "io" "log" "os" "sort" "github.com/lithammer/shortuuid" ) const ( //以多少行拆分临时文件 chunkSize = 10000 //以第几列作为排序字段(填写列数-1) columnIndex = 1 ) func sortLargeCSVFile(filePath string) (string, error) { outputFilePath := filePath + ".sort.csv" // 打开CSV文件 inputFile, err := os.Open(filePath) if err != nil { return "", err } defer inputFile.Close() // 创建CSV读取器 reader := csv.NewReader(inputFile) reader.Comma = ',' reader.LazyQuotes = true // 处理标题行 header, err := reader.Read() if err != nil { return "", err } // 用于存储分块排序后的中间文件路径 sortedFiles := []string{} // 分块读取、排序和写入中间文件 rows := [][]string{} for { row, err := reader.Read() if err == io.EOF { break } if err != nil { return "", err } rows = append(rows, row) if len(rows) >= chunkSize { sortedRows := sortedCopy(rows) sortedFilePath, err := writeSortedChunk(sortedRows) if err != nil { return "", err } sortedFiles = append(sortedFiles, sortedFilePath) rows = [][]string{} } } // 处理剩余的行数据 if len(rows) > 0 { sortedRows := sortedCopy(rows) sortedFilePath, err := writeSortedChunk(sortedRows) if err != nil { return "", err } sortedFiles = append(sortedFiles, sortedFilePath) } // 合并排序后的中间文件 err = mergeSortedFiles(sortedFiles, outputFilePath, header) if err != nil { return "", err } // 删除中间文件 for _, file := range sortedFiles { err = os.Remove(file) if err != nil { log.Println("Failed to remove temporary file:", file) } } return outputFilePath, nil } func sortedCopy(rows [][]string) [][]string { sortedRows := make([][]string, len(rows)) copy(sortedRows, rows) sort.Slice(sortedRows, func(i, j int) bool { return sortedRows[i][columnIndex] < sortedRows[j][columnIndex] // 按照第二列进行排序 }) return sortedRows } func writeSortedChunk(sortedRows [][]string) (string, error) { // sortedFilePath := fmt.Sprintf("%s.csv", os.TempDir()) sortedFilePath := fmt.Sprintf("%s/%s.csv", "D:/temp", shortuuid.New()) sortedFile, err := os.Create(sortedFilePath) if err != nil { return "", err } defer sortedFile.Close() writer := csv.NewWriter(sortedFile) defer writer.Flush() err = writer.WriteAll(sortedRows) if err != nil { return "", err } return sortedFilePath, nil } func mergeSortedFiles(sortedFiles []string, outputFilePath string, header []string) error { outputFile, err := os.Create(outputFilePath) if err != nil { return err } defer outputFile.Close() writer := csv.NewWriter(outputFile) defer writer.Flush() readers := make([]*csv.Reader, len(sortedFiles)) for i, file := range sortedFiles { sortedFile, err := os.Open(file) if err != nil { return err } defer sortedFile.Close() readers[i] = csv.NewReader(sortedFile) readers[i].Comma = ',' readers[i].LazyQuotes = true } //写入标题行 writer.Write(header) // 读取每个文件的第一行作为初始值 currentRows := make([][]string, len(readers)) for i, reader := range readers { row, err := reader.Read() if err != nil && err != io.EOF { return err } currentRows[i] = row } // 归并排序 for { // 找到最小的一行 minIndex := -1 for i, row := range currentRows { if row != nil { if minIndex == -1 || row[columnIndex] < currentRows[minIndex][columnIndex] { minIndex = i } } } // 所有行都读取完毕,退出循环 if minIndex == -1 { break } // 将最小的一行写入输出文件 err := writer.Write(currentRows[minIndex]) if err != nil { return err } // 从当前文件读取下一行 row, err := readers[minIndex].Read() if err != nil && err != io.EOF { return err } currentRows[minIndex] = row } writer.Flush() return writer.Error() } func main() { filePath := `目标csv路径` //排序是以字符方式排序,无法按照数字大小排序 sortedFilePath, err := sortLargeCSVFile(filePath) if err != nil { log.Fatal(err) } fmt.Println("Sorted file path:", sortedFilePath) }



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3