嵌入式 C 语言实战:基于 STM32 的覆盖型环形队列设计与并发控制

目录

1. 背景知识与应用场景

在 STM32 等微控制器开发中,环形队列(Ring Buffer)处理异步数据收发(如 UART、CAN、USB 接收)的标准数据结构。传统的无锁环形队列在面对数据量过载时,通常采取丢弃新数据的保护策略。

然而,在诸如实时传感器数据流、设备最新状态更新、视频帧传输等场景下,旧数据往往已经失去了时效性。此时,系统的核心需求转变为:​当缓冲区满时,丢弃最老的数据,强制存入最新数据​。

本文将详细实现一种支持“强制覆盖旧数据”的环形队列,并重点探讨在引入此机制后,如何处理 STM32 中断与主循环之间的并发冲突问题。

2. 核心设计思想:覆盖策略与锁的引入

2.1 强制覆盖逻辑

  • ​**入队(Push)**​:无论队列是否已满,新数据直接写入当前写指针(head)所在位置,随后推进 head。如果写入前检测到队列已满,则同步将读指针(tail)也向前推进一格,以此实现“挤掉”最老数据的效果。
  • ​**出队(Pop)**​:遵循先进先出(FIFO),从 tail 开始读取数据。

2.2 并发冲突的打破与重建

在传统的单生产者单消费者(SPSC)无锁队列中:中断只管改 head,主循环只管改 tail,互不干涉。

但引入覆盖逻辑后,​当队列满时,中断中的 Push 函数也会修改 tail 指针​。这就意味着,主循环的 Pop 和中断的 Push 可能会同时修改同一个变量(tail)。如果主循环读到一半被中断打断,读写指针就会发生错乱。

解决方案​:必须在主循环调用 Pop 时引入​**临界区(Critical Section)**​,短暂关闭全局中断,确保出队操作的原子性。

3. 完整源码实现

3.1 队列头文件 (ring_buffer.h)

#ifndef __RING_BUFFER_H
#define __RING_BUFFER_H

#include <stdint.h>
#include <stdbool.h>

/* 环形队列结构体 */
typedef struct {
    uint8_t *buffer;            /* 数据缓冲区指针 */
    volatile uint32_t head;     /* 写指针(队头) */
    volatile uint32_t tail;     /* 读指针(队尾) */
    uint32_t capacity;          /* 缓冲区总容量 */
} RingBuffer_t;

/* 核心 API 声明 */
void RingBuffer_Init(RingBuffer_t *rb, uint8_t *buffer, uint32_t capacity);
bool RingBuffer_Push(RingBuffer_t *rb, uint8_t data);
bool RingBuffer_Pop(RingBuffer_t *rb, uint8_t *data);
bool RingBuffer_IsFull(const RingBuffer_t *rb);
bool RingBuffer_IsEmpty(const RingBuffer_t *rb);
uint32_t RingBuffer_GetCount(const RingBuffer_t *rb);
void RingBuffer_Clear(RingBuffer_t *rb);

#endif /* __RING_BUFFER_H */

3.2 队列源文件 (ring_buffer.c)

#include "ring_buffer.h"

void RingBuffer_Init(RingBuffer_t *rb, uint8_t *buffer, uint32_t capacity) {
    rb->buffer = buffer;
    rb->capacity = capacity;
    rb->head = 0;
    rb->tail = 0;
}

bool RingBuffer_IsFull(const RingBuffer_t *rb) {
    return ((rb->head + 1) % rb->capacity) == rb->tail;
}

bool RingBuffer_IsEmpty(const RingBuffer_t *rb) {
    return rb->head == rb->tail;
}

/**
 * @brief 写入数据(覆盖最老数据模式)
 */
bool RingBuffer_Push(RingBuffer_t *rb, uint8_t data) {
    bool was_full = RingBuffer_IsFull(rb);
    
    /* 直接写入数据,并推进写指针 */
    rb->buffer[rb->head] = data;
    rb->head = (rb->head + 1) % rb->capacity; 
    
    if (was_full) {
        /* 如果队列已满,必须将尾指针同步向前推进,丢弃最老数据 */
        rb->tail = (rb->tail + 1) % rb->capacity;
        return false; /* 返回 false 表示发生了旧数据覆盖 */
    }
    
    return true; 
}

/**
 * @brief 读取最老的数据
 */
bool RingBuffer_Pop(RingBuffer_t *rb, uint8_t *data) {
    if (RingBuffer_IsEmpty(rb)) {
        return false; /* 队列空 */
    }
    
    *data = rb->buffer[rb->tail];
    rb->tail = (rb->tail + 1) % rb->capacity;
    
    return true;
}

uint32_t RingBuffer_GetCount(const RingBuffer_t *rb) {
    uint32_t head = rb->head;
    uint32_t tail = rb->tail;
    if (head >= tail) {
        return head - tail;
    } else {
        return rb->capacity - tail + head;
    }
}

void RingBuffer_Clear(RingBuffer_t *rb) {
    rb->head = 0;
    rb->tail = 0;
}

4. STM32 平台应用示例

在实际应用中,生产者通常是硬件中断,消费者是 main 函数中的 while(1) 循环。

#include "stm32f4xx_hal.h"
#include "ring_buffer.h"
#include <stdio.h>

#define UART_RX_BUFFER_SIZE 128

RingBuffer_t uart_rx_rb;
uint8_t uart_rx_buffer_array[UART_RX_BUFFER_SIZE];
extern UART_HandleTypeDef huart1;
uint8_t rx_temp_byte;

void System_Setup(void) {
    RingBuffer_Init(&uart_rx_rb, uart_rx_buffer_array, UART_RX_BUFFER_SIZE);
    HAL_UART_Receive_IT(&huart1, &rx_temp_byte, 1);
}

/* 中断回调:作为生产者 */
void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) {
    if (huart->Instance == USART1) {
        /* 队列满了会自动覆盖旧数据,中断中无需加锁 */
        RingBuffer_Push(&uart_rx_rb, rx_temp_byte);
        HAL_UART_Receive_IT(&huart1, &rx_temp_byte, 1);
    }
}

/* 主循环:作为消费者 */
int main(void) {
    HAL_Init();
    // ... 系统与外设初始化 ...
    System_Setup();
    
    uint8_t process_byte;
    
    while (1) {
        if (!RingBuffer_IsEmpty(&uart_rx_rb)) {
            
            /* --- 进入临界区:保护 Pop 操作 --- */
            uint32_t primask_bit = __get_PRIMASK(); /* 保存当前中断掩码状态 */
            __disable_irq();                        /* 关闭全局中断 */
            
            bool has_data = RingBuffer_Pop(&uart_rx_rb, &process_byte);
            
            __set_PRIMASK(primask_bit);             /* 恢复全局中断 */
            /* --- 退出临界区 --- */
            
            if (has_data) {
                // 业务逻辑:处理获取到的数据
                printf("Data: %c\r\n", process_byte);
            }
        }
        // ... 其他后台任务 ...
    }
}

5. 常见问题排查:高频通信下的数据错乱

  • 现象​:在低波特率或低频数据写入时一切正常,但在高波特率连续传输时,主循环取出的数据偶尔出现跳段、乱码,甚至系统直接卡死(死循环)。
  • 分析​:数据内容异常通常是由于指针越界或读写错位导致的。系统卡死则说明 headtail 的相对位置关系被破坏,导致 RingBuffer_GetCountIsEmpty 计算陷入异常状态。
  • 定位​:重点检查共享变量 tail 的读写场景。发现在高频接收且队列满载的状态下,中断频率极高。
  • 原因​​:产生了竞态条件(Race Condition)。当队列满时,中断里的 Push 正在执行推进 tail 的指令;恰好此时主循环也正在执行 Pop 中的推进 tail 指令。汇编级别的“读-改-写”过程被打断,导致 tail 指针的值被覆盖或计算错误,整个队列的结构遭到破坏。
  • 解决方案​:打破并发条件。在主循环执行 Pop 操作前后,利用内核指令 __disable_irq()__set_PRIMASK() 将其包装成不可被中断的临界区。由于 Pop 内部仅有几条基础运算指令,关中断的时间仅在数百纳秒级别,不会影响整体系统的实时响应。
  • 结果​:添加临界区保护后,读写冲突被彻底消除,队列在极限并发压力下依然保持稳定,旧数据平滑滚动覆盖,新数据无一错漏。

6. 总结

在嵌入式开发中,需求决定了数据结构的选型。普通的无锁环形队列适用于绝大多数常规数据缓冲场景;而支持“覆盖旧数据”的环形队列,则是解决高频实时数据流防堵塞的利器。

但在修改底层逻辑时,开发者必须具备极强的并发意识。哪怕只是让中断多修改了一个原本归属于主循环的变量,都会直接摧毁无锁设计的根基。熟练运用临界区保护共享资源,是每一位嵌入式 C 语言开发者跨向进阶的必修课。