Testando corretamente estruturas de dados concorrentes
Um, dois, três, dois
- É possível testar exaustivamente estruturas de dados lock-free usando
loom, uma biblioteca Rust
- Exemplo de código de um contador concorrente simples
- O bug no código é que a operação de incremento não é atômica
use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
#[derive(Default)]
pub struct Counter {
value: AtomicU32,
}
impl Counter {
pub fn increment(&self) {
let value = self.value.load(SeqCst);
self.value.store(value + 1, SeqCst);
}
pub fn get(&self) -> u32 {
self.value.load(SeqCst)
}
}
Teste simples
- Um teste que incrementa repetidamente o mesmo contador em várias threads e verifica o resultado
- O teste falha com sucesso, mas é difícil reproduzir por depender do timing
#[test]
fn threaded_test() {
let counter = Counter::default();
let thread_count = 100;
let increment_count = 100;
std::thread::scope(|scope| {
for _ in 0..thread_count {
scope.spawn(|| {
for _ in 0..increment_count {
counter.increment()
}
});
}
});
assert_eq!(counter.get(), thread_count * increment_count);
}
Teste baseado em propriedades (PBT)
- Tentativa de aplicar teste baseado em propriedades, adequado para testar máquinas de estado
- Se fosse possível executar manualmente as threads passo a passo, seria fácil intercalar outra thread entre o
load e o store
#[test]
fn state_machine_test() {
arbtest::arbtest(|rng| {
let mut state: i32 = 0;
let step_count: usize = rng.int_in_range(0..=100)?;
for _ in 0..step_count {
match *rng.choose(&["inc", "dec"])? {
"inc" => state += 1,
"dec" => state -= 1,
_ => unreachable!(),
}
}
Ok(())
});
}
Instrumentação simples
- Uma forma de permitir que a thread "pause" entre operações atômicas
pub fn increment(&self) {
pause();
let value = self.value.load(SeqCst);
pause();
self.value.store(value + 1, SeqCst);
pause();
}
fn pause() {
// ¯\\_(ツ)_/¯
}
API de threads gerenciadas
- Uma regra no design de APIs é começar com um único usuário para entender a sensação da API antes de partir para a implementação real
- Escrevendo testes baseados em propriedades usando threads gerenciadas
let counter = Counter::default();
let t1 = managed_thread::spawn(&counter);
let t2 = managed_thread::spawn(&counter);
while !rng.is_empty() {
let coin_flip: bool = rng.arbitrary()?;
if t1.is_paused() {
if coin_flip {
t1.unpause();
}
} else if t2.is_paused() {
if coin_flip {
t2.unpause();
}
}
}
Implementação de threads gerenciadas
- É necessária comunicação entre a thread controladora e as threads gerenciadas
- Implementado usando um mutex e uma variável de condição para proteger o estado
struct SharedContext {
state: Mutex<State>,
cv: Condvar,
}
#[derive(PartialEq, Eq, Default)]
enum State {
#[default]
Running,
Paused,
}
impl SharedContext {
fn pause(&self) {
let mut guard = self.state.lock().unwrap();
assert_eq!(*guard, State::Running);
*guard = State::Paused;
self.cv.notify_all();
guard = self.cv.wait_while(guard, |state| *state == State::Paused).unwrap();
assert_eq!(*guard, State::Running);
}
}
Integração do código completo
- Integração das threads gerenciadas com o código de teste
#[test]
fn test_counter() {
arbtest::arbtest(|rng| {
eprintln!("begin trace");
let counter = Counter::default();
let mut counter_model: u32 = 0;
std::thread::scope(|scope| {
let t1 = managed_thread::spawn(scope, &counter);
let t2 = managed_thread::spawn(scope, &counter);
let mut threads = [t1, t2];
while !rng.is_empty() {
for (tid, t) in threads.iter_mut().enumerate() {
if rng.arbitrary()? {
if t.is_paused() {
eprintln!("{tid}: unpause");
t.unpause()
} else {
eprintln!("{tid}: increment");
t.submit(|c| c.increment());
counter_model += 1;
}
}
}
}
for t in threads {
t.join();
}
assert_eq!(counter_model, counter.get());
Ok(())
})
});
}
Resumo do GN⁺
- Este texto explica como testar estruturas de dados concorrentes
- Explora como usar a biblioteca
loom do Rust para testar operações não atômicas
- Usa threads gerenciadas para testar problemas de concorrência de forma reproduzível e depurável
- Este texto deve ser útil para desenvolvedores interessados em programação concorrente
- Um projeto com funcionalidade semelhante é o JCStress, do Java
1 comentários
Opiniões do Hacker News
Estou desenvolvendo uma biblioteca chamada Temper em Rust, e é preciso muito esforço para lidar com as partes complexas do modelo de memória do Rust
Implementei snapshots atômicos de memória compartilhada em Rust e considero testes automatizados extremamente importantes
A desvantagem dessa abordagem é que é preciso modificar o próprio código para adequá-lo ao código de teste
ptracepara misturar aleatoriamente a execução das instruções por meio de single-stepO Lincheck da JetBrains é uma boa biblioteca no mundo Kotlin/Java
Fico me perguntando se existe alguma biblioteca parecida com o "Loom" para C++
Essa abordagem pode ter limitações quanto às garantias de progresso suave
cmpxchg, em hardware real e com o escalonador real, a chance de interrupção é muito baixaÉ preciso conhecimento prático e criar threads reais
É possível usar
ptracepara executar threads em single-step e criar diferentes interleavings no nível de instruçãoPara usar Loom, é necessário usar compilação condicional, o que é um tanto intrusivo
Gostaria de saber como fazer o mesmo em Python