Pular para conteúdo

Engenharia Backend (Go)

O backend do Data Profiler é um motor de alta performance escrito em Go (Golang), projetado para processar arquivos maiores que a memória RAM disponível (Out-of-Core Processing).

A arquitetura segue o padrão Producer-Consumer utilizando primitivas de concorrência nativas da linguagem (Goroutines e Channels).


1. Arquitetura de Concorrência

Diferente de abordagens tradicionais que carregam o arquivo inteiro na memória (ReadAll), nosso sistema cria um pipeline de processamento contínuo.

graph LR
    Input[CSV File] -->|Stream| Loader(CSV Loader)
    Loader -->|Jobs| Channel{Job Channel}

    subgraph "Worker Pool (Paralelo)"
        Channel --> W1[Worker 1]
        Channel --> W2[Worker 2]
        Channel --> W3[Worker 3]
    end

    W1 -->|Stats Parciais| Accumulator[Accumulator]
    W2 -->|Stats Parciais| Accumulator
    W3 -->|Stats Parciais| Accumulator

    Accumulator -->|JSON Final| Output[Relatório]
Figura 1: Fluxo de dentro do Backend

Componentes Principais

Componente Responsabilidade
Infra (Loader) Lê o arquivo linha a linha e envia para o canal. Atua como Producer.
Profiler (Pool) Gerencia um conjunto de Workers que processam dados em paralelo.
Accumulator Centraliza os resultados parciais, evitando Race Conditions críticas.

2. Worker Pool e Paralelismo

O coração do sistema é o Worker Pool. Ele limita o número de Goroutines ativas para evitar Context Switching excessivo e exaustão de CPU.

O número de workers é definido dinamicamente baseado nas CPUs disponíveis ou via configuração.

internal/profiler/pool.go
package profiler

import "sync"

var RowPool = sync.Pool{
    New: func() any {
        return make([]string, 0, 10)
    },
}

func GetRowSlice() []string {
    return RowPool.Get().([]string)
}

func PutRowSlice(row []string) {
    RowPool.Put(row[:0])
}

Cpu Bound vs I/O Bound

Como o gargalo do sistema é o processamento de texto (Regex/Parsing) e não a leitura de disco, a estratégia de múltiplos workers escala linearmente com o número de núcleos do processador.


3. Inferência de Tipos (Core Logic)

A detecção de tipos é feita através de expressões regulares (Regex) otimizadas e compiladas na inicialização do pacote. O sistema tenta inferir o tipo mais específico possível (Int > Float > String).

A lógica é isolada em funções puras para facilitar Testes Unitários e Fuzzing.

internal/profiler/infer.go
package profiler

import (
    "regexp"
    "strconv"
    "strings"
    "time"
)

var (
    // Essenciais
    RegexFiscalKey = regexp.MustCompile(`^\d{44}$`)                          // NFe, CTe, MDFe
    RegexPlaca     = regexp.MustCompile(`^[A-Z]{3}-?[0-9][0-9A-Z][0-9]{2}$`) // ABC1234 ou ABC1C34

    // Documentos Brasileiros
    RegexCPF  = regexp.MustCompile(`^\d{3}\.\d{3}\.\d{3}-\d{2}$`)       // 000.000.000-00
    RegexCNPJ = regexp.MustCompile(`^\d{2}\.\d{3}\.\d{3}/\d{4}-\d{2}$`) // 00.000.000/0000-00

    // Datas (Formatos comuns BR e ISO)
    RegexDateBr  = regexp.MustCompile(`^\d{2}/\d{2}/\d{4}$`) // DD/MM/YYYY
    RegexDateIso = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`) // YYYY-MM-DD

    // Logística Avançada
    RegexContainer = regexp.MustCompile(`^[A-Z]{4}\d{7}$`)                  // Padrão ISO
    Regex8Digits   = regexp.MustCompile(`^\d{8}$`)                          // NCM, RNTRC, CEP sem traço, Data compacta
    Regex11Digits  = regexp.MustCompile(`^\d{11}$`)                         // CPF sem formatação
    RegexCEP       = regexp.MustCompile(`^\d{5}-\d{3}$`)                    // CEP com traço
    RegexMobile    = regexp.MustCompile(`^\(?\d{2}\)?\s?9\d{4}-?\d{4}$`)    // Celular com 9
    RegexEmail     = regexp.MustCompile(`^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$`) // Email
    RegexEAN       = regexp.MustCompile(`^\d{13,14}$`)                      // GTIN/EAN (Produtos)
)


func InferType(value string, headerName string) DataType {
    if value == "" {
        return TypeEmpty
    }
    headerLower := strings.ToLower(headerName)

    if Regex8Digits.MatchString(value) {
        if containsAny(headerLower, "ncm", "fiscal", "classificacao", "sh") {
            return TypeNCM
        }
        if containsAny(headerLower, "rntrc", "antt", "transportador") {
            return TypeRNTRC
        }
        if containsAny(headerLower, "cep", "zip", "postal") {
            return TypeCEP
        }
        if isCompactDate(value) {
            return TypeDateCompact
        }
    }

    if RegexFiscalKey.MatchString(value) {
        return TypeFiscalKey44
    }
    if RegexEmail.MatchString(value) {
        return TypeEmail
    }
    if RegexPlaca.MatchString(value) {
        return TypePlaca
    }
    if RegexContainer.MatchString(value) {
        return TypeContainer
    }
    if RegexCEP.MatchString(value) {
        return TypeCEP
    }
    if RegexMobile.MatchString(value) {
        return TypeMobile
    }

    if RegexDateBr.MatchString(value) || RegexDateIso.MatchString(value) {
        return TypeDate
    }

    if RegexEAN.MatchString(value) {
        if containsAny(headerLower, "ean", "gtin", "barras", "item", "produto", "sku") {
            return TypeEAN
        }

        if containsAny(headerLower, "cnpj", "fornecedor", "empresa", "transportadora") {
            return TypeCNPJ
        }
    }

    if RegexCNPJ.MatchString(value) {
        return TypeCNPJ
    }

    if Regex11Digits.MatchString(value) {
        if containsAny(headerLower, "cpf", "cliente", "consumidor", "pessoa", "colaborador", "funcionario", "funcionário", "usuario", "usuário", "rg", "identidade", "documento") {
            return TypeCPF
        }
    }
    if RegexCPF.MatchString(value) {
        return TypeCPF
    }

    if isInt(value) {
        return TypeInteger
    }
    if isFloat(value) {
        return TypeFloat
    }
    if isBool(value) {
        return TypeBoolean
    }

    return TypeString
}

func containsAny(text string, keywords ...string) bool {
    for _, k := range keywords {
        if strings.Contains(text, k) {
            return true
        }
    }
    return false
}

func isCompactDate(value string) bool {
    _, err := time.Parse("20060102", value)
    if err == nil {
        return true
    }
    _, err = time.Parse("02012006", value)
    return err == nil
}

func isInt(value string) bool {
    _, err := strconv.Atoi(value)

    return err == nil
}

func isBool(value string) bool {
    lower := strings.ToLower(value)
    return lower == "true" || lower == "false" || lower == "s" || lower == "n"
}

func isFloat(value string) bool {
    value = strings.Replace(value, ",", ".", 1)
    _, err := strconv.ParseFloat(value, 64)
    return err == nil
}

4. Estratégia de Streaming (Memória)

Para garantir que o consumo de memória permaneça constante (O(1)) independentemente do tamanho do arquivo, utilizamos bufio.Scanner e processamento em chunks.

  1. O arquivo é aberto via multipart.File.
  2. Um sniff inicial lê os primeiros 4KB para detectar separadores e encoding.
  3. O cursor volta ao início e o streaming começa.
internal/infra/csv_loader.go
package infra

import (
    "bufio"
    "context"
    "encoding/csv"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "log/slog"
    "os"
    "path/filepath"
    "sort"
    "unicode"
    "unicode/utf8"

    "github.com/JGustavoCN/dataprofiler/internal/profiler"
    "golang.org/x/text/encoding/charmap"
    unicodeenc "golang.org/x/text/encoding/unicode"
    "golang.org/x/text/transform"
)

func LoadCSV(logger *slog.Logger, filePath string) ([]profiler.Column, string, error) {
    if logger == nil {
        logger = slog.New(slog.NewJSONHandler(io.Discard, nil))
    }
    file, err := os.Open(filePath)
    if err != nil {
        logger.Error("Falha ao abrir arquivo", "path", filePath, "error", err)
        return nil, "", err
    }
    defer file.Close()

    nameFile := filepath.Base(file.Name())
    column, err := ParseData(logger, file)
    return column, nameFile, err
}

func ParseData(logger *slog.Logger, file io.Reader) ([]profiler.Column, error) {
    if logger == nil {
        logger = slog.New(slog.NewJSONHandler(io.Discard, nil))
    }
    smartReader, err := NewSmartReader(logger, file)
    if err != nil {
        return nil, err
    }

    bufferedSmartReader := bufio.NewReaderSize(smartReader, 1024*1024)

    separator, err := DetectSeparator(bufferedSmartReader)
    if err != nil {
        separator = ';'
        logger.Warn("Falha na detecção de separador, usando fallback", "error", err, "fallback", separator)
    } else {
        logger.Info("Separador detectado", "separator", string(separator))
    }

    reader := csv.NewReader(bufferedSmartReader)
    reader.Comma = separator
    reader.LazyQuotes = true
    records, err := reader.ReadAll()
    if err != nil {
        return nil, err
    }

    if len(records) == 0 {
        return []profiler.Column{}, nil
    }

    headers := records[0]

    logger.Info("Estrutura carregada",
        "total_rows", len(records),
        "columns_count", len(headers),
        "headers", headers,
    )
    columns := make([]profiler.Column, len(headers))
    for i, name := range headers {
        columns[i] = profiler.Column{
            Name:   name,
            Values: make([]string, 0, len(records)-1),
        }
    }

    for _, row := range records[1:] {
        for i, value := range row {
            if i < len(columns) {
                columns[i].Values = append(columns[i].Values, value)
            }
        }
    }

    return columns, nil
}

func ParseDataAsync(ctx context.Context, logger *slog.Logger, r io.Reader) ([]string, <-chan profiler.StreamData, error) {
    if logger == nil {
        logger = slog.New(slog.NewJSONHandler(io.Discard, nil))
    }

    smartReader, err := NewSmartReader(logger, r)
    if err != nil {
        return nil, nil, err
    }

    bufferedSmartReader := bufio.NewReaderSize(smartReader, 1024*1024)
    isJson, err := sniffJSON(bufferedSmartReader)
    if err != nil {
        return nil, nil, fmt.Errorf("erro ao detectar formato: %w", err)
    }

    if isJson {
        logger.Info("Formato detectado: JSONL (Logs/NoSQL)")
        return parseJSONLAsync(ctx, logger, bufferedSmartReader)
    }
    logger.Info("Formato detectado: CSV (Tabular)")
    return parseCSVAsync(ctx, logger, bufferedSmartReader)

}

func sniffJSON(r *bufio.Reader) (bool, error) {
    bytesToPeek, err := r.Peek(50)
    if err != nil && err != io.EOF {
        return false, err
    }

    for _, b := range bytesToPeek {
        if unicode.IsSpace(rune(b)) {
            continue
        }
        if b == '{' {
            return true, nil
        }
        return false, nil
    }
    return false, nil
}

func parseCSVAsync(ctx context.Context, logger *slog.Logger, reader *bufio.Reader) ([]string, <-chan profiler.StreamData, error) {
    out := make(chan profiler.StreamData, 1000)

    separator, err := DetectSeparator(reader)
    if err != nil {
        separator = ';'
        logger.Warn("Falha na detecção de separador, usando fallback", "error", err, "fallback", separator)
    } else {
        logger.Info("Separador detectado", "separator", string(separator))
    }

    csvReader := csv.NewReader(reader)
    csvReader.Comma = separator
    csvReader.LazyQuotes = true
    csvReader.ReuseRecord = true

    headersRef, err := csvReader.Read()
    if err != nil {
        close(out)
        return nil, nil, err
    }
    headers := make([]string, len(headersRef))
    copy(headers, headersRef)
    csvReader.FieldsPerRecord = len(headers)
    logger.Info("Início do streaming",
        "columns_count", len(headers),
        "headers", headers,
    )

    go func() {
        defer close(out)
        count := 0
        lineNum := 1
        errorCount := 0
        for {
            select {
            case <-ctx.Done():
                logger.Warn("Leitura cancelada pelo contexto")
                return
            default:
                record, err := csvReader.Read()
                lineNum++
                if err == io.EOF {
                    goto EndProcessing
                }
                if err != nil {
                    errorCount++
                    out <- profiler.StreamData{
                        Row:        nil,
                        LineNumber: lineNum,
                        Err:        err,
                    }
                    continue
                }

                rowCopy := profiler.GetRowSlice()
                rowCopy = append(rowCopy, record...)

                out <- profiler.StreamData{
                    Row:        rowCopy,
                    LineNumber: lineNum,
                    Err:        nil,
                }
                count++

            }
        }
    EndProcessing:
        logger.Info("Streaming CSV finalizado",
            "total_rows_read", lineNum-1,
            "total_errors", errorCount,
        )
    }()

    return headers, out, nil
}

func parseJSONLAsync(ctx context.Context, logger *slog.Logger, reader *bufio.Reader) ([]string, <-chan profiler.StreamData, error) {
    out := make(chan profiler.StreamData, 1000)

    scanner := bufio.NewScanner(reader)

    const maxCapacity = 1024 * 1024
    buf := make([]byte, maxCapacity)
    scanner.Buffer(buf, maxCapacity)

    if !scanner.Scan() {
        if err := scanner.Err(); err != nil {
            return nil, nil, fmt.Errorf("erro lendo primeira linha JSON: %w", err)
        }
        return nil, nil, errors.New("arquivo JSONL vazio")
    }

    firstLine := scanner.Bytes()
    var firstMap map[string]interface{}
    if err := json.Unmarshal(firstLine, &firstMap); err != nil {
        return nil, nil, fmt.Errorf("erro de parsing na primeira linha (não é JSON válido?): %w", err)
    }

    headers := make([]string, 0, len(firstMap))
    for k := range firstMap {
        headers = append(headers, k)
    }

    sort.Strings(headers)

    logger.Info("Schema JSONL inferido", "headers", headers)

    go func() {
        defer close(out)

        processMap := func(m map[string]interface{}, lineNum int) {
            row := profiler.GetRowSlice()

            for _, header := range headers {
                val, exists := m[header]
                if !exists || val == nil {
                    row = append(row, "")
                } else {
                    row = append(row, fmt.Sprintf("%v", val))
                }
            }

            out <- profiler.StreamData{
                Row:        row,
                LineNumber: lineNum,
                Err:        nil,
            }
        }

        processMap(firstMap, 1)

        lineNum := 1
        for scanner.Scan() {
            select {
            case <-ctx.Done():
                return
            default:
            }
            lineNum++

            if len(scanner.Bytes()) == 0 {
                continue
            }

            var currentMap map[string]interface{}
            if err := json.Unmarshal(scanner.Bytes(), &currentMap); err != nil {
                out <- profiler.StreamData{
                    LineNumber: lineNum,
                    Err:        fmt.Errorf("json malformado: %w", err),
                }
                continue
            }

            processMap(currentMap, lineNum)
        }

        if err := scanner.Err(); err != nil {
            logger.Error("Erro fatal no scanner JSONL", "error", err)
            out <- profiler.StreamData{
                LineNumber: lineNum,
                Err:        fmt.Errorf("erro de I/O: %w", err),
            }
        }
    }()

    return headers, out, nil
}

func NewSmartReader(logger *slog.Logger, r io.Reader) (io.Reader, error) {
    br := bufio.NewReaderSize(r, 1024*1024)

    bomCheck, err := br.Peek(4)
    if err != nil && err != io.EOF && len(bomCheck) < 2 {
        return br, nil
    }

    if len(bomCheck) >= 2 && bomCheck[0] == 0xFF && bomCheck[1] == 0xFE {
        logger.Info("Encoding detectado: UTF-16 LE (Convertendo para UTF-8)")
        win16le := unicodeenc.UTF16(unicodeenc.LittleEndian, unicodeenc.UseBOM)
        return transform.NewReader(br, win16le.NewDecoder()), nil
    }
    if len(bomCheck) >= 2 && bomCheck[0] == 0xFE && bomCheck[1] == 0xFF {
        logger.Info("Encoding detectado: UTF-16 BE (Convertendo para UTF-8)")
        win16be := unicodeenc.UTF16(unicodeenc.BigEndian, unicodeenc.UseBOM)
        return transform.NewReader(br, win16be.NewDecoder()), nil
    }

    const sampleSize = 2048
    sample, err := br.Peek(sampleSize)

    if err != nil && err != io.EOF {
        return nil, err
    }

    if utf8.Valid(sample) {
        logger.Debug("Encoding detectado: UTF-8 (Nativo)")
        return br, nil
    }

    logger.Warn("Encoding UTF-8 inválido detectado na amostra. Aplicando fallback Windows1252 -> UTF-8")
    decoderReader := transform.NewReader(br, charmap.Windows1252.NewDecoder())
    return decoderReader, nil
}

func DetectSeparator(r *bufio.Reader) (rune, error) {

    bytesToPeek, err := r.Peek(2048)
    if err != nil && err != io.EOF {
        return ';', err
    }

    semicolonCount := 0 // ;
    commaCount := 0     // ,
    pipeCount := 0      // |
    tabCount := 0       // \t

    for _, b := range bytesToPeek {
        if b == '\n' || b == '\r' {
            break
        }
        switch b {
        case ';':
            semicolonCount++
        case ',':
            commaCount++
        case '|':
            pipeCount++
        case '\t':
            tabCount++
        }
    }

    separator := ';'
    maxCount := semicolonCount

    if commaCount > maxCount {
        maxCount = commaCount
        separator = ','
    }
    if pipeCount > maxCount {
        maxCount = pipeCount
        separator = '|'
    }
    if tabCount > maxCount {
        separator = '\t'
    }

    return separator, nil
}

5. API e Server-Sent Events (SSE)

A comunicação com o frontend não bloqueia a requisição de upload.

  1. Upload Handler: Recebe o arquivo e despacha uma goroutine go p.Run().
  2. SSE Handler: Mantém uma conexão HTTP aberta para enviar o progresso calculado pelo ProgressTracker.

Graceful Shutdown

O servidor utiliza context.Context para cancelar o processamento de arquivos caso o cliente encerre a conexão abruptamente, liberando recursos do servidor.

cmd/api/main.go
    logger := slog.New(slog.NewJSONHandler(logOutput, nil))

    slog.SetDefault(logger)

    if *cliMode {
        if *filePath == "" {
            slog.Error("Erro: No modo -cli, forneça o arquivo: -file=\"dados.csv\"")
            os.Exit(1)
        }
        runCLI(logger, *filePath)
        return
    }

    runServer()

}

func runCLI(logger *slog.Logger, path string) {
    start := time.Now()

    logger.Info("CLI: Iniciando DataProfiler", "mode", "streaming", "file", path)

(Exemplo de trecho do main.go configurando as rotas)