Работа с кольцевым буфером Печать
Добавил(а) microsin   

Применение кольцевого буфера (ring buffer) - обычный прием, когда нужно организовать поток данных между асинхронными по отношению друг к другу процессами - например, между получением данных по каналу связи и обработкой этих данных в программе. Кольцевой буфер является упрощенным аналогом очереди (queue), которая применяется в многозадачных операционных системах (Windows, Linux, FreeRTOS и многих других) для организации безопасного (thread-safe) обмена данными между потоками (синхронизации).

Кольцевой буфер может использоваться не только на прием, но и на передачу - например, если нужно быстро подготовленные где-то данные постепенно передавать с течением времени. Кольцевой буфер удобен прежде всего тем, что очень просто производить заполнение буфера, проверку наличия данных в буфере и выборку данных из буфера, при этом не нужно особенно беспокоиться о выходе данных за границу буфера, переполнении памяти и т. п. Кольцевой буфер позволяет корректно обмениваться данными между обработчиком прерывания и основной программой.

Кольцевой буфер является разновидностью буфера FIFO, First Input First Output (первый зашел - первый вышел). Принцип кольцевого буфера довольно прост - в памяти выделяется непрерывный блок размером обычно равным степени двойки (назовем его buffer), и два индекса (idxIN и idxOUT) - один индекс указывает на место для записи в буфер (idxIN), другой - на место чтения из буфера. Размер буфера, равный степени двойки, выбирается для того, чтобы удобно было манипулировать индексами, указывающими на данные буфера, с помощью инкремента/декремента и наложения маски (это будет понятно далее). Индекс - это обычное число, равное адресу ячейки буфера. Например, на ячейку buffer[0] указывает индекс, равный нулю. Количество бит индекса как раз равно степени двойки размера буфера. Максимально удобны буфера размером 256 байт - в этом случае в качестве индекса можно применить 1 байт, и маску при операциях с указателями уже накладывать не надо. Код получается в этом случае максимально быстрый и компактный. На рисунке показан принцип работы кольцевого буфера на примере буфера в 16 байт (желтым показаны еще не обработанные данные буфера):

ringbuf.jpg

Вот так, например, выделяется буфер:

#define BUF_SIZE 16 //размер буфера обязательно должен быть равен степени двойки!
#define BUF_MASK (BUF_SIZE-1)
 
u8 idxIN, idxOUT;
char buffer [BUF_SIZE];

При помещении значения value в буфер используется индекс idxIN. Это делается так:

buffer[idxIN++] = value;
idxIN &= BUF_MASK;

Значение константы BUF_MASK равно размеру буфера минус 1 (при условии, что размер буфера равен степени двойки). При таком принципе работы с индексом происходит автоматический переход на начало буфера, как только мы достигли его конца.

Операция выборки из буфера происходит похожим образом, только используется индекс idxOUT:

value = buffer[idxOUT++];
idxOUT &= BUF_MASK;

Теперь становится понятным, почему при размере буфера 256 байт и байтовом индексе не нужна операция наложения маски - переход в ноль индекса при достижении конца буфера происходит автоматически.

Проверка на наличие данных в буфере происходит очень просто - если idxIN не равен idxOUT, то в буфере есть данные, в противном случае данных в буфере нет. Индекс idxOUT как-бы постоянно догоняет индекс idxIN при работе с данными буфера. Если догнал - значит, считывать из буфера больше нечего.

if (idxIN != idxOUT)
{
   //данные есть, обрабатываем их
   ...
}

Сбросить данные буфера (т. е. удалить их оттуда) тоже очень просто - для этого в idxOUT записывают значение idxIN:

idxOUT = idxIN;

Иногда бывает необходимо знать, что не только данные в буфере есть, а еще нужно знать сколько именно байт данных в буфере. Для этого используется довольно простая процедура:

u8 idxDiff (u8 idxIN, u8 idxOUT)
{
    if (idxIN >= idxOUT)
        return (idxIN - idxOUT);
    else
        return ((BUF_SIZE - idxOUT) + idxIN);
}

UPD100920: процедуру получения количества данных в буфере idxDiff можно упростить следующим образом:

u8 idxDiff (u8 idxIN, u8 idxOUT)
{
   return (idxIN - idxOUT)&BUF_MASK;
}

Кольцевой буфер не очень удобен для случаев, когда имеется несколько отправителей и один получатель, тогда при выборке информации из буфера необходимо правильно её сортировать. Для обратного случая - один отправитель и несколько получателей - кольцевой буфер подходит отлично, но нужно иметь для каждого получателя отдельный индекс idxOUT. Если же только один получатель и только один отправитель, то кольцевой буфер по быстродействию и простоте реализации практически идеальный вариант.

[RingBuffer_t, автор Stanimir Bonev]

Есть более сложные, однако более гибкие в плане удобства использования кольцевые буферы. Реализация программного менеджера кольцевого буфера от Stanimir Bonev позволяет работать с кольцевым буфером как с линейным блоком памяти (можно выбрать как один байт, так и несколько). Есть возможность произвольно выбирать размер буфера, реализована блокировка на исключительный доступ к буферу.

Работа с буфером происходит через структуру RingBuffer_t:

typedef struct _RingBuffer_t
{
   u8 *pPush;        // Текущая позиция вставки
   u8 *pPop;         // Текущая позиция выборки
   u32 Count;        // Количество занятых байт в буфере
   u8 *pBegin;       // Начало кольцевого буфера в памяти
   u8 *pEnd;         // Конец кольцевого буфера в памяти
   u32 Size;         // Размер кольцевого буфера
   bool LockUsed;    // Флаг блокировки на извлечение из буфера
   bool LockFree;    // Флаг блокировки на вставку в буфер
} RingBuffer_t, *pRingBuffer_t;

В таблице приведено описание функций кольцевого буфера.

Функция Описание
RingBuff_Init Инициализация структуры кольцевого буфера.
RingBuff_GetAvableBufferSize Получает сведения о доступном месте в кольцевом буфере и позиции в памяти, куда можно записать данные. Если буфер заблокирован, то функция вернет 0.
RingBuff_Push Функция записывает данные в буфер. Перед её вызовом обязательно должна быть вызвана функция RingBuff_GetFreeLinBuff.
RingBuff_Pop Функция для выборки данных из буфера.
RingBuff_GetFreeLinBuff Получает сведения о доступном месте в кольцевом буфере и позиции в памяти, куда можно записать данные. Функция должна вызываться непосредственно перед записью в буфер, она одновременно устанавливает блокировку на запись.
RingBuff_GetUsedLinBuff Функция для выборки данных из буфера. Возвратит указатель на еще не выбранные данные и их размер.
RingBuff_Allocate Резервирует линейный блок в буфере. Эта функция должна быть вызвана после вставки в буфер, она одновременно разблокирует буфер на запись.
RingBuff_Free Функция освобождает блок в буфере после выборки данных. Она одновременно снимает блокировку буфера на чтение.

Пример использования этих функций можно увидеть в проектах IAR для платы Olimex STM32-103STK, также см. врезку ниже.

Пример для микроконтроллера STM32F103x, скомпилированный в среде IAR 5.5. Испытывался на макетной плате STM32F103R-BOARD [1].

#include "includes.h"
#include "pins.h"
 
#define RING_BUFFER_SIZE  150
 
u32 CriticalSecCntr;
 
RingBuffer_t RXUSBringBuffer;
RingBuffer_t TXUSBringBuffer;
u8 RxBuff[RING_BUFFER_SIZE];
u8 TxBuff[RING_BUFFER_SIZE];
 
static u32 Push (RingBuffer_t *pRB, void *src, u32 SizeSrc)
{
   u32 portion;
   
   portion = 0;
   if (SizeSrc)
   {
      u32 SizeDst;
      u8* pBufferDst = RingBuff_GetFreeLinBuff(pRB,&SizeDst);
      if (SizeDst)
      {
         portion = SizeSrc>SizeDst?SizeDst:SizeSrc;
         memcpy(pBufferDst, src, portion);
      }
      RingBuff_Allocate(pRB, portion);
   }
   return portion;
}
 
void main(void)
{
   u32 SizeSrc, SizeDst;
   u8* pBufferSrc, pBufferDst;
   char msg[128];
 
   ENTR_CRT_SECTION();
   // Настройка системы STM32 (конфигурация тактирования, PLL и Flash):
   SystemInit();
   // Настройка тестовых светодиодов:
   LEDs_init();
   // Настройка входов кнопок:
   S2S3_init();
 
   // Инициализация NVIC
#ifndef  EMB_FLASH
   /* Установка базового адреса таблицы векторов на 0x20000000 */
   NVIC_SetVectorTable(NVIC_VectTab_RAM, 0x0);
#else  /* VECT_TAB_FLASH  */
   /* Установка базового адреса таблицы векторов на 0x08000000 */
   NVIC_SetVectorTable(NVIC_VectTab_FLASH, 0x0);
#endif
   NVIC_PriorityGroupConfig(NVIC_PriorityGroup_4);
 
   // Разрешение счета таймера:
   TIM_Cmd(TIM1,ENABLE);
 
   // CDC USB
   UsbCdcInit();
   // Разрешение активизации pull-up резистора:
   USB_ConnectRes(TRUE);
 
   // Инициализация кольцевых буферов приема и передачи:
   RingBuff_Init(&RXUSBringBuffer,RxBuff,RING_BUFFER_SIZE);
   RingBuff_Init(&TXUSBringBuffer,TxBuff,RING_BUFFER_SIZE);
 
   EXT_CRT_SECTION();
   msg[0] = 0;
   while(1)
   {
      if (IsUsbCdcConfigure())
      {
////////////////////////////////////////////////////////////////////////////////
// Обработка приема USB: принятые данные помещаются в буфер RXUSBringBuffer,
// если там есть свободное место.
         // Получение свободного места в буфере приема:
         pBufferDst = RingBuff_GetFreeLinBuff(&RXUSBringBuffer,&SizeDst);
         if(SizeDst)
         {
            // Чтение данных из USB:
            SizeSrc = UsbCdcRead(pBufferDst,SizeDst);
            if(SizeSrc)
            {
               LED1(ON);
            }
         }
         RingBuff_Allocate(&RXUSBringBuffer,SizeSrc);
 
////////////////////////////////////////////////////////////////////////////////
// Обработка передачи USB: если в буфере передачи TXUSBringBuffer есть данные,
// то они передаются через USB, и эти данные удаляются из буфера.
         // Получение заполненного места в буфере передачи:
         pBufferSrc = RingBuff_GetUsedLinBuff(&TXUSBringBuffer,&SizeSrc);
         if(SizeSrc)
         {
            // Отправка данных:
            LED2(ON);
            UsbCdcWrite(pBufferSrc,SizeSrc);
            RingBuff_Free(&TXUSBringBuffer,SizeSrc);
         }
         
////////////////////////////////////////////////////////////////////////////////
// Выборка данных из буфера приема и отправка их в буфер передачи
         pBufferSrc = RingBuff_GetUsedLinBuff(&RXUSBringBuffer,&SizeSrc);
         if (SizeSrc)
         {
            u32 portion = Push(&TXUSBringBuffer, pBufferSrc, SizeSrc);
            RingBuff_Free(&RXUSBringBuffer,portion);
         }
         
         if (PRESSED(S2_PIN))
         {
            strcat(msg, "Нажата S2\r\n");
         }
         if (PRESSED(S3_PIN))
         {
            strcat(msg, "Нажата S3\r\n");
         }
         if (msg[0])
         {
            Push(&TXUSBringBuffer, msg, strlen(msg));
            msg[0] = 0;
         }
         LED1(OFF);
         LED2(OFF);
      }
   }
}

[RingBuffer_t из библиотеки LUFA]

Еще одну реализацию кольцевого буфера см. в библиотеке LUFA, файл LUFA\Drivers\Misc\RingBuffer.h, автор Dean Camera. Эта реализация намного проще, чем предыдущая, но имеет определенные ограничения.

Работа с буфером происходит через структуру RingBuffer_t:

typedef struct
{
   u8* In;     // Текущее место сохранения в кольцевом буфере.
   u8* Out;    // Текущее место выборки из кольцевого буфера.
   u8* Start;  // Указатель на начало нижележащего хранилища буфера.
   u8* End;    // Указатель на конец нижележащего хранилища буфера.
   u16 Size;   // Размер нижележащего хранилища буфера.
   u16 Count;  // Количество байт, сохраненных в настоящее время в буфере.
}RingBuffer_t;

В таблице приведено описание функций кольцевого буфера.

Функция Описание
RingBuffer_InitBuffer Инициализирует кольцевой буфер, чтобы его можно было использовать. Буферы должны быть инициализированы этой функцией перед выполнением любой операции над буфером. Уже инициализированные буферы могут быть сброшены повторной переинициализацией путем вызова этой функции.
RingBuffer_GetCount Запрашивает текущее количество байт, сохраненных в кольцевом буфере. Это значение вычисляется путем входа в атомарную блокировку буфера, так что буфер не может модифицироваться, пока происходят вычисления. Возвращенное значение должно быть закэшировано, когда вычитывается содержимое буфера, поэтому на атомарную блокировку может быть потрачено некоторое малое время.
RingBuffer_GetFreeCount Запрашивает свободное пространство в определенном буфере. Это значение может быть вычислено путем входа в атомарную блокировку на буфере, чтобы буфер не был случайно изменен во время проведения вычислений объема свободного места.
RingBuffer_IsEmpty Атомарно определяет, есть ли в указанном кольцевом буфере какие-либо данные. Эта проверка должна быть сделана перед удалением данных из буфера, чтобы гарантировать отсутствие опустошения буфера (отсутствие underflow, т. е. отсутствие ошибочного чтения из пустого буфера).
RingBuffer_IsFull Атомарно определяет, есть ли в указанном кольцевом буфере какое-либо свободное место. Это должно быть проверено перед сохранением данных в буфер для гарантии, что данные не будут потеряны из-за переполнения буфера (overrun, т. е. ошибочная попытка записи в полностью заполненный буфер).
RingBuffer_Insert Вставляет элементы в кольцевой буфер.
RingBuffer_Remove Удаляет (извлекает) элемент из кольцевого буфера.
RingBuffer_Peek Возвратит следующий элемент, сохраненный в кольцевом буфере, без его удаления.

// Создание структуры буфера и его нижележащего хранилища:
RingBuffer_t Buffer;
u8 BufferData[128];
 
// Инициализация буфера созданным хранилищем:
RingBuffer_InitBuffer(&Buffer, BufferData, sizeof(BufferData));
 
// Вставка некоторых данных в буфер:
RingBuffer_Insert(&Buffer, 'H');
RingBuffer_Insert(&Buffer, 'E');
RingBuffer_Insert(&Buffer, 'L');
RingBuffer_Insert(&Buffer, 'L');
RingBuffer_Insert(&Buffer, 'O');
 
// Получение количества сохраненных в буфере байт:
u16 BufferCount = RingBuffer_GetCount(&Buffer);
 
// Вывод на печать длины сохраненных данных:
printf("Buffer Length: %d, Buffer Data: \r\n", BufferCount);
 
// Вывод на печать содержимого буфера по одному символу:
while (BufferCount--)
   putc(RingBuffer_Remove(&Buffer));

[Другие популярные разновидности буферов]

Флажковый буфер. Работает он как набор одинаковых ячеек в памяти. Каждая ячейка может иметь как простую структуру, так и сложную (в виде структуры или объекта). Каждая ячейка снабжена флажком, показывающим - занята эта ячейка или свободна. Флажковый буфер удобен в двух случаях. Во-первых, когда у передаваемой информации есть один отправитель и несколько получателей (и наоборот - когда получатель один, а отправителей несколько). Во-вторых, когда в нескольких ячейках может содержаться части какого-то большого сообщения, и эти части могут прийти в разное время, и не по порядку. Флаги занятости ячеек обычно для удобства держат в отдельном регистре. Если регистр 8-битный, то он может поддерживать максимум 8 ячеек, если 16-битный, то 16 ячеек, и так далее.

Алгоритм заполнения флажкового буфера и выборки из него следующий. Когда отправителю нужно поместить в буфер сообщение, то он ищет в регистре флагов первый попавшийся нулевой бит. Когда нулевой бит найден, то передатчик устанавливает его в единицу (помечая тем самым занятой соответствующую ячейку), вычисляет по номеру бита индекс ячейки, и затем помещает в ячейку информацию. Получатель действует так - просматривает флаги в поиске ненулевых бит. Все ячейки, которым соответствуют ненулевые биты, опрашиваются, и если там находится подлежащая выборке информация, то эта информация копируется из ячейки буфера, и соответствующий флажок сбрасывается, сигнализируя об освобождении ячейки.

Кэширование. Этот буфер работает как набор пронумерованных ячеек, составляющих образ какой-то другой (обычно более медленной) памяти, условно назовем эту память внешнее хранилище. В каждой ячейке хранятся как полезные данные, так и адрес, который соответствует месту размещения информации во внешнем хранилище. Обычный способ применения такого кэширования - доступ к данным на диске (карта SD, жесткий диск, флешка и т. п.), в этом случае каждая ячейка часто имеет размер 512 байт (сектор), и номер сектора является её адресом.

Стек. Если кольцевой буфер является буфером типа FIFO, то стек в этом отношении является его полной противоположностью, так как стек имеет тип FILO (First Input Last Output, т. е. первым пришел и последним вышел). Стек обычно реализован на уровне системы (аппаратно в микроконтроллере или процессоре), и поддерживается как со стороны инструкций процессора, так и со стороны системы программирования. Этот стек программист часто может использовать только неявно - определяя локальные переменные в функциях, передавая параметры и возвращая значения.

Программист сам может реализовать подпрограммы для доступа к своему собственному буферу со стековой структурой - если это требуется для обеспечения функционала программы. Однако искусственный стековый буфер применяется достаточно редко.

[Ссылки]

1. STM32F103R-BOARD.