4. 消息队列

2023-05-16

消息队列

队列又称消息队列,常用于任务间通信的数据结构,可以在任务与任务之间,中断与任务之间传递消息,实现任务接收来自其他任务或中断的不固定长度的消息

任务可从消息队列中读取消消息,当消息队列为空,读取消息的任务被阻塞,用户可指定任务阻塞任务的时间 xTicksToWait

如果消息队列为满,向消息队列发送消息的任务会进入阻塞态,同样可指定阻塞时间,超时恢复为就绪态

在指定阻塞时间内,如果队列为空,任务保持阻塞状态等待队列数据有效;如果等待超时,任务自动恢复就绪态

不论是否写入,不论是否读取成功,任务或中断程序会收到一个错误码 errQUEUE_FULL

通过消息队列服务,任务或中断服务例程可将一条,或者多条信息放入消息队列

一个或多个任务可从消息队列中获取消息,当有多个消息发送到消息队列,将先进入消息队列的消息传给任务

任务得到的是最先进入消息队列的消息,消息队列支持先进先出原则,也支持后进先出原则

需要注意,读写速度不一致,消息队列为空,或者满载

消息队列不仅支持任务与任务之间,也支持中断与队列之间,不过中断消息队列函数不支持超时等待

运作机制

创建消息队列时,RTOS会给消息队列分配一块内存空间,内存的大小 = 消息队列控制块大小 + 单个消息空间大小 x 消息队列长度

在分配内存之后,系统会初始化消息队列,此时消息队列为空

每个消息队列都与消息空间在同一段连续的内存空间,创建成功后,这些内存就被占用

只有删除了消息队列后,这段内存才会被释放,创建成功,已经分配给每个消息空间与消息队列的容量无法更改

每个消息空间可存放不大于消息大小 uxItemSize 的任意类型数据,所有消息队列中的消息空间总数就是消息队列的长度,长度可在消息队列创建时指定

任务或中断给消息队列发送消息时,如果队列未满,或允许覆盖入队,RTOS会将消息拷贝到消息队列队尾

否则会根据用户指定的阻塞超时时间进行阻塞,一直不允许入队,则任务保持阻塞状态直到允许队列允许入队

发送紧急消息的过程与发送普通消息几乎一样,不同的是发送紧急消息时,消息的位置是消息队列队列头,而非是队尾(消息先出)

这样接收者可优先接收紧急消息,可及时进行消息处理

阻塞机制

使用的消息队列一般不是属于某个任务的队列,创建的队列是共用的,每个任务都可对队列进行读写操作

为了保护每个任务对其进行读写操作的过程,必须要有阻塞机制

某个任务对消息队列读写操作时,必须保证任务能完成读写操作,不受其他任务的干扰

保护任务对消息队列的读写操作的过程,这种机制称为阻塞机制

任务A读队列,当消息队列没有消息,则A有三种可能

1、任务A继续执行,不会陷入阻塞

2、任务A阻塞,等待消息,可以指定等待时长,超时则变回就绪态,发送任务返回错误码(有消息就变回就绪态,准备继续执行)

3、任务A一直阻塞,直到队列中有消息,完成消息队列的读取

需要注意的是,中断中发送消息是不支持消息等待的,也是允许阻塞的,这会影响系统的实时性

多个任务因为一个消息队列阻塞,阻塞任务会按照任务的优先级进行排序,优先级高的优先访问队列

消息队列控制块

typedef struct QueueDefinition 
{
	int8_t *pcHead;					// 消息队列存储区的起始位置,也就是第一个消息的地址
	int8_t *pcTail;					// 指向内存中消息队列存储区最后一个字节,int8 *类型
	int8_t *pcWriteTo;				// 指向下一个可用消息空间,可写入消息空间地址
	union 
	{
		int8_t *pcReadFrom;			// 用作队列时,指向最后一个出队的队列项首地址
		UBaseType_t uxRecursiveCallCount;	// 记录递归互斥量被调用次数
	} u;

	List_t xTasksWaitingToSend; 	// 等待发送消息列表,挂载列表项,因队列满发送失败进入阻塞态的任务会挂载在该列表,按优先级进行排序
	List_t xTasksWaitingToReceive;	// 等待读取消息列表,挂载列表项,因队列空读取失败进入阻塞态的任务会挂载在该列表,按优先级进行排序
	volatile UBaseType_t uxMessagesWaiting;// 记录当前消息队列已用消息个数,若用于信号量,表示已用信号量个数
	UBaseType_t uxLength;			// 记录消息队列长度,队列深度,可存储消息个数量
	UBaseType_t uxItemSize;			// 记录单个消息的大小
	volatile int8_t cRxLock;		// 队列上锁后,从队列接收的消息数目,读取多少;没上锁,则设置为queueUNLOCKED
	volatile int8_t cTxLock;		// 队列上锁后,发送到队列的消息数目,写入多少;没上锁,则设置为queueUNLOCKED
    								// 成员变量为queueLOCKED_UNMODIFIED ,表示队列上锁
	#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
		uint8_t ucStaticallyAllocated;		// 使用静态存储则设置为pdTRUE
	#endif

	#if ( configUSE_QUEUE_SETS == 1 )		// 队列结相关宏,其实就是队列控制块指针
		struct QueueDefinition *pxQueueSetContainer;
	#endif

	#if ( configUSE_TRACE_FACILITY == 1 )	// 跟踪调试相关宏
		UBaseType_t uxQueueNumber;
		uint8_t ucQueueType;
	#endif
} xQUEUE;
typedef xQUEUE Queue_t;

API函数

创建

#if( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
	#define xQueueCreate( uxQueueLength, uxItemSize ) \
			xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )
#endif

QueueHandle_t xQueueCreate( UBaseType_t uxQueueLength,UBaseType_t uxItemSize );
// uxQueueLength:消息队列的消息个数(能放几个消息)
// uxItemSize:每个消息的大小,单位字节(8位)
// 创建成功返回队列的句柄,创建失败返回NULL,失败原因一般是RAM无法分配成功

用于创建一个新的队列,返回可用于访问这个队列的队列句柄(需要我们自行定义句柄,才可以操作消息队列)

队列句柄就是一个 指向队列数据结构类型 的指针

队列就是一个数据结构,用于任务间的数据传递

每创建一个新的队列都需要分配RAM,一部分用于存储队列的状态,剩下部分作为队列消息的存储区域

xQueueCreate()为动态创建,使用静态创建时,需要事先定义空间进行分配

#define xQueueCreate( uxQueueLength, uxItemSize ) \
		xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )

/*  ucQueueType表示队列类型
	queueQUEUE_TYPE_BASE:表示消息队列 。
	queueQUEUE_TYPE_SET:表示消息队列集合 。
	queueQUEUE_TYPE_MUTEX:表示互斥量。
	queueQUEUE_TYPE_COUNTING_SEMAPHORE:表示计数信号量 。
	queueQUEUE_TYPE_BINARY_SEMAPHORE:表示二进制信号量 。
	queueQUEUE_TYPE_RECURSIVE_MUTEX :表示递归互斥量。*/

QueueHandle_t xQueueGenericCreate( 	const UBaseType_t uxQueueLength,
                                  	const UBaseType_t uxItemSize,
                                  	const uint8_t ucQueueType )
{
	Queue_t *pxNewQueue;
	size_t xQueueSizeInBytes;									// 消息队列消息长度(个数)
	uint8_t *pucQueueStorage;									// 消息队列消息起始地址

	configASSERT( uxQueueLength > ( UBaseType_t ) 0 );			// 判定uxQueueLength队列消息长度是否为0
	if ( uxItemSize == ( UBaseType_t ) 0 )                      // 消息长度0则不需要分配空间消息空间
	{
	     xQueueSizeInBytes = ( size_t ) 0;						// 不必给消息分配空间,只需要给控制块分配空间
	} 
	else                                                                        
	{
	     xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );	// 消息空间大小 = 单个消息大小 * 消息个数 
	}
																// 申请内存,大小 = 消息队列控制块 + 消息空间
	pxNewQueue = (Queue_t*)pvPortMalloc(sizeof(Queue_t) + xQueueSizeInBytes);
	if ( pxNewQueue != NULL ) 									// 内存分配成功
	{
		// 计算消息队列消息起始地址 = 申请的地址 + 消息队列控制块,也就是消息队列开头是  |控制块|消息空间| 
		pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t );

		#if( configSUPPORT_STATIC_ALLOCATION == 1 )
			pxNewQueue->ucStaticallyAllocated = pdFALSE;		// 静态分配内存设置为pdTRUE,动态配置为pdFALSE
		#endif

		prvInitialiseNewQueue(	uxQueueLength,					// 初始化消息队列控制块函数,消息个数
								uxItemSize,						// 单个消息大小
								pucQueueStorage,				// 消息存储空间首地址
								ucQueueType,					// 消息队列类型
								pxNewQueue );					// 控制块,已经分配空间
	}
	return pxNewQueue;											// 最终返回的句柄,是指向消息存储空间的指针 */
}
#endif

static void prvInitialiseNewQueue(	const UBaseType_t uxQueueLength,	// 消息队列消息个数
									const UBaseType_t uxItemSize,		// 消息队列单个消息大小
									uint8_t *pucQueueStorage,			// 存储消息起始地址
									const uint8_t ucQueueType,			// 消息队列类型
									Queue_t *pxNewQueue )				// 消息队列控制块
{
	( void ) ucQueueType;
	if ( uxItemSize == ( UBaseType_t ) 0 )					// 单个消息大小为0,pcHead直接指向队列控制块
	{
	     pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;		// pcHead指向队列控制块,仅在队列用于互斥量,才能为NULL
	} 
	else 
	{
	     pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;	// pcHead指向申请空间中,消息存储的位置 |控制器|消息
	}
	pxNewQueue->uxLength = uxQueueLength;   				// 在控制块中记录消息长度
	pxNewQueue->uxItemSize = uxItemSize;    				// 在控制块中记录单个消息大小
	( void ) xQueueGenericReset( pxNewQueue, pdTRUE );		// 重置消息队列,重新设置某些参数

	#if ( configUSE_TRACE_FACILITY == 1 )
	      pxNewQueue->ucQueueType = ucQueueType;			// 记录消息队列类型
	#endif

	#if( configUSE_QUEUE_SETS == 1 )
	     pxNewQueue->pxQueueSetContainer = NULL;			// 消息队列集先关字段
	#endif
	traceQUEUE_CREATE( pxNewQueue );
}

BaseType_t xQueueGenericReset( QueueHandle_t xQueue,BaseType_t xNewQueue )
{
	Queue_t * const pxQueue = ( Queue_t * ) xQueue;			// 获取队列控制块指针,以便操作队列
	configASSERT( pxQueue );								// 断言函数

	taskENTER_CRITICAL();   								// 临界段屏蔽代码
	{                                						// 尾指针 = 队列控制块地址 + 队列长度 * 单个消息大小 
	     pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
	     pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;	// 记录当前使用消息的数量
	     pxQueue->pcWriteTo = pxQueue->pcHead;				// 下一个可写入消息地址,写入到这个地址
		/* 下一可读消息地址 = 队列存储空间首地址 + (队列长度 - 1)* 单个消息大小,读取最后一个消息 */
	     pxQueue->u.pcReadFrom = pxQueue->pcHead + (( pxQueue->uxLength - ( UBaseType_t ) 1U ) * pxQueue->uxItemSize );
	     pxQueue->cRxLock = queueUNLOCKED; 					// 消息队列未上锁
	     pxQueue->cTxLock = queueUNLOCKED;
	     if ( xNewQueue == pdFALSE ) 						// 不是新建的消息队列,可能消息队列阻塞了任务,需解除阻塞
	     {													// 读取等待发送消息任务列表是否为空,判定有发送任务阻塞否
	           if ( listLIST_IS_EMPTY ( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) 
	           {											// 将任务从等待发送消息列表中清除
	                 if ( xTaskRemoveFromEventList ( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) 
	                 {
	                      queueYIELD_IF_USING_PREEMPTION();	// 上下文切换
	                 } 
	                 else 
	                 {
	                      mtCOVERAGE_TEST_MARKER();
	                 }
	           } 
	           else 										// 如果读取消息任务被阻塞,重置后消息队列为空,无需被恢复
	           {
	                 mtCOVERAGE_TEST_MARKER();
	           }
	     } 
	     else 												// 消息队列是新建的
	     {													// 是新建的消息队列直接初始化等待发送列表,初始化等待接收/发送列表
	           vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
	           vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
	     }
	}
	taskEXIT_CRITICAL();									// 退出临界段
	return pdPASS;
}

创建消息队列时,需要用户自行定义消息队列的句柄

定义了队列的句柄并不等于创建队列,创建队列必须调用消息队列创建函数进行创建,否则根据句柄使用消息队列会发生错误

通过句柄可使用消息队列进行发送与读取消息队列操作,如果返回NULL表示创建失败

QueueHandle_t Test_Queue =NULL;

#define QUEUE_LEN 4 				// 队列的长度,最大可包含多少个消息
#define QUEUE_SIZE 4 				// 队列中每个消息大小(字节)

BaseType_t xReturn = pdPASS;		// 定义一个创建信息返回值,默认为 pdPASS
taskENTER_CRITICAL(); 				//进入临界区
									// 创建 Test_Queue
Test_Queue = xQueueCreate(	(UBaseType_t ) QUEUE_LEN,	// 消息队列的长度
							(UBaseType_t ) QUEUE_SIZE);	// 消息的大小
if (NULL != Test_Queue)
	printf("创建 Test_Queue 消息队列成功!\r\n");
taskEXIT_CRITICAL(); 				//退出临界区

创建成功则返回队列的句柄,通过这个句柄可以对消息队列进行读写操作

创建不成功则一定是RAM分配不成功的问题,返回值为NULL

#define xQueueCreateStatic( uxQueueLength, uxItemSize, pucQueueStorage, pxQueueBuffer ) \
		xQueueGenericCreateStatic(uxQueueLength,uxItemSize,pucQueueStorage,pxQueueBuffer,queueQUEUE_TYPE_BASE)

QueueHandle_t xQueueCreateStatic(	UBaseType_t uxQueueLength,		// 消息队列消息个数
									UBaseType_t uxItemSize,			// 消息队列单个消息大小
									uint8_t *pucQueueStorageBuffer,	// 分配给消息队列的静态内存
									StaticQueue_t *pxQueueBuffer );	// 用于存储队列的指针

QueueHandle_t xQueueGenericCreateStatic(	const UBaseType_t uxQueueLength,
                                        	const UBaseType_t uxItemSize, 
                                        	uint8_t *pucQueueStorage, 
                                        	StaticQueue_t *pxStaticQueue, 
                                        	const uint8_t ucQueueType )
	{
		Queue_t *pxNewQueue;
		configASSERT( uxQueueLength > ( UBaseType_t ) 0 );			// 判定消息队列消息长度
		configASSERT( pxStaticQueue != NULL );						// 判定静态内存是否为空

		configASSERT( !( ( pucQueueStorage != NULL ) && ( uxItemSize == 0 ) ) );
		configASSERT( !( ( pucQueueStorage == NULL ) && ( uxItemSize != 0 ) ) );

		#if( configASSERT_DEFINED == 1 )
		{
			volatile size_t xSize = sizeof( StaticQueue_t );		// 根据静态内存分配消息队列控制块的内存
			configASSERT( xSize == sizeof( Queue_t ) );
		}
		#endif
		pxNewQueue = ( Queue_t * ) pxStaticQueue;					// 消息队列控制块

		if( pxNewQueue != NULL )
		{
			#if( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
			{
				pxNewQueue->ucStaticallyAllocated = pdTRUE;			// 静态分配内存
			}
			#endif													// 初始化队列控制块
			prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
		}
		else
		{
			traceQUEUE_CREATE_FAILED( ucQueueType );
		}
		return pxNewQueue;
	}

创建成功,返回的是消息队列的句柄

消息队列的句柄其实是指向队列的指针

每创建一个队列,都要为队列分配RAM,一部分存储消息队列的队列控制块,一部分用于存储消息队列

删除

通过消息队列句柄,清空消息队列控制块,然后这块内存被释放,不能再被使用消息队列

删除后消息队列的所有信息都会被系统回收清空,并且不能再次使用这个消息队列

如果消息队列没有被创建,则无法被删除

vQueueDelete()可用于删除消息队列,也可用于删除信号量

删除消息队列时,如果有任务正在等待消息,则不应该进行删除操作(并不禁止,只是自己不要操作)

#define QUEUE_LENGTH 5
#define QUEUE_ITEM_SIZE 4

int main( void )
{
	QueueHandle_t xQueue;
	xQueue = xQueueCreate( QUEUE_LENGTH, QUEUE_ITEM_SIZE );	// 创建消息队列
	if ( xQueue == NULL ) 
	{
									// 消息队列创建失败
	}
	else 
	{
		vQueueDelete( xQueue );		// 删除已创建的消息队列
	}
}

void vQueueDelete( QueueHandle_t xQueue )
{
	Queue_t * const pxQueue = ( Queue_t * ) xQueue;		// 消息队列控制块,通过此控制消息队列,const避免修改

	configASSERT( pxQueue );
	traceQUEUE_DELETE( pxQueue );

	#if ( configQUEUE_REGISTRY_SIZE > 0 )
	{
		vQueueUnregisterQueue( pxQueue );				// 将消息队列从注册表中删除
	}
	#endif

	#if( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 0 ) )
	{													// 只支持动态分配内存
		vPortFree( pxQueue );							// 因为使用动态分配内存,需要free函数释放空间
	}
	#elif( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 1 ) )
	{													// 静态或动态都支持
		if( pxQueue->ucStaticallyAllocated == ( uint8_t ) pdFALSE )	// 如果是动态分配,会调用释放内存函数
		{
			vPortFree( pxQueue );
		}
		else
		{
			mtCOVERAGE_TEST_MARKER();
		}
	}
	#else
	{
		( void ) pxQueue;
	}
	#endif
}

发送

任务或中断都可以给消息队列发送消息

发送消息时,队列未满或允许覆盖入队,RTOS会将消息拷贝到消息队列队尾巴,否则会进入阻塞

如果队列一直不允许入队,任务会保持阻塞直到允许入队,或者阻塞时间到达系统恢复阻塞态,函数返回错误信息

发送紧急消息过程与发送普通消息一致,不同的是发送紧急消息,消息的位置在消息队列队头而非队尾

xQueueSend()用于向队列队尾发送一个队列消息

消息以拷贝的形式入队,而不是传入地址通过指针引用

在中断中不允许使用api函数,必须使用带中断保护的的api函数,xQueueSendFromISR()

当队列上锁后,队列向可加入或者溢出,但是事件列表中两个列表(WwaiToSend/Rec)的任务事件列表不会变化(操作不改变)

#define xQueueSend( xQueue, pvItemToQueue, xTicksToWait ) \
   	xQueueGenericSend( ( xQueue ),( pvItemToQueue ),( xTicksToWait ), queueSEND_TO_BACK )	// 普通发送,发送到队列尾
#define xQueueSendToBack( xQueue, pvItemToQueue, xTicksToWait ) \
   	xQueueGenericSend( ( xQueue ),( pvItemToQueue ),( xTicksToWait ), queueSEND_TO_BACK )	// 普通发送,发送到队列尾
#define xQueueSendToFront( xQueue, pvItemToQueue, xTicksToWait ) \
   	xQueueGenericSend( ( xQueue ),( pvItemToQueue ),( xTicksToWait ), queueSEND_TO_FRONT )	// 紧急发送,发送到队列首
#define xQueueOverwrite( xQueue, pvItemToQueue )\
   	xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), 0, queueOVERWRITE )					// 覆盖发送,替换指向位置
   // xQueue:队列句柄
   // pvItemToQueue:指向要发送到队列尾部的消息
   // xTicksToWait:队列为满时,任务等待消息队列的最大超时时间。如果队列已满,等待时间为0,函数立刻返回
   // queueSEND_TO_BACK:表示发送到队列尾
   // queueSEND_TO_FRONT:表示发送到队列队首
   // 消息发送成功则返回 pdTrue,消息发送失败且任务阻塞时间到达则返回 errQUEUE_FULL
#define xQueueSendToFrontFromISR(xQueue,pvItemToQueue,pxHigherPriorityTaskWoken) \
   	xQueueGenericSendFromISR( ( xQueue ),( pvItemToQueue ),( pxHigherPriorityTaskWoken ), queueSEND_TO_FRONT )
#define xQueueSendToBackFromISR(xQueue,pvItemToQueue,pxHigherPriorityTaskWoken) \
   	xQueueGenericSendFromISR( ( xQueue ),( pvItemToQueue ),( pxHigherPriorityTaskWoken ), queueSEND_TO_BACK )
   // pxHigherPriorityTaskWoken:
   // 如果中断中消息入队导致一个任务解锁,解锁的任务优先级高于之前被中断的任务,则需要设定为pdTRUE
   // 在中断退出前需要进行一次上下文切换,执行被唤醒的优先级更高的任务,待高优先级任务执行完毕再执行之前被中断的任务
void vBufferISR( void )									// 中断处理函数
{
	char cIn;
	BaseType_t xHigherPriorityTaskWoken;				// 定义变量pxHigherPriorityTaskWoken
	xHigherPriorityTaskWoken = pdFALSE;					// 不会导致高优先任务被唤醒
	/* 直到缓冲区为空 */
	do 
    {
		cIn = portINPUT_BYTE( RX_REGISTER_ADDRESS );	// 从缓冲区获取一个字节的数据
		xQueueSendFromISR( xRxQueue, &cIn, &xHigherPriorityTaskWoken );	// 发送这个数据到消息队列中
	}
    while ( portINPUT_BYTE( BUFFER_COUNT ) );			// 消息队列读空之后,此时会导致任务切换
	if ( xHigherPriorityTaskWoken )						// 如果导致任务切换
    {
		taskYIELD_FROM_ISR ();							// 上下文切换,这是一个宏,不同的处理器,具体的方法不一样
	}
}

阻塞时间不为0,任务因等待入队进入阻塞,在将任务设置为阻塞的过程中,不允许其他任务或中断操作对应列表,这回导致其他任务解除阻塞,导致任务优先级翻转

BaseType_t xQueueGenericSend(	QueueHandle_t xQueue,				// 消息队列句柄,通过句柄控制消息队列
								const void * const pvItemToQueue,	// 要发送的消息
								TickType_t xTicksToWait,			// 写入消息队列最大阻塞时间
								const BaseType_t xCopyPosition )	// 发送到消息队列位置,queueOVERWRITE以覆盖方式写入
{
	BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;				// 定义变量
	TimeOut_t xTimeOut;
	Queue_t * const pxQueue = ( Queue_t * ) xQueue;					// 定义常量句柄防止被修改

	for ( ;; )
	{
		taskENTER_CRITICAL();										// 进入临界区
		{															// 该情况为可写情况
			if ( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength )|| ( xCopyPosition == queueOVERWRITE ) )
			{														// 已用消息数量没有达到消息队列深度;覆盖写入方式入,都允许写入
				traceQUEUE_SEND( pxQueue );
				xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );		// 拷贝消息到消息队列中
				// queueSEND_TO_BACK写入 pcWriteTo , queueSEND_TO_FRONT 或 queueOVERWRITE 拷贝到 u.pcReadFrom (紧急消息)
				if ( listLIST_IS_EMPTY(&(pxQueue->xTasksWaitingToReceive))==pdFALSE)				// 查询的等待消息列表是否为空
				{// 现在有消息了,将任务从等待接收列表中xTasksWaitingToReceive删除,添加到就绪列表中。
					if ( xTaskRemoveFromEventList(&( pxQueue->xTasksWaitingToReceive ) )!=pdFALSE)	// 任务从等待接收消息列表删除
					{
						queueYIELD_IF_USING_PREEMPTION(); 	// 进行任务切换,是否有高优先级任务切换 
					}
					else									// 等待接收消息列表删除失败
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				else if ( xYieldRequired != pdFALSE )		// 消息拷贝成功
				{
					queueYIELD_IF_USING_PREEMPTION();		// 任务切换
				}
				else										// 消息拷贝失败
				{
					mtCOVERAGE_TEST_MARKER();
				}
	
				taskEXIT_CRITICAL();
				return pdPASS;								// 至此消息都拷贝成功
			}
			else											// 消息队列已经写满并且不是覆写方式,也就是无法写入
			{
				if ( xTicksToWait == ( TickType_t ) 0 )		// 指定阻塞时间为0
				{
					taskEXIT_CRITICAL();					// 退出临界保护区
					traceQUEUE_SEND_FAILED( pxQueue );
					return errQUEUE_FULL;					// 直接退出,返回错误信息
				}
				else if ( xEntryTimeSet == pdFALSE )		// 队列已满,指定了阻塞时间
				{
					/* 设置事件结构体,记录进入阻塞的时间 xTickCount 和溢出次数 xNumOfOverflows */
					vTaskSetTimeOutState( &xTimeOut );
					xEntryTimeSet = pdTRUE;
				}
				else
				{
					mtCOVERAGE_TEST_MARKER();
				}
			}
		}													// 至此消息都拷贝失败,记录了开始阻塞的时间
		taskEXIT_CRITICAL();								// 退出临界保护区
		vTaskSuspendAll();									// 关闭调度器,挂所有任务(任务要进入阻塞)
		prvLockQueue( pxQueue );							// 锁住消息队列
		if (xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait)==pdFALSE)	// 根据时间结构体,检测超时时间是否溢出
		{													// 超时时间没有溢出
			if ( prvIsQueueFull( pxQueue ) != pdFALSE )		// 检测消息队列是否为满
			{
				traceBLOCKING_ON_QUEUE_SEND( pxQueue );		// 消息队列为满,根据设定的阻塞时间阻塞任务
															// 当前任务的 EventList挂载在 xTasksWaitingToSend下
                											// 根据延时的时间,将任务 xStateList 挂载在 xTasksWaitingToSend下
				vTaskPlaceOnEventList(&( pxQueue->xTasksWaitingToSend ), xTicksToWait );
				prvUnlockQueue( pxQueue );					// 消息队列解锁
	
				if ( xTaskResumeAll() == pdFALSE )			// 恢复调度器
				{
					portYIELD_WITHIN_API();					// 
				}
			}
			else											// 消息队列没有满,消息队列有空闲消息
			{
				prvUnlockQueue( pxQueue );					// 消息队列解锁
				( void ) xTaskResumeAll();					// 恢复调度器,所有任务恢复
			}
		}
		else												// 超时时间溢出
		{
			prvUnlockQueue( pxQueue );						// 解锁消息队列
			( void ) xTaskResumeAll();						// 恢复调度器,恢复所有任务
			traceQUEUE_SEND_FAILED( pxQueue );				// 
			return errQUEUE_FULL;							// 发送失败返回错误信息
		}
	}
}
BaseType_t xQueueGenericSendFromISR(	QueueHandle_t xQueue,							// 消息队列控制块
   									const void * const pvItemToQueue,				// 发送到队列的信息
   									BaseType_t * const xHigherPriorityTaskWoken,	// 
   									const BaseType_t xCopyPosition )				// 要发送到队列的位置
{
   BaseType_t xReturn;
   UBaseType_t uxSavedInterruptStatus;
   Queue_t * const pxQueue = ( Queue_t * ) xQueue;
   uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
   {
   	if ( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength )|| ( xCopyPosition == queueOVERWRITE ) )
   	{// 已用消息不到消息队列深度 或 写入方式是覆写方式
   		const int8_t cTxLock = pxQueue->cTxLock;
   		traceQUEUE_SEND_FROM_ISR( pxQueue );
   		(void)prvCopyDataToQueue(pxQueue,pvItemToQueue,xCopyPosition );	// 复制消息到指定的消息队列中

   		if ( cTxLock == queueUNLOCKED )									// 判断消息队列是否上锁
   		{
   			/* 已删除使用队列集部分代码 */
   			{
   			/* 等待消息接收队列中有任务阻塞 */
   				if ( listLIST_IS_EMPTY(&( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
   				{
   					/* 将任务从等待接收队列中删除,恢复为就绪态 */
   					if ( xTaskRemoveFromEventList(&( pxQueue->xTasksWaitingToReceive )) != pdFALSE )
   					{
   						if ( pxHigherPriorityTaskWoken != NULL )
   						{
   							/* 解除阻塞任务优先级比当前任务高,记录上下文切换请求,返回中断服务程序后(退出中断前),进行上下文切换 */
   							*pxHigherPriorityTaskWoken = pdTRUE;
   						}
   						else
   						{
   							mtCOVERAGE_TEST_MARKER();
   						}
   					}
   					else
   					{
   						mtCOVERAGE_TEST_MARKER();
   					}
   				}
   				else
   				{
   					mtCOVERAGE_TEST_MARKER();
   				}
   			}
   		}
   		else
   		{/* 队列上锁,记录上锁次数,等到任务解除队列锁时,使用这个计录数就可以知道有多少数据入队 */
   			pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 );
   		}
   		xReturn = pdPASS;	// 写入成功,后续返回
   	}
   	else					// 队列无法写入
   	{
   		/* 队列是满的,因为 API 执行的上下文环境是中断,所以不能阻塞,直接返回队列已满错误代码 errQUEUE_FULL */
   		traceQUEUE_SEND_FROM_ISR_FAILED( pxQueue );
   		xReturn = errQUEUE_FULL;
   	}
   }
   portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );
   return xReturn;
}

读消息

任务从队列中读消息,可以指定阻塞超时时间,当消息队列中有效细,任务才会读取消息。

当消息过于庞大的时候,可以将消息的地址作为消息进行发送,任务通过地址来进行读取。

#define xQueueReceive( xQueue, pvBuffer, xTicksToWait) \
		xQueueGenericReceive( xQueue, pvBuffer, xTicksToWait, pdFALSE)	// 读取后删除队列项
#define xQueuePeek( xQueue, pvBuffer, xTicksToWait) \
		xQueueGenericReceive( xQueue, pvBuffer, xTicksToWait, pdTRUE)	// 读取后不删除队列项
// xQueue:队列句柄,通过句柄操作消息队列
// pvBuffer:要发送消息的地址
// xTicksToWait:最大阻塞时间
// xJustPeek: 标记当读取成功以后是否删除掉队列项, pdTRUE为不删除,pdFLASE为删除
BaseType_t xQueueGenericReceive( QueueHandle_t xQueue, void * const pvBuffer, TickType_t xTicksToWait, const BaseType_t xJustPeeking )
{
	BaseType_t xEntryTimeSet = pdFALSE;
	TimeOut_t xTimeOut;
	int8_t *pcOriginalReadPosition;
	Queue_t * const pxQueue = ( Queue_t * ) xQueue;
	for( ;; )
	{
		taskENTER_CRITICAL();
		{
			const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;	// 已使用消息数量
			if( uxMessagesWaiting > ( UBaseType_t ) 0 ) 						// 消息已被使用
			{
				pcOriginalReadPosition = pxQueue->u.pcReadFrom; 				// 获取初始读取位置并记录

				prvCopyDataFromQueue( pxQueue, pvBuffer ); 						// 从队列中拷贝数据到数据区
				if( xJustPeeking == pdFALSE ) 									// 是否删除消息
				{
					traceQUEUE_RECEIVE( pxQueue );
					pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1;			// pfFALSE会删除消息,消息数量减一
					if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) // 有任务因队列满无法发送阻塞
					{
						if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) 
						// 将任务从等待发送消息列表中删除,并主动进行消息调度
						{
							queueYIELD_IF_USING_PREEMPTION();
						}
						else
						{
						mtCOVERAGE_TEST_MARKER();
						}
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				else															// 消息不需要删除
				{
					traceQUEUE_PEEK( pxQueue );
					pxQueue->u.pcReadFrom = pcOriginalReadPosition; 			//拷贝数据后,读指针恢复为原始状态
					if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ) 
					// 由于数据并没有被删除,所以如果有任务还在请求队列数据的,仍然可以拿数据,查看是否还有等待这个消息的任务
					{
						if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ) 
						// 从对应事件表或状态表删除并加入就绪表或挂起就绪表,并酌情调度,有tickless的系统,还要刷新最新任务解锁时间
						{
							/* The task waiting has a higher priority than this task. */
							queueYIELD_IF_USING_PREEMPTION();
						}
						else
						{
							mtCOVERAGE_TEST_MARKER();
						}
					}
					else
					{
						mtCOVERAGE_TEST_MARKER();
					}
				}
				taskEXIT_CRITICAL();
				return pdPASS;							// 至此消息读取成功,返回成功的消息
			}
			else 										// 没有有效消息
			{
				if( xTicksToWait == ( TickType_t ) 0 )	// 不设置超时,直接返回队列空错误
				{
					taskEXIT_CRITICAL();				// 退出临界区
					traceQUEUE_RECEIVE_FAILED( pxQueue );
					return errQUEUE_EMPTY;
				}
				else if( xEntryTimeSet == pdFALSE )		// 如果设置阻塞时间,记录阻塞时间与溢出次数
				{
					vTaskSetTimeOutState( &xTimeOut ); //记录当前系统节拍溢出次数和当前节拍数
					xEntryTimeSet = pdTRUE;
				}
				else
				{
					mtCOVERAGE_TEST_MARKER();		/* Entry time was already set. */
				}
			}
		}
		taskEXIT_CRITICAL();						// 退出临界段,至此消息没有发送成功,从此开始检测阻塞
		vTaskSuspendAll(); 							// 挂起调度
		prvLockQueue( pxQueue ); 					// 开读写事务锁
		/* Update the timeout state to see if it has expired yet. */
		if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ) 	// 检查是否超时
		{
			if( prvIsQueueEmpty( pxQueue ) != pdFALSE )						// 没有超时且队列空
			{
				traceBLOCKING_ON_QUEUE_RECEIVE( pxQueue );
				//按优先级顺序向等待接收表中插入任务控制块的事件表项,并将当前任务从就绪表移除,挂入延时表,更新最新任务解锁时间
				vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
				prvUnlockQueue( pxQueue ); 									// 解锁读写事务锁
				if( xTaskResumeAll() == pdFALSE )
				{
					portYIELD_WITHIN_API();
				}
				else
				{
					mtCOVERAGE_TEST_MARKER();
				}
			}															// 在超时时间内检测到队列中有消息
			else
			{
				prvUnlockQueue( pxQueue );
				( void ) xTaskResumeAll();
			}
		}
		else															// 阻塞时间超时,直接退出
		{
			prvUnlockQueue( pxQueue );
			( void ) xTaskResumeAll();
			if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
			{
				traceQUEUE_RECEIVE_FAILED( pxQueue );
				return errQUEUE_EMPTY;
			}
			else
			{
				mtCOVERAGE_TEST_MARKER();
			}
		}
	}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

4. 消息队列 的相关文章

  • 任务间通信 | 邮箱、消息队列

    本文分享自中移OneOS公众号 任务间通信 上篇讲解了任务间同步 xff0c 在本篇中主要讲解任务间通信机制 xff0c 并对邮箱及消息队列进行详细介绍 通过对其概念 详细设计 接口设计等的讲解帮助开发者更好的理解其在操作系统中的应用 任务
  • RocketMQ和kafka

    RocketMQ 分为集群消息 一组中只有一个消费者竞争到消息 和广播消息 组内消费者都会消费消息 相关概念有 topic 一个消息的主题 一级分类 tag 消息的二级分类 queque 消息队列 brocker里直接存储消息就是在queq
  • RocketMQ第二篇 单机版安装操作步骤

    MQ下载地址 下载RocketMQ 4 7 1版本 RocketMQ运行版本下载地址 https archive apache org dist rocketmq 4 7 1 rocketmq all 4 7 1 bin release z
  • RocketMQ系列之集群搭建

    前言 上节我们对RocketMQ 以下简称RMQ 有了一些基本的认识 大致知道了 什么是RMQ以及他能做什么 今天我们来讲讲如何搭建RMQ 与其说搭建RMQ不如说是搭建RMQ集群 为什么这么说呢 看完这篇文章自然就懂了 RMQ几个重要角色
  • 4 SpringBoot整合RocketMQ实现消息发送和接收

    我们使用主流的SpringBoot框架整合RocketMQ来讲解 使用方便快捷 最终项目结构如下 具体步骤如下 第一步 我们新建一个父项目rocketmq test pom类型 主要是依赖管理 包括版本的管理 以及管理module子项目 p
  • 13 SpringBoot整合RocketMQ实现过滤消息-根据SQL表达式过滤消息

    SQL表达式方式可以根据发送消息时输入的属性进行一些计算 RocketMQ的SQL表达式语法 只定义了一些基本的语法功能 数字比较 如 gt gt lt lt BETWEEN 字符比较 如 lt gt IN IS NULL or IS NO
  • RocketMQ系列之顺序消费

    前言 上节我们介绍了RMQ的两大亮点 重试和重复消费的问题 其实重试才能算亮点 重复消费最终还是要由我们自己来解决这个问题 RMQ自身并没有提供很好的机制 至少目前是没有 不知道将来会不会有 OK扯远了 今天呢 我们再来介绍RMQ一个不错的
  • Kafka常见面试题

    1 什么是消息中间件 2 kafka 是什么 有什么作用 3 kafka 的架构是怎么样的 4 Kafka Replicas是怎么管理的 5 如何确定当前能读到哪一条消息 6 生产者发送消息有哪些模式 7 发送消息的分区策略有哪些 8 Ka
  • 一文详解RabbitMQ,RocketMQ和Kafka的异同

  • RabbitMQ中的限流、return机制、死信队列

    目录 优点 缺点 1 限流 2 return机制 3 死信队列 优点 高可用性 RabbitMQ支持集群和镜像队列等多种方式实现高可用性 保证系统稳定运行 可靠性强 RabbitMQ使用AMQP协议作为消息传递的标准 能够确保消息传递的可靠
  • 非阻塞的connect使用方式

    connect 函数的调用涉及到3次握手 默认connect函数为阻塞连接状态 通常connect 会阻塞到三次握手的完成和失败 而这个connect阻塞超时时间会依赖于系统 一般为75s到几分钟时间 一种方式可以通过该系统配置 proc
  • 7 SpringBoot整合RocketMQ发送单向消息

    发送单向消息是指producer向 broker 发送消息 执行 API 时直接返回 不等待broker 服务器的结果 这种方式主要用在不特别关心发送结果的场景 举例 日志发送 RocketMQTemplate给我们提供了sendOneWa
  • 将mysql中的数据移到另一个数据库中

    1 导出整个数据库 mysqldump u 用户名 p 数据库名 gt 导出的文件名 mysqldump u admin p vmiplatform gt sql sql 运行结果如下 2 导出一个表 mysqldump u 用户名 p 数
  • 代码技巧——如何关闭订单?延迟任务的实现方案【建议收藏】

    先思考个问题 为什么要关闭订单 业务上 1 提供待付款时间 而不是简单的 一次付款机会 提高业务指标之一的成单率 成单率 成功下单的人数 发起支付的人数 2 下单成功意味着这个商品被当前订单占用 库存已经预扣减 如果迟迟不支付则需要回收库存
  • RocketMQ 简介

    本文根据阿里云 RocketMQ产品文档整理 地址 https help aliyun com document detail 29532 html userCode qtldtin2 简介 RocketMQ是由阿里捐赠给Apache的一款
  • 关于rocketmq 中日志文件路径的配置

    前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住分享一下给大家 点击跳转到网站 rocketmq 中的数据和日志文件默认都是存储在user home路径下面的 往往我们都需要修改这些路径到指定文件夹以便管理 服务端日志 网
  • docker安装rocketmq4.6.1(精简版)

    一 创建文件 mkdir p usr local rocketmq server logs usr local rocketmq server store usr local rocketmq broker logs usr local r
  • kafka系列——KafkaProducer源码分析

    实例化过程 在KafkaProducer的构造方法中 根据配置项主要完成以下对象或数据结构的实例化 配置项中解析出 clientId 用于跟踪程序运行情况 在有多个KafkProducer时 若没有配置 client id则clientId
  • RT-Thread记录(七、IPC机制之邮箱、消息队列)

    讲完了线程同步的机制 我们要开始线程通讯的学习 线程通讯中的邮箱消息队列也属于 RT Thread 的IPC机制 目录 前言 一 邮箱 1 1 邮箱控制块 1 2 邮箱操作 1 2 1 创建和删除 1 2 2 初始化和脱离 1 2 3 发送
  • 腾讯技术工程总结-主流消息队列你了解哪些?

    文章参考 腾讯技术工程 关于消息队列的知识总结 主流消息队列你了解哪些 消息队列的发展历程 2003 年至今有很多优秀的消息队列诞生 如 kafka 阿里自研的 rocketmq 以及后起之秀 pulsar 消息队列在刚出现所需要解决的问题

随机推荐