Engenharia Backend (Go)¶
O backend do Data Profiler é um motor de alta performance escrito em Go (Golang), projetado para processar arquivos maiores que a memória RAM disponível (Out-of-Core Processing).
A arquitetura segue o padrão Producer-Consumer utilizando primitivas de concorrência nativas da linguagem (Goroutines e Channels).
1. Arquitetura de Concorrência¶
Diferente de abordagens tradicionais que carregam o arquivo inteiro na memória (ReadAll), nosso sistema cria um pipeline de processamento contínuo.
graph LR
Input[CSV File] -->|Stream| Loader(CSV Loader)
Loader -->|Jobs| Channel{Job Channel}
subgraph "Worker Pool (Paralelo)"
Channel --> W1[Worker 1]
Channel --> W2[Worker 2]
Channel --> W3[Worker 3]
end
W1 -->|Stats Parciais| Accumulator[Accumulator]
W2 -->|Stats Parciais| Accumulator
W3 -->|Stats Parciais| Accumulator
Accumulator -->|JSON Final| Output[Relatório]
Componentes Principais¶
| Componente | Responsabilidade |
|---|---|
| Infra (Loader) | Lê o arquivo linha a linha e envia para o canal. Atua como Producer. |
| Profiler (Pool) | Gerencia um conjunto de Workers que processam dados em paralelo. |
| Accumulator | Centraliza os resultados parciais, evitando Race Conditions críticas. |
2. Worker Pool e Paralelismo¶
O coração do sistema é o Worker Pool. Ele limita o número de Goroutines ativas para evitar Context Switching excessivo e exaustão de CPU.
O número de workers é definido dinamicamente baseado nas CPUs disponíveis ou via configuração.
package profiler
import "sync"
var RowPool = sync.Pool{
New: func() any {
return make([]string, 0, 10)
},
}
func GetRowSlice() []string {
return RowPool.Get().([]string)
}
func PutRowSlice(row []string) {
RowPool.Put(row[:0])
}
Cpu Bound vs I/O Bound
Como o gargalo do sistema é o processamento de texto (Regex/Parsing) e não a leitura de disco, a estratégia de múltiplos workers escala linearmente com o número de núcleos do processador.
3. Inferência de Tipos (Core Logic)¶
A detecção de tipos é feita através de expressões regulares (Regex) otimizadas e compiladas na inicialização do pacote. O sistema tenta inferir o tipo mais específico possível (Int > Float > String).
A lógica é isolada em funções puras para facilitar Testes Unitários e Fuzzing.
package profiler
import (
"regexp"
"strconv"
"strings"
"time"
)
var (
// Essenciais
RegexFiscalKey = regexp.MustCompile(`^\d{44}$`) // NFe, CTe, MDFe
RegexPlaca = regexp.MustCompile(`^[A-Z]{3}-?[0-9][0-9A-Z][0-9]{2}$`) // ABC1234 ou ABC1C34
// Documentos Brasileiros
RegexCPF = regexp.MustCompile(`^\d{3}\.\d{3}\.\d{3}-\d{2}$`) // 000.000.000-00
RegexCNPJ = regexp.MustCompile(`^\d{2}\.\d{3}\.\d{3}/\d{4}-\d{2}$`) // 00.000.000/0000-00
// Datas (Formatos comuns BR e ISO)
RegexDateBr = regexp.MustCompile(`^\d{2}/\d{2}/\d{4}$`) // DD/MM/YYYY
RegexDateIso = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`) // YYYY-MM-DD
// Logística Avançada
RegexContainer = regexp.MustCompile(`^[A-Z]{4}\d{7}$`) // Padrão ISO
Regex8Digits = regexp.MustCompile(`^\d{8}$`) // NCM, RNTRC, CEP sem traço, Data compacta
Regex11Digits = regexp.MustCompile(`^\d{11}$`) // CPF sem formatação
RegexCEP = regexp.MustCompile(`^\d{5}-\d{3}$`) // CEP com traço
RegexMobile = regexp.MustCompile(`^\(?\d{2}\)?\s?9\d{4}-?\d{4}$`) // Celular com 9
RegexEmail = regexp.MustCompile(`^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$`) // Email
RegexEAN = regexp.MustCompile(`^\d{13,14}$`) // GTIN/EAN (Produtos)
)
func InferType(value string, headerName string) DataType {
if value == "" {
return TypeEmpty
}
headerLower := strings.ToLower(headerName)
if Regex8Digits.MatchString(value) {
if containsAny(headerLower, "ncm", "fiscal", "classificacao", "sh") {
return TypeNCM
}
if containsAny(headerLower, "rntrc", "antt", "transportador") {
return TypeRNTRC
}
if containsAny(headerLower, "cep", "zip", "postal") {
return TypeCEP
}
if isCompactDate(value) {
return TypeDateCompact
}
}
if RegexFiscalKey.MatchString(value) {
return TypeFiscalKey44
}
if RegexEmail.MatchString(value) {
return TypeEmail
}
if RegexPlaca.MatchString(value) {
return TypePlaca
}
if RegexContainer.MatchString(value) {
return TypeContainer
}
if RegexCEP.MatchString(value) {
return TypeCEP
}
if RegexMobile.MatchString(value) {
return TypeMobile
}
if RegexDateBr.MatchString(value) || RegexDateIso.MatchString(value) {
return TypeDate
}
if RegexEAN.MatchString(value) {
if containsAny(headerLower, "ean", "gtin", "barras", "item", "produto", "sku") {
return TypeEAN
}
if containsAny(headerLower, "cnpj", "fornecedor", "empresa", "transportadora") {
return TypeCNPJ
}
}
if RegexCNPJ.MatchString(value) {
return TypeCNPJ
}
if Regex11Digits.MatchString(value) {
if containsAny(headerLower, "cpf", "cliente", "consumidor", "pessoa", "colaborador", "funcionario", "funcionário", "usuario", "usuário", "rg", "identidade", "documento") {
return TypeCPF
}
}
if RegexCPF.MatchString(value) {
return TypeCPF
}
if isInt(value) {
return TypeInteger
}
if isFloat(value) {
return TypeFloat
}
if isBool(value) {
return TypeBoolean
}
return TypeString
}
func containsAny(text string, keywords ...string) bool {
for _, k := range keywords {
if strings.Contains(text, k) {
return true
}
}
return false
}
func isCompactDate(value string) bool {
_, err := time.Parse("20060102", value)
if err == nil {
return true
}
_, err = time.Parse("02012006", value)
return err == nil
}
func isInt(value string) bool {
_, err := strconv.Atoi(value)
return err == nil
}
func isBool(value string) bool {
lower := strings.ToLower(value)
return lower == "true" || lower == "false" || lower == "s" || lower == "n"
}
func isFloat(value string) bool {
value = strings.Replace(value, ",", ".", 1)
_, err := strconv.ParseFloat(value, 64)
return err == nil
}
4. Estratégia de Streaming (Memória)¶
Para garantir que o consumo de memória permaneça constante (O(1)) independentemente do tamanho do arquivo, utilizamos bufio.Scanner e processamento em chunks.
- O arquivo é aberto via
multipart.File. - Um
sniffinicial lê os primeiros 4KB para detectar separadores e encoding. - O cursor volta ao início e o streaming começa.
package infra
import (
"bufio"
"context"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"sort"
"unicode"
"unicode/utf8"
"github.com/JGustavoCN/dataprofiler/internal/profiler"
"golang.org/x/text/encoding/charmap"
unicodeenc "golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"
)
func LoadCSV(logger *slog.Logger, filePath string) ([]profiler.Column, string, error) {
if logger == nil {
logger = slog.New(slog.NewJSONHandler(io.Discard, nil))
}
file, err := os.Open(filePath)
if err != nil {
logger.Error("Falha ao abrir arquivo", "path", filePath, "error", err)
return nil, "", err
}
defer file.Close()
nameFile := filepath.Base(file.Name())
column, err := ParseData(logger, file)
return column, nameFile, err
}
func ParseData(logger *slog.Logger, file io.Reader) ([]profiler.Column, error) {
if logger == nil {
logger = slog.New(slog.NewJSONHandler(io.Discard, nil))
}
smartReader, err := NewSmartReader(logger, file)
if err != nil {
return nil, err
}
bufferedSmartReader := bufio.NewReaderSize(smartReader, 1024*1024)
separator, err := DetectSeparator(bufferedSmartReader)
if err != nil {
separator = ';'
logger.Warn("Falha na detecção de separador, usando fallback", "error", err, "fallback", separator)
} else {
logger.Info("Separador detectado", "separator", string(separator))
}
reader := csv.NewReader(bufferedSmartReader)
reader.Comma = separator
reader.LazyQuotes = true
records, err := reader.ReadAll()
if err != nil {
return nil, err
}
if len(records) == 0 {
return []profiler.Column{}, nil
}
headers := records[0]
logger.Info("Estrutura carregada",
"total_rows", len(records),
"columns_count", len(headers),
"headers", headers,
)
columns := make([]profiler.Column, len(headers))
for i, name := range headers {
columns[i] = profiler.Column{
Name: name,
Values: make([]string, 0, len(records)-1),
}
}
for _, row := range records[1:] {
for i, value := range row {
if i < len(columns) {
columns[i].Values = append(columns[i].Values, value)
}
}
}
return columns, nil
}
func ParseDataAsync(ctx context.Context, logger *slog.Logger, r io.Reader) ([]string, <-chan profiler.StreamData, error) {
if logger == nil {
logger = slog.New(slog.NewJSONHandler(io.Discard, nil))
}
smartReader, err := NewSmartReader(logger, r)
if err != nil {
return nil, nil, err
}
bufferedSmartReader := bufio.NewReaderSize(smartReader, 1024*1024)
isJson, err := sniffJSON(bufferedSmartReader)
if err != nil {
return nil, nil, fmt.Errorf("erro ao detectar formato: %w", err)
}
if isJson {
logger.Info("Formato detectado: JSONL (Logs/NoSQL)")
return parseJSONLAsync(ctx, logger, bufferedSmartReader)
}
logger.Info("Formato detectado: CSV (Tabular)")
return parseCSVAsync(ctx, logger, bufferedSmartReader)
}
func sniffJSON(r *bufio.Reader) (bool, error) {
bytesToPeek, err := r.Peek(50)
if err != nil && err != io.EOF {
return false, err
}
for _, b := range bytesToPeek {
if unicode.IsSpace(rune(b)) {
continue
}
if b == '{' {
return true, nil
}
return false, nil
}
return false, nil
}
func parseCSVAsync(ctx context.Context, logger *slog.Logger, reader *bufio.Reader) ([]string, <-chan profiler.StreamData, error) {
out := make(chan profiler.StreamData, 1000)
separator, err := DetectSeparator(reader)
if err != nil {
separator = ';'
logger.Warn("Falha na detecção de separador, usando fallback", "error", err, "fallback", separator)
} else {
logger.Info("Separador detectado", "separator", string(separator))
}
csvReader := csv.NewReader(reader)
csvReader.Comma = separator
csvReader.LazyQuotes = true
csvReader.ReuseRecord = true
headersRef, err := csvReader.Read()
if err != nil {
close(out)
return nil, nil, err
}
headers := make([]string, len(headersRef))
copy(headers, headersRef)
csvReader.FieldsPerRecord = len(headers)
logger.Info("Início do streaming",
"columns_count", len(headers),
"headers", headers,
)
go func() {
defer close(out)
count := 0
lineNum := 1
errorCount := 0
for {
select {
case <-ctx.Done():
logger.Warn("Leitura cancelada pelo contexto")
return
default:
record, err := csvReader.Read()
lineNum++
if err == io.EOF {
goto EndProcessing
}
if err != nil {
errorCount++
out <- profiler.StreamData{
Row: nil,
LineNumber: lineNum,
Err: err,
}
continue
}
rowCopy := profiler.GetRowSlice()
rowCopy = append(rowCopy, record...)
out <- profiler.StreamData{
Row: rowCopy,
LineNumber: lineNum,
Err: nil,
}
count++
}
}
EndProcessing:
logger.Info("Streaming CSV finalizado",
"total_rows_read", lineNum-1,
"total_errors", errorCount,
)
}()
return headers, out, nil
}
func parseJSONLAsync(ctx context.Context, logger *slog.Logger, reader *bufio.Reader) ([]string, <-chan profiler.StreamData, error) {
out := make(chan profiler.StreamData, 1000)
scanner := bufio.NewScanner(reader)
const maxCapacity = 1024 * 1024
buf := make([]byte, maxCapacity)
scanner.Buffer(buf, maxCapacity)
if !scanner.Scan() {
if err := scanner.Err(); err != nil {
return nil, nil, fmt.Errorf("erro lendo primeira linha JSON: %w", err)
}
return nil, nil, errors.New("arquivo JSONL vazio")
}
firstLine := scanner.Bytes()
var firstMap map[string]interface{}
if err := json.Unmarshal(firstLine, &firstMap); err != nil {
return nil, nil, fmt.Errorf("erro de parsing na primeira linha (não é JSON válido?): %w", err)
}
headers := make([]string, 0, len(firstMap))
for k := range firstMap {
headers = append(headers, k)
}
sort.Strings(headers)
logger.Info("Schema JSONL inferido", "headers", headers)
go func() {
defer close(out)
processMap := func(m map[string]interface{}, lineNum int) {
row := profiler.GetRowSlice()
for _, header := range headers {
val, exists := m[header]
if !exists || val == nil {
row = append(row, "")
} else {
row = append(row, fmt.Sprintf("%v", val))
}
}
out <- profiler.StreamData{
Row: row,
LineNumber: lineNum,
Err: nil,
}
}
processMap(firstMap, 1)
lineNum := 1
for scanner.Scan() {
select {
case <-ctx.Done():
return
default:
}
lineNum++
if len(scanner.Bytes()) == 0 {
continue
}
var currentMap map[string]interface{}
if err := json.Unmarshal(scanner.Bytes(), ¤tMap); err != nil {
out <- profiler.StreamData{
LineNumber: lineNum,
Err: fmt.Errorf("json malformado: %w", err),
}
continue
}
processMap(currentMap, lineNum)
}
if err := scanner.Err(); err != nil {
logger.Error("Erro fatal no scanner JSONL", "error", err)
out <- profiler.StreamData{
LineNumber: lineNum,
Err: fmt.Errorf("erro de I/O: %w", err),
}
}
}()
return headers, out, nil
}
func NewSmartReader(logger *slog.Logger, r io.Reader) (io.Reader, error) {
br := bufio.NewReaderSize(r, 1024*1024)
bomCheck, err := br.Peek(4)
if err != nil && err != io.EOF && len(bomCheck) < 2 {
return br, nil
}
if len(bomCheck) >= 2 && bomCheck[0] == 0xFF && bomCheck[1] == 0xFE {
logger.Info("Encoding detectado: UTF-16 LE (Convertendo para UTF-8)")
win16le := unicodeenc.UTF16(unicodeenc.LittleEndian, unicodeenc.UseBOM)
return transform.NewReader(br, win16le.NewDecoder()), nil
}
if len(bomCheck) >= 2 && bomCheck[0] == 0xFE && bomCheck[1] == 0xFF {
logger.Info("Encoding detectado: UTF-16 BE (Convertendo para UTF-8)")
win16be := unicodeenc.UTF16(unicodeenc.BigEndian, unicodeenc.UseBOM)
return transform.NewReader(br, win16be.NewDecoder()), nil
}
const sampleSize = 2048
sample, err := br.Peek(sampleSize)
if err != nil && err != io.EOF {
return nil, err
}
if utf8.Valid(sample) {
logger.Debug("Encoding detectado: UTF-8 (Nativo)")
return br, nil
}
logger.Warn("Encoding UTF-8 inválido detectado na amostra. Aplicando fallback Windows1252 -> UTF-8")
decoderReader := transform.NewReader(br, charmap.Windows1252.NewDecoder())
return decoderReader, nil
}
func DetectSeparator(r *bufio.Reader) (rune, error) {
bytesToPeek, err := r.Peek(2048)
if err != nil && err != io.EOF {
return ';', err
}
semicolonCount := 0 // ;
commaCount := 0 // ,
pipeCount := 0 // |
tabCount := 0 // \t
for _, b := range bytesToPeek {
if b == '\n' || b == '\r' {
break
}
switch b {
case ';':
semicolonCount++
case ',':
commaCount++
case '|':
pipeCount++
case '\t':
tabCount++
}
}
separator := ';'
maxCount := semicolonCount
if commaCount > maxCount {
maxCount = commaCount
separator = ','
}
if pipeCount > maxCount {
maxCount = pipeCount
separator = '|'
}
if tabCount > maxCount {
separator = '\t'
}
return separator, nil
}
5. API e Server-Sent Events (SSE)¶
A comunicação com o frontend não bloqueia a requisição de upload.
- Upload Handler: Recebe o arquivo e despacha uma goroutine
go p.Run(). - SSE Handler: Mantém uma conexão HTTP aberta para enviar o progresso calculado pelo
ProgressTracker.
Graceful Shutdown
O servidor utiliza context.Context para cancelar o processamento de arquivos caso o cliente encerre a conexão abruptamente, liberando recursos do servidor.
logger := slog.New(slog.NewJSONHandler(logOutput, nil))
slog.SetDefault(logger)
if *cliMode {
if *filePath == "" {
slog.Error("Erro: No modo -cli, forneça o arquivo: -file=\"dados.csv\"")
os.Exit(1)
}
runCLI(logger, *filePath)
return
}
runServer()
}
func runCLI(logger *slog.Logger, path string) {
start := time.Now()
logger.Info("CLI: Iniciando DataProfiler", "mode", "streaming", "file", path)
(Exemplo de trecho do main.go configurando as rotas)