Sorting a Ridiculously Large CSV

Sometimes you need to sort a ridiculously large CSV (10 GB or more).

A file that size obviously won't fit in memory all at once,

so let's write an external-sort program in Go.

Algorithm

  1. Read the input in chunks of a fixed number of lines.
  2. Sort each chunk and write it to a temp file.
  3. Open a buffered reader for each temp file.
  4. Push the first record of each temp file into a priority queue.
  5. Pop the minimum record from the queue, write it out, then push the next record from the temp file the minimum came from; repeat until the queue is empty (see the sketch below).
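
Before the full listing, here is a minimal sketch of steps 4 and 5, merging sorted int slices in place of temp files (the names entry and minHeap are just for this illustration). The real program below does the same thing with CSV records keyed by timestamp.

package main

import (
	"container/heap"
	"fmt"
)

// entry tracks a value plus which source chunk it came from.
type entry struct {
	val int // current value
	src int // index of the source chunk
	pos int // next index to read in that chunk
}

type minHeap []entry

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].val < h[j].val }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(entry)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	e := old[len(old)-1]
	*h = old[:len(old)-1]
	return e
}

func main() {
	chunks := [][]int{{1, 4, 7}, {2, 5, 8}, {3, 6, 9}}
	h := &minHeap{}
	// Step 4: seed the heap with the first element of each chunk.
	for i, c := range chunks {
		heap.Push(h, entry{val: c[0], src: i, pos: 1})
	}
	// Step 5: repeatedly pop the global minimum, then refill from
	// the chunk the minimum came from.
	for h.Len() > 0 {
		e := heap.Pop(h).(entry)
		fmt.Println(e.val) // prints 1..9 in order
		if e.pos < len(chunks[e.src]) {
			heap.Push(h, entry{val: chunks[e.src][e.pos], src: e.src, pos: e.pos + 1})
		}
	}
}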

Code example

package main

import (
	"bufio"
	"container/heap"
	"encoding/csv"
	"fmt"
	"io"
	"log"
	"math/rand"
	"os"
	"sort"
	"strconv"
	"time"
)

// Record pairs the parsed timestamp with the raw CSV fields.
type Record struct {
	Date time.Time
	Data []string
}

// item is a heap element: a Record plus the index of the chunk
// reader it came from, so the merge knows which reader to refill from.
type item struct {
	record Record
	idx    int
}

type ByDate []Record

func (a ByDate) Len() int           { return len(a) }
func (a ByDate) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByDate) Less(i, j int) bool { return a[i].Date.Before(a[j].Date) }

// WriteCSV writes the records' raw fields to w as CSV.
func WriteCSV(w io.Writer, records []Record) error {
	writer := csv.NewWriter(w)
	for _, record := range records {
		err := writer.Write(record.Data)
		if err != nil {
			return err
		}
	}
	writer.Flush()
	return writer.Error()
}

// ExternalSort reads inputFile in chunks of chunkLines rows, sorts
// each chunk into a temp file, then k-way merges the temp files
// into outputFile.
func ExternalSort(inputFile, outputFile string, chunkLines int) error {
	file, err := os.Open(inputFile)
	if err != nil {
		return err
	}
	defer file.Close()

	var chunkFiles []string
	reader := csv.NewReader(file)
	reader.FieldsPerRecord = -1
	var records []Record
	for {
		records = nil
		for i := 0; i < chunkLines; i++ {
			record, err := reader.Read()
			if err == io.EOF {
				break
			}
			if err != nil {
				fmt.Println("error:", err, i)
				continue
			}

			utime, err := strconv.ParseInt(record[0], 10, 64)
			if err != nil {
				fmt.Println(record)
				fmt.Println("error:", err, i)
				continue
			}
			date := time.Unix(utime, 0)
			records = append(records, Record{Date: date, Data: record})
		}
		if len(records) == 0 {
			break
		}
		sort.Sort(ByDate(records))

		tmpFile, err := os.CreateTemp("", "chunk")
		if err != nil {
			return err
		}

		err = WriteCSV(tmpFile, records)
		// Close now rather than defer: a defer inside this loop would
		// keep every chunk file open until ExternalSort returns.
		tmpFile.Close()
		if err != nil {
			return err
		}
		chunkFiles = append(chunkFiles, tmpFile.Name())
	}
	return MergeChunks(chunkFiles, outputFile)
}

// MergeChunks k-way merges the sorted chunk files into outputFile
// using a priority queue keyed on the timestamp column.
func MergeChunks(chunkFiles []string, outputFile string) error {
	output, err := os.Create(outputFile)
	if err != nil {
		return err
	}
	defer output.Close()

	chunkReaders := make([]*csv.Reader, len(chunkFiles))
	for i, chunkFile := range chunkFiles {
		file, err := os.Open(chunkFile)
		if err != nil {
			return err
		}
		defer file.Close()
		chunkReaders[i] = csv.NewReader(bufio.NewReader(file))
	}

	pq := &PriorityQueue{}
	heap.Init(pq)

	for i, reader := range chunkReaders {
		record, err := reader.Read()
		if err == io.EOF {
			continue // empty chunk file; nothing to seed
		}
		if err != nil {
			return err
		}
		utime, err := strconv.ParseInt(record[0], 10, 64)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		date := time.Unix(utime, 0)

		heap.Push(pq, &item{Record{Date: date, Data: record}, i})
	}

	writer := csv.NewWriter(output)

	for pq.Len() > 0 {
		minItem := heap.Pop(pq).(*item)
		err := writer.Write(minItem.record.Data)
		if err != nil {
			return err
		}

		record, err := chunkReaders[minItem.idx].Read()
		if err == io.EOF {
			continue
		}
		if err != nil {
			return err
		}
		utime, err := strconv.ParseInt(record[0], 10, 64)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		date := time.Unix(utime, 0)
		heap.Push(pq, &item{Record{Date: date, Data: record}, minItem.idx})
	}

	writer.Flush()
	for _, v := range chunkFiles {
		os.RemoveAll(v)
	}
	return writer.Error()
}

// PriorityQueue implements heap.Interface over items, ordered by
// record date, so Pop always yields the oldest pending record.
type PriorityQueue []*item

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].record.Date.Before(pq[j].record.Date)
}

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

func (pq *PriorityQueue) Push(x interface{}) {
	*pq = append(*pq, x.(*item))
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	*pq = old[0 : n-1]
	return item
}

func randomDateTime() time.Time {
	start := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(2023, 12, 31, 23, 59, 59, 0, time.UTC)
	delta := end.Unix() - start.Unix()
	sec := rand.Int63n(delta)
	return start.Add(time.Duration(sec) * time.Second)
}

func generateCSV() {

	file, err := os.Create("data.csv")
	if err != nil {
		panic(err)
	}
	defer file.Close()
	cw := csv.NewWriter(file)
	for i := 0; i < 1000000000; i++ { // one billion rows, tens of GB of CSV
		randomDateTime := randomDateTime()
		if err := cw.Write([]string{strconv.FormatInt(randomDateTime.Unix(), 10), "sample", "sample2"}); err != nil {
			log.Fatalln("error writing record to csv:", err)
		}
	}
	cw.Flush()

	if err := cw.Error(); err != nil {
		log.Fatal(err)
	}
}

func main() {
	inputFile := "data.csv"
	outputFile := "sorted_data.csv"
	chunkLines := 1000000
	generateCSV()
	err := ExternalSort(inputFile, outputFile, chunkLines)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("ソート官僚")
}
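
As a sanity check (not part of the original program), a small standalone reader like the one below can confirm the output really is in timestamp order; the file name sorted_data.csv matches the program above.

package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"strconv"
)

func main() {
	f, err := os.Open("sorted_data.csv")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	r := csv.NewReader(f)
	prev := int64(math.MinInt64)
	rows := 0
	for {
		rec, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		rows++
		t, err := strconv.ParseInt(rec[0], 10, 64)
		if err != nil {
			log.Fatal(err)
		}
		if t < prev {
			log.Fatalf("out of order at row %d: %d < %d", rows, t, prev)
		}
		prev = t
	}
	fmt.Println("OK:", rows, "rows in non-decreasing order")
}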

Parallel version

This version managed to sort a 24 GB CSV.
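
The structure: the main goroutine still reads the input sequentially, but sorting and temp-file writing move into a fixed pool of worker goroutines fed through chunkChan, while a collector goroutine gathers the temp-file names from fileChan. The merge phase is unchanged.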

package main

import (
	"bufio"
	"container/heap"
	"encoding/csv"
	"fmt"
	"io"
	"log"
	"math/rand"
	"os"
	"sort"
	"strconv"
	"sync"
	"time"
)

type Record struct {
	Date time.Time
	Data []string
}
type item struct {
	record Record
	idx    int
}

type ByDate []Record

func (a ByDate) Len() int           { return len(a) }
func (a ByDate) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByDate) Less(i, j int) bool { return a[i].Date.Before(a[j].Date) }

func WriteCSV(w io.Writer, records []Record) error {
	writer := csv.NewWriter(w)
	for _, record := range records {
		err := writer.Write(record.Data)
		if err != nil {
			return err
		}
	}
	writer.Flush()
	return writer.Error()
}

func ExternalSort(inputFile, outputFile string, chunkLines int) error {
	file, err := os.Open(inputFile)
	if err != nil {
		return err
	}
	defer file.Close()

	var chunkFiles []string
	var wg sync.WaitGroup
	chunkChan := make(chan []Record)
	fileChan := make(chan string)

	reader := csv.NewReader(file)
	reader.FieldsPerRecord = -1
	var records []Record

	for i := 0; i < 6; i++ { // pool of six sort workers
		wg.Add(1)
		go func() {
			defer wg.Done()
			for records := range chunkChan {
				sort.Sort(ByDate(records))
				tmpFile, err := os.CreateTemp("", "chunk")
				if err != nil {
					fmt.Println("Error creating temp file:", err)
					continue
				}

				err = WriteCSV(tmpFile, records)
				// Close now rather than defer: a defer here would hold
				// every chunk file open for the life of the goroutine.
				tmpFile.Close()
				if err != nil {
					fmt.Println("Error writing to temp file:", err)
					continue
				}
				fileChan <- tmpFile.Name()
			}
		}()
	}

	go func() {
		defer close(chunkChan)
		for {
			records = nil
			for i := 0; i < chunkLines; i++ {
				record, err := reader.Read()
				if err == io.EOF {
					break
				}
				if err != nil {
					fmt.Println("error:", err, i)
					continue
				}

				utime, err := strconv.ParseInt(record[0], 10, 64)
				if err != nil {
					fmt.Println(record)
					fmt.Println("error:", err, i)
					continue
				}
				date := time.Unix(utime, 0)
				records = append(records, Record{Date: date, Data: record})
			}
			if len(records) == 0 {
				break
			}
			chunkChan <- records
		}
	}()
	// Gather sorted chunk file names. The done channel ensures the
	// collector has drained fileChan before MergeChunks reads
	// chunkFiles; without it there is a data race on the slice.
	done := make(chan struct{})
	go func() {
		for f := range fileChan {
			chunkFiles = append(chunkFiles, f)
		}
		close(done)
	}()

	wg.Wait()
	close(fileChan)
	<-done
	return MergeChunks(chunkFiles, outputFile)
}

func MergeChunks(chunkFiles []string, outputFile string) error {
	output, err := os.Create(outputFile)
	if err != nil {
		return err
	}
	defer output.Close()

	chunkReaders := make([]*csv.Reader, len(chunkFiles))
	for i, chunkFile := range chunkFiles {
		file, err := os.Open(chunkFile)
		if err != nil {
			return err
		}
		defer file.Close()
		chunkReaders[i] = csv.NewReader(bufio.NewReader(file))
	}

	pq := &PriorityQueue{}
	heap.Init(pq)

	for i, reader := range chunkReaders {
		record, err := reader.Read()
		if err == io.EOF {
			continue // empty chunk file; nothing to seed
		}
		if err != nil {
			return err
		}
		utime, err := strconv.ParseInt(record[0], 10, 64)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		date := time.Unix(utime, 0)

		heap.Push(pq, &item{Record{Date: date, Data: record}, i})
	}

	writer := csv.NewWriter(output)
	for pq.Len() > 0 {
		minItem := heap.Pop(pq).(*item)
		err := writer.Write(minItem.record.Data)
		if err != nil {
			return err
		}
		record, err := chunkReaders[minItem.idx].Read()
		if err == io.EOF {
			continue
		}
		if err != nil {
			return err
		}
		utime, err := strconv.ParseInt(record[0], 10, 64)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		date := time.Unix(utime, 0)
		heap.Push(pq, &item{Record{Date: date, Data: record}, minItem.idx})
	}

	writer.Flush()
	for _, v := range chunkFiles {
		os.RemoveAll(v)
	}
	return writer.Error()
}

type PriorityQueue []*item

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].record.Date.Before(pq[j].record.Date)
}

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

func (pq *PriorityQueue) Push(x interface{}) {
	*pq = append(*pq, x.(*item))
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	*pq = old[0 : n-1]
	return item
}

func randomDateTime() time.Time {
	start := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(2023, 12, 31, 23, 59, 59, 0, time.UTC)
	delta := end.Unix() - start.Unix()
	sec := rand.Int63n(delta)
	return start.Add(time.Duration(sec) * time.Second)
}

func generateCSV() {

	file, err := os.Create("data.csv")
	if err != nil {
		panic(err)
	}
	defer file.Close()
	cw := csv.NewWriter(file)
	for i := 0; i < 1000000000; i++ { // one billion rows, tens of GB of CSV
		randomDateTime := randomDateTime()
		if err := cw.Write([]string{strconv.FormatInt(randomDateTime.Unix(), 10), "sample", "sample2"}); err != nil {
			log.Fatalln("error writing record to csv:", err)
		}
	}
	cw.Flush()

	if err := cw.Error(); err != nil {
		log.Fatal(err)
	}
}

func main() {
	inputFile := "data.csv"
	outputFile := "sorted_data.csv"
	chunkLines := 1000000
	generateCSV()
	err := ExternalSort(inputFile, outputFile, chunkLines)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("ソート官僚")
}
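
The two main tuning knobs are the worker count (hardcoded to 6 here; runtime.NumCPU() would be a natural default) and chunkLines, which bounds memory use: each in-flight chunk holds roughly chunkLines records in memory, and with a pool of N workers up to about N+1 chunks can be alive at once (N being sorted while one more is being read).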
