2 pontos por GN⁺ 2024-07-07 | 1 comentários | Compartilhar no WhatsApp

Testando corretamente estruturas de dados concorrentes

Um, dois, três, dois

  • É possível testar exaustivamente estruturas de dados lock-free usando loom, uma biblioteca Rust
  • Exemplo de código de um contador concorrente simples
  • O bug no código é que a operação de incremento não é atômica
use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

#[derive(Default)]
pub struct Counter {
    value: AtomicU32,
}

impl Counter {
    pub fn increment(&self) {
        let value = self.value.load(SeqCst);
        self.value.store(value + 1, SeqCst);
    }

    pub fn get(&self) -> u32 {
        self.value.load(SeqCst)
    }
}

Teste simples

  • Um teste que incrementa repetidamente o mesmo contador em várias threads e verifica o resultado
  • O teste falha com sucesso, mas é difícil reproduzir por depender do timing
#[test]
fn threaded_test() {
    let counter = Counter::default();
    let thread_count = 100;
    let increment_count = 100;

    std::thread::scope(|scope| {
        for _ in 0..thread_count {
            scope.spawn(|| {
                for _ in 0..increment_count {
                    counter.increment()
                }
            });
        }
    });

    assert_eq!(counter.get(), thread_count * increment_count);
}

Teste baseado em propriedades (PBT)

  • Tentativa de aplicar teste baseado em propriedades, adequado para testar máquinas de estado
  • Se fosse possível executar manualmente as threads passo a passo, seria fácil intercalar outra thread entre o load e o store
#[test]
fn state_machine_test() {
    arbtest::arbtest(|rng| {
        let mut state: i32 = 0;
        let step_count: usize = rng.int_in_range(0..=100)?;

        for _ in 0..step_count {
            match *rng.choose(&["inc", "dec"])? {
                "inc" => state += 1,
                "dec" => state -= 1,
                _ => unreachable!(),
            }
        }
        Ok(())
    });
}

Instrumentação simples

  • Uma forma de permitir que a thread "pause" entre operações atômicas
pub fn increment(&self) {
    pause();
    let value = self.value.load(SeqCst);
    pause();
    self.value.store(value + 1, SeqCst);
    pause();
}

fn pause() {
    // ¯\\_(ツ)_/¯
}

API de threads gerenciadas

  • Uma regra no design de APIs é começar com um único usuário para entender a sensação da API antes de partir para a implementação real
  • Escrevendo testes baseados em propriedades usando threads gerenciadas
let counter = Counter::default();
let t1 = managed_thread::spawn(&counter);
let t2 = managed_thread::spawn(&counter);

while !rng.is_empty() {
    let coin_flip: bool = rng.arbitrary()?;
    if t1.is_paused() {
        if coin_flip {
            t1.unpause();
        }
    } else if t2.is_paused() {
        if coin_flip {
            t2.unpause();
        }
    }
}

Implementação de threads gerenciadas

  • É necessária comunicação entre a thread controladora e as threads gerenciadas
  • Implementado usando um mutex e uma variável de condição para proteger o estado
struct SharedContext {
    state: Mutex<State>,
    cv: Condvar,
}

#[derive(PartialEq, Eq, Default)]
enum State {
    #[default]
    Running,
    Paused,
}

impl SharedContext {
    fn pause(&self) {
        let mut guard = self.state.lock().unwrap();
        assert_eq!(*guard, State::Running);
        *guard = State::Paused;
        self.cv.notify_all();
        guard = self.cv.wait_while(guard, |state| *state == State::Paused).unwrap();
        assert_eq!(*guard, State::Running);
    }
}

Integração do código completo

  • Integração das threads gerenciadas com o código de teste
#[test]
fn test_counter() {
    arbtest::arbtest(|rng| {
        eprintln!("begin trace");
        let counter = Counter::default();
        let mut counter_model: u32 = 0;

        std::thread::scope(|scope| {
            let t1 = managed_thread::spawn(scope, &counter);
            let t2 = managed_thread::spawn(scope, &counter);
            let mut threads = [t1, t2];

            while !rng.is_empty() {
                for (tid, t) in threads.iter_mut().enumerate() {
                    if rng.arbitrary()? {
                        if t.is_paused() {
                            eprintln!("{tid}: unpause");
                            t.unpause()
                        } else {
                            eprintln!("{tid}: increment");
                            t.submit(|c| c.increment());
                            counter_model += 1;
                        }
                    }
                }
            }

            for t in threads {
                t.join();
            }
            assert_eq!(counter_model, counter.get());

            Ok(())
        })
    });
}

Resumo do GN⁺

  • Este texto explica como testar estruturas de dados concorrentes
  • Explora como usar a biblioteca loom do Rust para testar operações não atômicas
  • Usa threads gerenciadas para testar problemas de concorrência de forma reproduzível e depurável
  • Este texto deve ser útil para desenvolvedores interessados em programação concorrente
  • Um projeto com funcionalidade semelhante é o JCStress, do Java

1 comentários

 
GN⁺ 2024-07-07
Opiniões do Hacker News
  • Estou desenvolvendo uma biblioteca chamada Temper em Rust, e é preciso muito esforço para lidar com as partes complexas do modelo de memória do Rust

    • Ela inclui a maior coleção de casos de teste para os modelos de memória de C++/Rust
    • O Loom é uma biblioteca mais completa, que permite testar exaustivamente estruturas de alto nível como mutexes e filas
    • Fui inspirado por uma palestra sobre testes do Foundation DB e acredito que WebAssembly terá um papel importante nessa área
  • Implementei snapshots atômicos de memória compartilhada em Rust e considero testes automatizados extremamente importantes

    • No início usei Loom, mas depois migrei para Shuttle
    • O Shuttle usa uma abordagem aleatorizada e oferece garantias probabilísticas de encontrar bugs
    • O Shuttle é mais rápido e escala bem para cenários de teste mais complexos
    • A capacidade de reproduzir testes que falharam é muito importante
  • A desvantagem dessa abordagem é que é preciso modificar o próprio código para adequá-lo ao código de teste

    • Também seria possível fazer isso executando duas threads e usando ptrace para misturar aleatoriamente a execução das instruções por meio de single-step
  • O Lincheck da JetBrains é uma boa biblioteca no mundo Kotlin/Java

    • Gosto do fato de ser declarativo e de imprimir o resultado da linearização
  • Fico me perguntando se existe alguma biblioteca parecida com o "Loom" para C++

    • Quero testar estruturas de dados lock-free
  • Essa abordagem pode ter limitações quanto às garantias de progresso suave

    • Em um loop de cmpxchg, em hardware real e com o escalonador real, a chance de interrupção é muito baixa
    • Porém, nessa abordagem de teste, a probabilidade de progresso muda conforme a quantidade de operações e de pausas
  • É preciso conhecimento prático e criar threads reais

    • Fico me perguntando se seria possível usar um runtime assíncrono
  • É possível usar ptrace para executar threads em single-step e criar diferentes interleavings no nível de instrução

    • Fico me perguntando se existe alguma alternativa de teste de caixa-preta
  • Para usar Loom, é necessário usar compilação condicional, o que é um tanto intrusivo

    • Fico me perguntando se outras linguagens são melhores por usarem seu próprio escalonador
  • Gostaria de saber como fazer o mesmo em Python

    • Fico me perguntando se seria possível criar uma classe de thread que permita esse tipo de teste