くそでかCSV(10GB以上)をソートしたいときがあると思います
さすがにCSVすべてのデータをメモリにロードできないので
Go言語を使ってソートプログラムを作ってみます
アルゴリズム
- Chunkに分けてデータを読み込む
- Chunkごとにtempファイルに書き込む
- 各Tempファイルのバッファ付きreaderを取得する
- PriorityQueueに各Tempファイルの先頭recordを入れる
- PriorityQueueから最小のrecordを取り出して出力し、取り出した先のTempファイルから次のrecordをPriorityQueueに入れる——これを全recordがなくなるまで繰り返す
コード例
package main
import (
"bufio"
"container/heap"
"encoding/csv"
"fmt"
"io"
"log"
"math/rand"
"os"
"sort"
"strconv"
"time"
)
// Record pairs a parsed timestamp with the raw CSV fields it came from,
// so rows can be compared by date without re-parsing the first column.
type Record struct {
Date time.Time
Data []string
}
// item is a priority-queue entry: one record plus the index of the chunk
// reader it was read from, so the merge knows which file to pull next.
type item struct {
record Record
idx int
}
// ByDate implements sort.Interface, ordering records ascending by Date.
type ByDate []Record
func (a ByDate) Len() int { return len(a) }
func (a ByDate) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDate) Less(i, j int) bool { return a[i].Date.Before(a[j].Date) }
// WriteCSV encodes each record's raw Data fields to w in CSV format.
// It returns the first write error encountered, or whatever error the
// csv.Writer accumulated by the time Flush completes.
func WriteCSV(w io.Writer, records []Record) error {
	cw := csv.NewWriter(w)
	for i := range records {
		if err := cw.Write(records[i].Data); err != nil {
			return err
		}
	}
	cw.Flush()
	return cw.Error()
}
// ExternalSort sorts inputFile into outputFile by the Unix timestamp in
// the first CSV column, without loading the whole file into memory.
// It reads up to chunkLines records at a time, sorts each chunk in
// memory, spills it to a temp file, and finally k-way merges the chunks.
// Rows whose first column fails to parse are reported and skipped.
func ExternalSort(inputFile, outputFile string, chunkLines int) error {
	file, err := os.Open(inputFile)
	if err != nil {
		return err
	}
	defer file.Close()

	reader := csv.NewReader(file)
	reader.FieldsPerRecord = -1

	var chunkFiles []string
	// Remove temp chunks if we bail out early; on success MergeChunks
	// deletes them itself.
	cleanup := func() {
		for _, f := range chunkFiles {
			os.Remove(f)
		}
	}

	for {
		var records []Record
		for i := 0; i < chunkLines; i++ {
			record, err := reader.Read()
			if err == io.EOF {
				break
			}
			if err != nil {
				fmt.Println("error:", err, i)
				continue
			}
			utime, err := strconv.ParseInt(record[0], 10, 64)
			if err != nil {
				fmt.Println(record)
				fmt.Println("error:", err, i)
				continue
			}
			records = append(records, Record{Date: time.Unix(utime, 0), Data: record})
		}
		if len(records) == 0 {
			break
		}
		sort.Sort(ByDate(records))

		tmpFile, err := os.CreateTemp("", "chunk")
		if err != nil {
			cleanup()
			return err
		}
		chunkFiles = append(chunkFiles, tmpFile.Name())
		writeErr := WriteCSV(tmpFile, records)
		// Close now rather than defer: a defer inside this loop would
		// keep one file descriptor open per chunk until the function
		// returns, exhausting fds on very large inputs.
		closeErr := tmpFile.Close()
		if writeErr != nil {
			cleanup()
			return writeErr
		}
		if closeErr != nil {
			cleanup()
			return closeErr
		}
	}
	return MergeChunks(chunkFiles, outputFile)
}
// MergeChunks performs a k-way merge of the individually-sorted chunk
// files into outputFile, using a min-heap keyed on each record's Date.
// All chunk files are deleted after the merge. Records whose first
// column fails to parse are reported and dropped.
func MergeChunks(chunkFiles []string, outputFile string) error {
	output, err := os.Create(outputFile)
	if err != nil {
		return err
	}
	defer output.Close()

	chunkReaders := make([]*csv.Reader, len(chunkFiles))
	for i, chunkFile := range chunkFiles {
		file, err := os.Open(chunkFile)
		if err != nil {
			return err
		}
		// Deferring inside the loop is intentional here: every chunk
		// file must stay open until the merge below finishes.
		defer file.Close()
		chunkReaders[i] = csv.NewReader(bufio.NewReader(file))
	}

	// Seed the heap with the first record of every chunk.
	pq := &PriorityQueue{}
	heap.Init(pq)
	for i, reader := range chunkReaders {
		record, err := reader.Read()
		if err == io.EOF {
			// An empty chunk file contributes nothing; the original
			// code returned io.EOF here and aborted the whole merge.
			continue
		}
		if err != nil {
			return err
		}
		utime, err := strconv.ParseInt(record[0], 10, 64)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		heap.Push(pq, &item{Record{Date: time.Unix(utime, 0), Data: record}, i})
	}

	// Repeatedly emit the smallest record and refill from its chunk.
	writer := csv.NewWriter(output)
	for pq.Len() > 0 {
		minItem := heap.Pop(pq).(*item)
		if err := writer.Write(minItem.record.Data); err != nil {
			return err
		}
		record, err := chunkReaders[minItem.idx].Read()
		if err == io.EOF {
			continue // this chunk is exhausted
		}
		if err != nil {
			return err
		}
		utime, err := strconv.ParseInt(record[0], 10, 64)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		heap.Push(pq, &item{Record{Date: time.Unix(utime, 0), Data: record}, minItem.idx})
	}
	writer.Flush()

	// Chunks are plain files, so os.Remove suffices (RemoveAll was overkill).
	for _, v := range chunkFiles {
		os.Remove(v)
	}
	return writer.Error()
}
// PriorityQueue is a min-heap of merge items ordered by record date;
// it implements container/heap's Interface.
type PriorityQueue []*item

// Len reports how many items are queued.
func (pq PriorityQueue) Len() int { return len(pq) }

// Less gives the earliest date the highest priority.
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].record.Date.Before(pq[j].record.Date)
}

// Swap exchanges the items at positions i and j.
func (pq PriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }

// Push appends x (a *item); called by heap.Push, not directly.
func (pq *PriorityQueue) Push(x interface{}) {
	*pq = append(*pq, x.(*item))
}

// Pop removes and returns the last element; called by heap.Pop.
func (pq *PriorityQueue) Pop() interface{} {
	last := len(*pq) - 1
	popped := (*pq)[last]
	*pq = (*pq)[:last]
	return popped
}
// randomDateTime returns a random instant between 2000-01-01 00:00:00 UTC
// (inclusive) and 2023-12-31 23:59:59 UTC (exclusive), with one-second
// granularity.
func randomDateTime() time.Time {
	lo := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	hi := time.Date(2023, 12, 31, 23, 59, 59, 0, time.UTC)
	span := hi.Unix() - lo.Unix()
	return lo.Add(time.Duration(rand.Int63n(span)) * time.Second)
}
// generateCSV writes one billion test rows to data.csv. Each row is a
// random Unix timestamp (decimal string) followed by two filler fields.
// It panics if the file cannot be created and aborts on any write error.
func generateCSV() {
	f, err := os.Create("data.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	w := csv.NewWriter(f)
	const rows = 1000000000
	for i := 0; i < rows; i++ {
		ts := strconv.FormatInt(randomDateTime().Unix(), 10)
		if err := w.Write([]string{ts, "sample", "sample2"}); err != nil {
			log.Fatalln("error writing record to csv:", err)
		}
	}
	w.Flush()
	if err := w.Error(); err != nil {
		log.Fatal(err)
	}
}
// main generates a large random CSV, then externally sorts it by the
// Unix timestamp in the first column into sorted_data.csv.
func main() {
	inputFile := "data.csv"
	outputFile := "sorted_data.csv"
	chunkLines := 1000000

	generateCSV()
	if err := ExternalSort(inputFile, outputFile, chunkLines); err != nil {
		fmt.Println("Error:", err)
		return
	}
	// Fixed typo: 官僚 ("bureaucrat") -> 完了 ("complete").
	fmt.Println("ソート完了")
}
並列版
24GBのくそでかCSVもソートできた
package main
import (
"bufio"
"container/heap"
"encoding/csv"
"fmt"
"io"
"log"
"math/rand"
"os"
"sort"
"strconv"
"sync"
"time"
)
// Record pairs a parsed timestamp with the raw CSV fields it came from,
// so rows can be compared by date without re-parsing the first column.
type Record struct {
Date time.Time
Data []string
}
// item is a priority-queue entry: one record plus the index of the chunk
// reader it was read from, so the merge knows which file to pull next.
type item struct {
record Record
idx int
}
// ByDate implements sort.Interface, ordering records ascending by Date.
type ByDate []Record
func (a ByDate) Len() int { return len(a) }
func (a ByDate) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDate) Less(i, j int) bool { return a[i].Date.Before(a[j].Date) }
// WriteCSV encodes each record's raw Data fields to w in CSV format.
// It returns the first write error encountered, or whatever error the
// csv.Writer accumulated by the time Flush completes.
func WriteCSV(w io.Writer, records []Record) error {
	cw := csv.NewWriter(w)
	for i := range records {
		if err := cw.Write(records[i].Data); err != nil {
			return err
		}
	}
	cw.Flush()
	return cw.Error()
}
// ExternalSort sorts inputFile into outputFile by the Unix timestamp in
// the first CSV column, without loading the whole file into memory.
// Parallel variant: one producer goroutine parses the input into chunks
// of up to chunkLines records; six workers sort each chunk and spill it
// to a temp file; a gatherer collects the temp-file names; finally the
// chunks are k-way merged.
func ExternalSort(inputFile, outputFile string, chunkLines int) error {
	file, err := os.Open(inputFile)
	if err != nil {
		return err
	}
	defer file.Close()

	reader := csv.NewReader(file)
	reader.FieldsPerRecord = -1

	chunkChan := make(chan []Record)
	fileChan := make(chan string)
	var wg sync.WaitGroup

	// Workers: sort each chunk and write it to its own temp file.
	for i := 0; i < 6; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for records := range chunkChan {
				sort.Sort(ByDate(records))
				tmpFile, err := os.CreateTemp("", "chunk")
				if err != nil {
					fmt.Println("Error creating temp file:", err)
					continue
				}
				err = WriteCSV(tmpFile, records)
				// Close immediately: the original deferred inside this
				// loop, holding every chunk's descriptor open for the
				// goroutine's whole lifetime.
				tmpFile.Close()
				if err != nil {
					fmt.Println("Error writing to temp file:", err)
					os.Remove(tmpFile.Name()) // don't leak the partial chunk
					continue
				}
				fileChan <- tmpFile.Name()
			}
		}()
	}

	// Producer: read the input into chunks of up to chunkLines records.
	// Unparseable rows are reported and skipped.
	go func() {
		defer close(chunkChan)
		for {
			var records []Record
			for i := 0; i < chunkLines; i++ {
				record, err := reader.Read()
				if err == io.EOF {
					break
				}
				if err != nil {
					fmt.Println("error:", err, i)
					continue
				}
				utime, err := strconv.ParseInt(record[0], 10, 64)
				if err != nil {
					fmt.Println(record)
					fmt.Println("error:", err, i)
					continue
				}
				records = append(records, Record{Date: time.Unix(utime, 0), Data: record})
			}
			if len(records) == 0 {
				break
			}
			chunkChan <- records
		}
	}()

	// Gatherer: collect sorted-chunk file names. The done channel makes
	// its final append happen-before MergeChunks reads chunkFiles — the
	// original read the slice without waiting for this goroutine (a data
	// race that could drop chunks).
	var chunkFiles []string
	done := make(chan struct{})
	go func() {
		defer close(done)
		for f := range fileChan {
			chunkFiles = append(chunkFiles, f)
		}
	}()

	wg.Wait()       // all workers finished sending
	close(fileChan) // lets the gatherer's range loop end
	<-done          // gatherer has appended every name
	return MergeChunks(chunkFiles, outputFile)
}
// MergeChunks performs a k-way merge of the individually-sorted chunk
// files into outputFile, using a min-heap keyed on each record's Date.
// All chunk files are deleted after the merge. Records whose first
// column fails to parse are reported and dropped.
func MergeChunks(chunkFiles []string, outputFile string) error {
	output, err := os.Create(outputFile)
	if err != nil {
		return err
	}
	defer output.Close()

	chunkReaders := make([]*csv.Reader, len(chunkFiles))
	for i, chunkFile := range chunkFiles {
		file, err := os.Open(chunkFile)
		if err != nil {
			return err
		}
		// Deferring inside the loop is intentional here: every chunk
		// file must stay open until the merge below finishes.
		defer file.Close()
		chunkReaders[i] = csv.NewReader(bufio.NewReader(file))
	}

	// Seed the heap with the first record of every chunk.
	pq := &PriorityQueue{}
	heap.Init(pq)
	for i, reader := range chunkReaders {
		record, err := reader.Read()
		if err == io.EOF {
			// An empty chunk file contributes nothing; the original
			// code returned io.EOF here and aborted the whole merge.
			continue
		}
		if err != nil {
			return err
		}
		utime, err := strconv.ParseInt(record[0], 10, 64)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		heap.Push(pq, &item{Record{Date: time.Unix(utime, 0), Data: record}, i})
	}

	// Repeatedly emit the smallest record and refill from its chunk.
	writer := csv.NewWriter(output)
	for pq.Len() > 0 {
		minItem := heap.Pop(pq).(*item)
		if err := writer.Write(minItem.record.Data); err != nil {
			return err
		}
		record, err := chunkReaders[minItem.idx].Read()
		if err == io.EOF {
			continue // this chunk is exhausted
		}
		if err != nil {
			return err
		}
		utime, err := strconv.ParseInt(record[0], 10, 64)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		heap.Push(pq, &item{Record{Date: time.Unix(utime, 0), Data: record}, minItem.idx})
	}
	writer.Flush()

	// Chunks are plain files, so os.Remove suffices (RemoveAll was overkill).
	for _, v := range chunkFiles {
		os.Remove(v)
	}
	return writer.Error()
}
// PriorityQueue is a min-heap of merge items ordered by record date;
// it implements container/heap's Interface.
type PriorityQueue []*item

// Len reports how many items are queued.
func (pq PriorityQueue) Len() int { return len(pq) }

// Less gives the earliest date the highest priority.
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].record.Date.Before(pq[j].record.Date)
}

// Swap exchanges the items at positions i and j.
func (pq PriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }

// Push appends x (a *item); called by heap.Push, not directly.
func (pq *PriorityQueue) Push(x interface{}) {
	*pq = append(*pq, x.(*item))
}

// Pop removes and returns the last element; called by heap.Pop.
func (pq *PriorityQueue) Pop() interface{} {
	last := len(*pq) - 1
	popped := (*pq)[last]
	*pq = (*pq)[:last]
	return popped
}
// randomDateTime returns a random instant between 2000-01-01 00:00:00 UTC
// (inclusive) and 2023-12-31 23:59:59 UTC (exclusive), with one-second
// granularity.
func randomDateTime() time.Time {
	lo := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	hi := time.Date(2023, 12, 31, 23, 59, 59, 0, time.UTC)
	span := hi.Unix() - lo.Unix()
	return lo.Add(time.Duration(rand.Int63n(span)) * time.Second)
}
// generateCSV writes one billion test rows to data.csv. Each row is a
// random Unix timestamp (decimal string) followed by two filler fields.
// It panics if the file cannot be created and aborts on any write error.
func generateCSV() {
	f, err := os.Create("data.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	w := csv.NewWriter(f)
	const rows = 1000000000
	for i := 0; i < rows; i++ {
		ts := strconv.FormatInt(randomDateTime().Unix(), 10)
		if err := w.Write([]string{ts, "sample", "sample2"}); err != nil {
			log.Fatalln("error writing record to csv:", err)
		}
	}
	w.Flush()
	if err := w.Error(); err != nil {
		log.Fatal(err)
	}
}
// main generates a large random CSV, then externally sorts it by the
// Unix timestamp in the first column into sorted_data.csv.
func main() {
	inputFile := "data.csv"
	outputFile := "sorted_data.csv"
	chunkLines := 1000000

	generateCSV()
	if err := ExternalSort(inputFile, outputFile, chunkLines); err != nil {
		fmt.Println("Error:", err)
		return
	}
	// Fixed typo: 官僚 ("bureaucrat") -> 完了 ("complete").
	fmt.Println("ソート完了")
}
コメント