ThingsBoard Source Code Analysis: Data Subscription and Rule Chain Data Processing

2023-05-16

Preface

This article explores how a rule chain executes.
Building on our earlier study of the MQTT transport source code, we start from message handling.

//org.thingsboard.server.transport.mqtt.MqttTransportHandler

void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
	switch (msg.fixedHeader().messageType()) {
		case PUBLISH:
			processPublish(ctx, (MqttPublishMessage) msg);
			break;
		case SUBSCRIBE:
			processSubscribe(ctx, (MqttSubscribeMessage) msg);
			break;
		case UNSUBSCRIBE:
			processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
			break;
		case PINGREQ:
			if (checkConnected(ctx, msg)) {
				ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
				transportService.reportActivity(deviceSessionCtx.getSessionInfo());
			}
			break;
		case DISCONNECT:
			ctx.close();
			break;
		case PUBACK:
			int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
			TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
			if (rpcRequest != null) {
				transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
			}
			break;
		default:
			break;
	}
}

Next, the handling of PUBLISH messages:

//org.thingsboard.server.transport.mqtt.MqttTransportHandler

private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
	if (!checkConnected(ctx, mqttMsg)) {
		return;
	}
	String topicName = mqttMsg.variableHeader().topicName();
	int msgId = mqttMsg.variableHeader().packetId();
	log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);

	if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
		//message arrived on a gateway topic
		if (gatewaySessionHandler != null) {
			handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg);
			transportService.reportActivity(deviceSessionCtx.getSessionInfo());
		}
	} else {
		//device message - this is the path we care about here
		processDevicePublish(ctx, mqttMsg, topicName, msgId);
	}
}

Continuing:

//org.thingsboard.server.transport.mqtt.MqttTransportHandler

private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
	try {
		Matcher fwMatcher;
		MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
		if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
			TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
		} else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
			TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
			TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
			transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
			attrReqTopicType = TopicType.V1;
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {
			TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
			TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
			toServerRpcSubTopicType = TopicType.V1;
		} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
			TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
		} else if ((fwMatcher = FW_REQUEST_PATTERN.matcher(topicName)).find()) {
			getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.FIRMWARE);
		} else if ((fwMatcher = SW_REQUEST_PATTERN.matcher(topicName)).find()) {
			getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);
		} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {
			TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {
			TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {
			TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {
			TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {
			TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {
			TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {
			TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC)) {
			TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getProtoMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC)) {
			TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC)) {
			TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getJsonMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
			toServerRpcSubTopicType = TopicType.V2_JSON;
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC)) {
			TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getProtoMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
			toServerRpcSubTopicType = TopicType.V2_PROTO;
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC)) {
			TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
			toServerRpcSubTopicType = TopicType.V2;
		} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX)) {
			TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getJsonMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX);
			transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
			attrReqTopicType = TopicType.V2_JSON;
		} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX)) {
			TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getProtoMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX);
			transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
			attrReqTopicType = TopicType.V2_PROTO;
		} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX)) {
			TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
			transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
			attrReqTopicType = TopicType.V2;
		} else {
			transportService.reportActivity(deviceSessionCtx.getSessionInfo());
			ack(ctx, msgId);
		}
	} catch (AdaptorException e) {
		log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
		log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
		ctx.close();
	}
}

Here the message is dispatched according to its source topic.
Let's follow the attribute-message path downward.
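Before the service layer, a quick aside for orientation: this is roughly what a device publish that hits the attributes branch above looks like, sketched with the Eclipse Paho client. The broker address, access token, and payload here are illustrative, not taken from the source.

//Illustrative sketch only - a device publishing client-side attributes to ThingsBoard over MQTT
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PublishAttributesExample {
	public static void main(String[] args) throws MqttException {
		MqttClient client = new MqttClient("tcp://localhost:1883", "example-device");
		MqttConnectOptions options = new MqttConnectOptions();
		options.setUserName("DEVICE_ACCESS_TOKEN"); //ThingsBoard devices authenticate with the access token as the MQTT username
		client.connect(options);
		MqttMessage message = new MqttMessage("{\"firmwareVersion\":\"1.0.3\"}".getBytes());
		message.setQos(1); //QoS 1, so the device expects a PUBACK
		client.publish("v1/devices/me/attributes", message); //the topic matched by isDeviceAttributesTopic
		client.disconnect();
	}
}

The PUBACK for that publish is what the getPubAckCallback calls above eventually send back once the message is accepted. Now, into DefaultTransportService: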

//org.thingsboard.server.common.transport.service.DefaultTransportService

@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
	if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) {
		//update the session's activity time
		reportActivityInternal(sessionInfo);
		TenantId tenantId = getTenantId(sessionInfo);
		DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
		//extract the key-value pairs
		JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
		//build the message metadata
		TbMsgMetaData metaData = new TbMsgMetaData();
		metaData.putValue("deviceName", sessionInfo.getDeviceName());
		metaData.putValue("deviceType", sessionInfo.getDeviceType());
		metaData.putValue("notifyDevice", "false");
		CustomerId customerId = getCustomerId(sessionInfo);
		//send to the rule engine
		sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, 
				new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));
	}
}

Continuing:

//org.thingsboard.server.common.transport.service.DefaultTransportService

private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, 
							TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) {
	//build the device profile ID
	DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
	//fetch the device profile from cache
	DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);
	RuleChainId ruleChainId;
	String queueName;

	if (deviceProfile == null) {
		//no device profile: fall back to the default rule chain and queue name
		log.warn("[{}] Device profile is null!", deviceProfileId);
		ruleChainId = null;
		queueName = ServiceQueue.MAIN;
	} else {
		//resolve the rule chain ID
		ruleChainId = deviceProfile.getDefaultRuleChainId();
		//resolve the queue name
		String defaultQueueName = deviceProfile.getDefaultQueueName();
		queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;
	}

	//build the message
	TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null);
	//send to the rule engine
	sendToRuleEngine(tenantId, tbMsg, callback);
}

Continuing:

//org.thingsboard.server.common.transport.service.DefaultTransportService

private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
	
	//resolve the topic partition
	TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator());
	if (log.isTraceEnabled()) {
		log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
	}
	//build the protobuf message
	ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
			.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
			.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
	//increment the producer stats counter
	ruleEngineProducerStats.incrementTotal();
	//wrap the callback for stats reporting
	StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats);
	//send to the rule engine queue
	ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
}
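Worth noting: the partition is resolved from the queue name, tenant, and originator, so all messages from one originator land on the same partition and keep their relative order. A minimal sketch of the underlying idea, assuming plain hash-mod partitioning (the actual partition service is more configurable than this):

//Hypothetical sketch: stable hash-mod mapping from an originator entity to a partition index
import java.util.UUID;

public class PartitionSketch {

	static int resolvePartition(UUID originatorId, int partitionCount) {
		//floorMod keeps the result non-negative; the same originator always maps to the same partition
		return Math.floorMod(originatorId.hashCode(), partitionCount);
	}

	public static void main(String[] args) {
		UUID deviceId = UUID.fromString("784f394c-42b6-435a-983c-b7beff2784f9");
		System.out.println(resolvePartition(deviceId, 10)); //a stable index in [0, 10)
	}
}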

Next, we move to the consumer side to see what happens to the message.

protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;

First, locate where TbQueueProducer is used.

Then we start from TbQueueConsumer:

package org.thingsboard.server.queue;

import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;

import java.util.List;
import java.util.Set;

public interface TbQueueConsumer<T extends TbQueueMsg> {

    String getTopic();

    void subscribe();

    void subscribe(Set<TopicPartitionInfo> partitions);

    void unsubscribe();

    List<T> poll(long durationInMillis);

    void commit();

    boolean isStopped();

}

The method to focus on is the poll method, List<T> poll(long durationInMillis); let's see where it is called.

We find the target: DefaultTbRuleEngineConsumerService.

//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService

void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
	updateCurrentThreadName(threadSuffix);
	while (!stopped && !consumer.isStopped()) {
		try {
			//poll messages
			List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);
			if (msgs.isEmpty()) {
				continue;
			}
			//resolve the submit strategy
			final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration);
			//resolve the ack strategy
			final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration);
			//initialize the submit strategy with the polled messages
			submitStrategy.init(msgs);
			while (!stopped) {
				//create the pack processing context
				TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs());
				//submit; the key method is submitMessage
				submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> submitMessage(configuration, stats, ctx, id, msg)));

				//wait with a timeout
				final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);

				//build the result
				TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
				if (timeout) {
					//handle timed-out messages
					printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");
				}
				if (!ctx.getFailedMap().isEmpty()) {
					//handle failed messages
					printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");
				}
				//print profiler stats
				ctx.printProfilerStats();

				//derive a decision from the result
				TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
				if (statsEnabled) {
					//record whether the pack was committed
					stats.log(result, decision.isCommit());
				}

				//clean up the context
				ctx.cleanup();

				//act on the decision
				if (decision.isCommit()) {
					//committed
					//stop the submit strategy
					submitStrategy.stop();
					//leave the loop
					break;
				} else {
					//not committed yet
					//feed the decision's reprocess set back into the submit strategy and try again
					submitStrategy.update(decision.getReprocessMap());
				}
			}
			//commit the consumer offset
			consumer.commit();
		} catch (Exception e) {
			if (!stopped) {
				log.warn("Failed to process messages from queue.", e);
				try {
					Thread.sleep(pollDuration);
				} catch (InterruptedException e2) {
					log.trace("Failed to wait until the server has capacity to handle new requests", e2);
				}
			}
		}
	}
	log.info("TB Rule Engine Consumer stopped.");
}
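The pattern here deserves a pause: a pack of polled messages is submitted in parallel, and the loop blocks until every message is acknowledged or the pack times out; only then is the consumer offset committed. A stripped-down sketch of that idea, with hypothetical names (the real TbMsgPackProcessingContext also tracks failures and reprocessing):

//Hypothetical sketch of pack processing: await() returns once every pending message is acked
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class PackContextSketch {

	private final ConcurrentMap<UUID, Object> pendingMap = new ConcurrentHashMap<>();
	private final CountDownLatch latch;

	public PackContextSketch(Map<UUID, Object> msgs) {
		pendingMap.putAll(msgs);
		latch = new CountDownLatch(msgs.size());
	}

	//called from the per-message callback when the rule engine finishes one message
	public void onSuccess(UUID id) {
		if (pendingMap.remove(id) != null) {
			latch.countDown();
		}
	}

	//true => the whole pack completed; false => timeout, pendingMap still holds the stragglers
	public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
		return latch.await(timeout, unit);
	}
}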

Let's look at the key method:

//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService

void submitMessage(TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, TbMsgPackProcessingContext ctx, UUID id, TbProtoQueueMsg<ToRuleEngineMsg> msg) {
	log.trace("[{}] Creating callback for topic {} message: {}", id, configuration.getName(), msg.getValue());
	//unwrap the original message
	ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
	//extract the tenant ID
	TenantId tenantId = TenantId.fromUUID(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
	//create the callback
	TbMsgCallback callback = prometheusStatsEnabled ? 
			new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : 
			new TbMsgPackCallback(id, tenantId, ctx);
	try {
		if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
			//forward to the rule engine Actor
			forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
		} else {
			//empty message: invoke the success callback directly
			callback.onSuccess();
		}
	} catch (Exception e) {
		//invoke the failure callback
		callback.onFailure(new RuleEngineException(e.getMessage()));
	}
}

Continuing:

//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService

private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
	//rebuild the TbMsg
	TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
	QueueToRuleEngineMsg msg;
	//read the relation type list
	ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
	Set<String> relationTypes = null;
	if (relationTypesList != null) {
		if (relationTypesList.size() == 1) {
			relationTypes = Collections.singleton(relationTypesList.get(0));
		} else {
			relationTypes = new HashSet<>(relationTypesList);
		}
	}
	//build the Actor message
	msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
	//hand it to the Actor system context
	actorContext.tell(msg);
}

First, a look at QueueToRuleEngineMsg:

package org.thingsboard.server.common.msg.queue;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;

import java.util.Set;

/**
 * Created by ashvayka on 15.03.18.
 */
@ToString
@EqualsAndHashCode(callSuper = true)
public final class QueueToRuleEngineMsg extends TbRuleEngineActorMsg {

    @Getter
    private final TenantId tenantId;
    @Getter
    private final Set<String> relationTypes;
    @Getter
    private final String failureMessage;

    public QueueToRuleEngineMsg(TenantId tenantId, TbMsg tbMsg, Set<String> relationTypes, String failureMessage) {
        super(tbMsg);
        this.tenantId = tenantId;
        this.relationTypes = relationTypes;
        this.failureMessage = failureMessage;
    }

    @Override
    public MsgType getMsgType() {
        return MsgType.QUEUE_TO_RULE_ENGINE_MSG;
    }

    @Override
    public void onTbActorStopped(TbActorStopReason reason) {
        String message;
        if (msg.getRuleChainId() != null) {
            message = reason == TbActorStopReason.STOPPED ?
                    String.format("Rule chain [%s] stopped", msg.getRuleChainId().getId()) :
                    String.format("Failed to initialize rule chain [%s]!", msg.getRuleChainId().getId());
        } else {
            message = reason == TbActorStopReason.STOPPED ? "Rule chain stopped" : "Failed to initialize rule chain!";
        }
        msg.getCallback().onFailure(new RuleEngineException(message));
    }

    public boolean isTellNext() {
        return relationTypes != null && !relationTypes.isEmpty();
    }

}

So the message type is MsgType.QUEUE_TO_RULE_ENGINE_MSG, which we will need later.
Next, let's see how the Actor system handles the message:

//org.thingsboard.server.actors.ActorSystemContext

public void tell(TbActorMsg tbActorMsg) {
	appActor.tell(tbActorMsg);
}
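If the Actor model is unfamiliar: tell() only enqueues the message into the target Actor's mailbox and returns; a dispatcher thread later drains the mailbox and feeds messages to the processing method one at a time. A toy sketch of that mechanism (not ThingsBoard's implementation):

//Toy mailbox sketch: non-blocking tell(), single-threaded drain, so actor state needs no locks
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MailboxSketch {

	private final ConcurrentLinkedQueue<Object> mailbox = new ConcurrentLinkedQueue<>();
	private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();

	public void tell(Object msg) {
		mailbox.add(msg);                //never blocks the sender
		dispatcher.execute(this::drain); //schedule draining on the dispatcher
	}

	private void drain() {
		Object msg;
		while ((msg = mailbox.poll()) != null) {
			process(msg); //messages are handled strictly one at a time
		}
	}

	private void process(Object msg) {
		System.out.println("processing " + msg);
	}
}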

appActor is the application Actor, the root of the entire Actor system; explore it on your own if you are interested.
From our earlier study we know an Actor's processing entry point is boolean process(TbActorMsg msg):

//org.thingsboard.server.actors.service.ContextAwareActor

@Override
public boolean process(TbActorMsg msg) {
	if (log.isDebugEnabled()) {
		log.debug("Processing msg: {}", msg);
	}
	//process the message
	if (!doProcess(msg)) {
		log.warn("Unprocessed message: {}!", msg);
	}
	return false;
}

Clearly, the method that does the real work is protected abstract boolean doProcess(TbActorMsg msg):

//org.thingsboard.server.actors.app.AppActor

@Override
protected boolean doProcess(TbActorMsg msg) {
	if (!ruleChainsInitialized) {
		//rule chains not initialized yet
		//initialize the tenant Actors
		initTenantActors();
		ruleChainsInitialized = true;
		if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {
			log.warn("Rule Chains initialized by unexpected message: {}", msg);
		}
	}
	//dispatch on the message type
	switch (msg.getMsgType()) {
		case APP_INIT_MSG:
			break;
		case PARTITION_CHANGE_MSG:
			ctx.broadcastToChildren(msg);
			break;
		case COMPONENT_LIFE_CYCLE_MSG:
			onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
			break;
		case QUEUE_TO_RULE_ENGINE_MSG:
			onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
			break;
		case TRANSPORT_TO_DEVICE_ACTOR_MSG:
			onToDeviceActorMsg((TenantAwareMsg) msg, false);
			break;
		case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
		case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
		case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
		case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
			onToDeviceActorMsg((TenantAwareMsg) msg, true);
			break;
		case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
			onToTenantActorMsg((EdgeEventUpdateMsg) msg);
			break;
		case SESSION_TIMEOUT_MSG:
			ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
			break;
		default:
			return false;
	}
	return true;
}

Recall that our message type is MsgType.QUEUE_TO_RULE_ENGINE_MSG:

//org.thingsboard.server.actors.app.AppActor

private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
	if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
		//a message with the system tenant ID is treated as an error
		msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
	} else {
		if (!deletedTenants.contains(msg.getTenantId())) {
			//tenant not deleted
			//get or create the tenant Actor and forward the message
			getOrCreateTenantActor(msg.getTenantId()).tell(msg);
		} else {
			//tenant deleted: invoke the success callback directly
			msg.getMsg().getCallback().onSuccess();
		}
	}
}

deletedTenants records the IDs of deleted tenants.

//org.thingsboard.server.actors.app.AppActor

private TbActorRef getOrCreateTenantActor(TenantId tenantId) {
	return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), 
			() -> DefaultActorService.TENANT_DISPATCHER_NAME, 
			() -> new TenantActor.ActorCreator(systemContext, tenantId));
}

Let's go straight to TenantActor's doProcess method:

//org.thingsboard.server.actors.tenant.TenantActor

@Override
protected boolean doProcess(TbActorMsg msg) {
	if (cantFindTenant) {
		//the tenant cannot be found
		log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);
		if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
			QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
			//invoke the success callback directly
			queueMsg.getMsg().getCallback().onSuccess();
		} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {
			TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
			//invoke the success callback directly
			transportMsg.getCallback().onSuccess();
		}
		return true;
	}
	switch (msg.getMsgType()) {
		case PARTITION_CHANGE_MSG:
			PartitionChangeMsg partitionChangeMsg = (PartitionChangeMsg) msg;
			ServiceType serviceType = partitionChangeMsg.getServiceQueueKey().getServiceType();
			if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
				//To Rule Chain Actors
				broadcast(msg);
			} else if (ServiceType.TB_CORE.equals(serviceType)) {
				List<TbActorId> deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) {
					@Override
					protected boolean testEntityId(EntityId entityId) {
						return super.testEntityId(entityId) && !isMyPartition(entityId);
					}
				});
				deviceActorIds.forEach(id -> ctx.stop(id));
			}
			break;
		case COMPONENT_LIFE_CYCLE_MSG:
			onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
			break;
		case QUEUE_TO_RULE_ENGINE_MSG:
			onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
			break;
		case TRANSPORT_TO_DEVICE_ACTOR_MSG:
			onToDeviceActorMsg((DeviceAwareMsg) msg, false);
			break;
		case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
		case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
		case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
		case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
			onToDeviceActorMsg((DeviceAwareMsg) msg, true);
			break;
		case SESSION_TIMEOUT_MSG:
			ctx.broadcastToChildrenByType(msg, EntityType.DEVICE);
			break;
		case RULE_CHAIN_INPUT_MSG:
		case RULE_CHAIN_OUTPUT_MSG:
		case RULE_CHAIN_TO_RULE_CHAIN_MSG:
			onRuleChainMsg((RuleChainAwareMsg) msg);
			break;
		case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
			onToEdgeSessionMsg((EdgeEventUpdateMsg) msg);
			break;
		default:
			return false;
	}
	return true;
}

Let's look at the handler for MsgType.QUEUE_TO_RULE_ENGINE_MSG:

//org.thingsboard.server.actors.tenant.TenantActor

private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
	//check that this service is a rule engine service
	if (!isRuleEngine) {
		log.warn("RECEIVED INVALID MESSAGE: {}", msg);
		return;
	}
	TbMsg tbMsg = msg.getMsg();
	//check that rule engine execution is enabled
	if (getApiUsageState().isReExecEnabled()) {
		//is a rule chain ID set on the message?
		if (tbMsg.getRuleChainId() == null) {
			//get the root chain Actor and check it exists
			if (getRootChainActor() != null) {
				//send the message to the root chain Actor
				getRootChainActor().tell(msg);
			} else {
				//no root chain Actor: invoke the failure callback
				tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
				log.info("[{}] No Root Chain: {}", tenantId, msg);
			}
		} else {
			try {
				//send the message to the specified rule chain
				ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
			} catch (TbActorNotRegisteredException ex) {
				log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
				//TODO: 3.1 Log it to dead letters queue;
				tbMsg.getCallback().onSuccess();
			}
		}
	} else {
		log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);
		tbMsg.getCallback().onSuccess();
	}
}

We can skip the intermediate steps and jump straight to the rule chain Actor's doProcess method:

//org.thingsboard.server.actors.ruleChain.RuleChainActor

@Override
protected boolean doProcess(TbActorMsg msg) {
	switch (msg.getMsgType()) {
		case COMPONENT_LIFE_CYCLE_MSG:
			onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
			break;
		case QUEUE_TO_RULE_ENGINE_MSG:
			processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
			break;
		case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
			processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
			break;
		case RULE_CHAIN_TO_RULE_CHAIN_MSG:
			processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
			break;
		case RULE_CHAIN_INPUT_MSG:
			processor.onRuleChainInputMsg((RuleChainInputMsg) msg);
			break;
		case RULE_CHAIN_OUTPUT_MSG:
			processor.onRuleChainOutputMsg((RuleChainOutputMsg) msg);
			break;
		case PARTITION_CHANGE_MSG:
			processor.onPartitionChangeMsg((PartitionChangeMsg) msg);
			break;
		case STATS_PERSIST_TICK_MSG:
			onStatsPersistTick(id);
			break;
		default:
			return false;
	}
	return true;
}

Again, the handler for MsgType.QUEUE_TO_RULE_ENGINE_MSG:

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor

void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
	TbMsg msg = envelope.getMsg();
	//validate the message
	if (!checkMsgValid(msg)) {
		return;
	}
	log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
	//does the envelope carry relation types?
	if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
		onTellNext(msg, true);
	} else {
		onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
	}
}

What exactly are these relation types?

Think of the rule chain's structure: a message essentially travels from one node to the next, and the one optional element of that hand-off is the condition under which it happens. The relation type is precisely that condition for moving on to the next node.
The first node has no incoming condition, which is why no relation types were set when the message was constructed.
Note: every rule chain has exactly one first node, and every node other than the first has at least one relation type (TbRelationTypes.FAILURE); dig into that on your own if you are interested.
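To make this concrete: the chain processor keeps a routing table from each node to its outgoing relations, which we will meet below as nodeRoutes. An illustrative fragment follows; the RuleNodeRelation constructor signature and the node ID variables are assumed here, not copied from the source:

//Hypothetical illustration: a filter node with two outgoing relations in the routing table
Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes = new HashMap<>();
//"True" leads to a save-timeseries node, "False" to a log node
nodeRoutes.put(filterNodeId, List.of(
		new RuleNodeRelation(filterNodeId, saveNodeId, "True"),
		new RuleNodeRelation(filterNodeId, logNodeId, "False")));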
Let's first look at the case without relation types:

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor

private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) {
	try {
		//check that the component (node) state is active
		checkComponentStateActive(msg);
		//resolve the target node ID
		RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null;
		RuleNodeCtx targetCtx;
		if (targetId == null) {
			//no target node specified
			//use the chain's first node as the target
			targetCtx = firstNode;
			//copy the message; entityId is the current rule chain's ID
			msg = msg.copyWithRuleChainId(entityId);
		} else {
			//target node specified: look up its context
			targetCtx = nodeActors.get(targetId);
		}
		//does the context exist?
		if (targetCtx != null) {
			log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);
			//push to the node
			pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE);
		} else {
			log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
			msg.getCallback().onSuccess();
		}
	} catch (RuleNodeException rne) {
		msg.getCallback().onFailure(rne);
	} catch (Exception e) {
		msg.getCallback().onFailure(new RuleEngineException(e.getMessage()));
	}
}

Before the next method, a quick look at the structure of the node context:

package org.thingsboard.server.actors.ruleChain;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;

/**
 * Created by ashvayka on 19.03.18.
 */
@Data
@AllArgsConstructor
final class RuleNodeCtx {
    private final TenantId tenantId;
    private final TbActorRef chainActor;
    private final TbActorRef selfActor;
    private RuleNode self;
}

A very simple structure: it records the tenant ID, the rule chain Actor, the node's own Actor, and the node itself.
Continuing:

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor

private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
	if (nodeCtx != null) {
		//build the message and tell the node's own Actor
		nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));
	} else {
		log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
		msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty"));
	}
}

QueueToRuleEngineMsg类似,查看可知RuleChainToRuleNodeMsg的消息类型为MsgType.RULE_CHAIN_TO_RULE_MSG,后面同样以此为处理条件
终于到节点的Actor

//org.thingsboard.server.actors.ruleChain.RuleNodeActor

@Override
protected boolean doProcess(TbActorMsg msg) {
	switch (msg.getMsgType()) {
		case COMPONENT_LIFE_CYCLE_MSG:
		case RULE_NODE_UPDATED_MSG:
			onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
			break;
		case RULE_CHAIN_TO_RULE_MSG:
			onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
			break;
		case RULE_TO_SELF_MSG:
			onRuleNodeToSelfMsg((RuleNodeToSelfMsg) msg);
			break;
		case STATS_PERSIST_TICK_MSG:
			onStatsPersistTick(id);
			break;
		case PARTITION_CHANGE_MSG:
			onClusterEventMsg((PartitionChangeMsg) msg);
			break;
		default:
			return false;
	}
	return true;
}

Let's look at the handler for MsgType.RULE_CHAIN_TO_RULE_MSG:

//org.thingsboard.server.actors.ruleChain.RuleNodeActor

private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg envelope) {
	TbMsg msg = envelope.getMsg();
	//validate the message
	if (!msg.isValid()) {
		if (log.isTraceEnabled()) {
			log.trace("Skip processing of message: {} because it is no longer valid!", msg);
		}
		return;
	}
	if (log.isDebugEnabled()) {
		log.debug("[{}][{}][{}] Going to process rule engine msg: {}", ruleChainId, id, processor.getComponentName(), msg);
	}
	try {
		//process the message
		processor.onRuleChainToRuleNodeMsg(envelope);
		//increase the processed-message counter
		increaseMessagesProcessedCount();
	} catch (Exception e) {
		logAndPersist("onRuleMsg", e);
	}
}

Continuing:

//org.thingsboard.server.actors.ruleChain.RuleNodeActorMessageProcessor

void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
	//notify the callback that processing starts
	msg.getMsg().getCallback().onProcessingStart(info);
	//check that the component state is active
	checkComponentStateActive(msg.getMsg());
	TbMsg tbMsg = msg.getMsg();
	//read and increment the rule node counter
	int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();
	//read the maximum rule node executions allowed per message
	int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();
	//is the execution count within the limit?
	if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {
		//report the rule engine execution count
		apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT);
		if (ruleNode.isDebugMode()) {
			systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
		}
		try {
			//invoke the node's processing method
			tbNode.onMsg(msg.getCtx(), msg.getMsg());
		} catch (Exception e) {
			msg.getCtx().tellFailure(msg.getMsg(), e);
		}
	} else {
		tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));
	}
}

Here only the node's onMsg method is invoked, so where does the transition between nodes happen?
A first guess: all nodes inherit from a common abstract parent class that implements the transition.
Let's check the inheritance hierarchy.

It turns out the nodes do not inherit from a common parent class, so the transition logic is not factored out that way.
Let's pick an arbitrary node and look; here I chose TbMsgTypeFilterNode:

//org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode

@Override
public void onMsg(TbContext ctx, TbMsg msg) {
	ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False");
}

This confirms the conjecture: the transition is driven entirely through the TbContext handed to onMsg.
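As an aside, a custom rule node implements the same TbNode contract and routes messages exclusively through TbContext. A minimal hedged sketch (the @RuleNode annotation and configuration class are omitted, and the class itself is hypothetical):

//Hypothetical custom node: routes on the "deviceType" metadata value set by the transport layer
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.msg.TbMsg;

public class TbDeviceTypeSwitchNode implements TbNode {

	@Override
	public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
		//parse the node configuration here
	}

	@Override
	public void onMsg(TbContext ctx, TbMsg msg) {
		//all flow control goes through TbContext: the relation type names the outgoing edge
		String deviceType = msg.getMetaData().getValue("deviceType");
		ctx.tellNext(msg, "sensor".equals(deviceType) ? "True" : "False");
	}

	@Override
	public void destroy() {
	}
}

With that aside done, back to DefaultTbContext's tellNext: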

//org.thingsboard.server.actors.ruleChain.DefaultTbContext

@Override
public void tellSuccess(TbMsg msg) {
	tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null);
}

@Override
public void tellNext(TbMsg msg, String relationType) {
	tellNext(msg, Collections.singleton(relationType), null);
}

@Override
public void tellNext(TbMsg msg, Set<String> relationTypes) {
	tellNext(msg, relationTypes, null);
}

The tellSuccess method and another tellNext overload are shown here as well, since other nodes use them.

//org.thingsboard.server.actors.ruleChain.DefaultTbContext

private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
	if (nodeCtx.getSelf().isDebugMode()) {
		relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
	}
	//notify the callback that processing has ended
	msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
	//send a message to the rule chain Actor
	nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
}

As before, inspection shows RuleNodeToRuleChainTellNextMsg has message type MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG.
This brings us back to the rule chain Actor's doProcess method; here is the matching case:

//org.thingsboard.server.actors.ruleChain.RuleChainActor

case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
	processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
	break;

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor

void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
	var msg = envelope.getMsg();
	if (checkMsgValid(msg)) {
		onTellNext(msg, envelope.getOriginator(), envelope.getRelationTypes(), envelope.getFailureMessage());
	}
}

The onTellNext called here is the same overload that onQueueToRuleEngineMsg invoked earlier when relation types were present:

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor

private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
	try {
		checkComponentStateActive(msg);
		EntityId entityId = msg.getOriginator();
		//resolve the topic partition
		TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);

		//look up the outgoing relations (pointers to downstream nodes) of the originating node
		List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId);
		if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore
			log.warn("[{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message.", tenantId, entityId, msg.getId());
			ruleNodeRelations = Collections.emptyList();
		}

		//filter the relations by the requested relation types
		List<RuleNodeRelation> relationsByTypes = ruleNodeRelations.stream()
				.filter(r -> contains(relationTypes, r.getType()))
				.collect(Collectors.toList());
		//count the matching relations
		int relationsCount = relationsByTypes.size();
		if (relationsCount == 0) {
			//no downstream nodes left
			log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
			//did the relation types include failure (did the previous node fail)?
			if (relationTypes.contains(TbRelationTypes.FAILURE)) {
				//look up the rule node context
				RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
				if (ruleNodeCtx != null) {
					//invoke the message's failure callback
					msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
				} else {
					log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
					//invoke the message's failure callback
					msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
				}
			} else {
				//invoke the message's success callback
				msg.getCallback().onSuccess();
			}
		} else if (relationsCount == 1) {
			//exactly one downstream node
			//this loop runs only once
			for (RuleNodeRelation relation : relationsByTypes) {
				log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
				//push to the target
				pushToTarget(tpi, msg, relation.getOut(), relation.getType());
			}
		} else {
			//multiple downstream nodes
			MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
			log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relationsByTypes);
			//iterate over the matching relations
			for (RuleNodeRelation relation : relationsByTypes) {
				EntityId target = relation.getOut();
				//push to the queue
				putToQueue(tpi, msg, callbackWrapper, target);
			}
		}
	} catch (RuleNodeException rne) {
		msg.getCallback().onFailure(rne);
	} catch (Exception e) {
		log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e);
		msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
	}
}
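Before the push itself, note MultipleTbQueueTbMsgCallbackWrapper in the multi-target branch: it fans the N per-target queue acknowledgments back into the single original message callback. A hypothetical sketch of that idea:

//Hypothetical sketch: N enqueue acks collapse into one ack for the original message
import java.util.concurrent.atomic.AtomicInteger;

public class MultiTargetCallbackSketch {

	private final AtomicInteger remaining;
	private final Runnable originalOnSuccess;

	public MultiTargetCallbackSketch(int targetCount, Runnable originalOnSuccess) {
		this.remaining = new AtomicInteger(targetCount);
		this.originalOnSuccess = originalOnSuccess;
	}

	//invoked once per successful enqueue to a downstream target
	public void onSuccess() {
		if (remaining.decrementAndGet() == 0) {
			originalOnSuccess.run(); //the original message is acked only after every copy is queued
		}
	}
}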

Let's first look at the push to multiple downstream nodes:

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor

private void putToQueue(TopicPartitionInfo tpi, TbMsg msg, TbQueueCallback callbackWrapper, EntityId target) {
	switch (target.getEntityType()) {
		case RULE_NODE:
			putToQueue(tpi, msg.copyWithRuleNodeId(entityId, new RuleNodeId(target.getId()), UUID.randomUUID()), callbackWrapper);
			break;
		case RULE_CHAIN:
			putToQueue(tpi, msg.copyWithRuleChainId(new RuleChainId(target.getId()), UUID.randomUUID()), callbackWrapper);
			break;
	}
}

The message is copied according to the target entity type and sent to the queue:

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor

private void putToQueue(TopicPartitionInfo tpi, TbMsg newMsg, TbQueueCallback callbackWrapper) {
	//build the protobuf message
	ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder()
			.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
			.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
			.setTbMsg(TbMsg.toByteString(newMsg))
			.build();
	//push to the rule engine
	clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper);
}

Continuing:

//org.thingsboard.server.service.queue.DefaultTbClusterService

@Override
public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {
	log.trace("PUSHING msg: {} to:{}", msg, tpi);
	//get the rule engine message producer and send
	producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
	toRuleEngineMsgs.incrementAndGet();
}

The message goes back onto the queue; when the consumer picks it up, onTellNext runs again for the given relation type, and this time the message points at a single downstream node.
Finally, the handling of a single downstream target:

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor

private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) {
	//is this partition owned by the current service instance?
	if (tpi.isMyPartition()) {
		//dispatch on the target entity type
		switch (target.getEntityType()) {
			case RULE_NODE:
				//push to the node
				pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
				break;
			case RULE_CHAIN:
				//tell the parent Actor
				parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
				break;
		}
	} else {
		//put on the queue for the service instance that owns the partition
		putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target);
	}
}

Note: in a cluster deployment, each service instance owns a subset of the partitions.
We have already seen pushMsgToNode and putToQueue, so the interesting case left is the jump to another rule chain.
As before, inspection shows RuleChainToRuleChainMsg has message type MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG.
Next, the corresponding handler in the tenant Actor:

//org.thingsboard.server.actors.tenant.TenantActor

private void onRuleChainMsg(RuleChainAwareMsg msg) {
	if (getApiUsageState().isReExecEnabled()) {
		getOrCreateActor(msg.getRuleChainId()).tell(msg);
	}
}

And then the corresponding handler in the rule chain Actor:

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor

void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
	var msg = envelope.getMsg();
	if (!checkMsgValid(msg)) {
		return;
	}
	try {
		checkComponentStateActive(envelope.getMsg());
		if (firstNode != null) {
			pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
		} else {
			envelope.getMsg().getCallback().onSuccess();
		}
	} catch (RuleNodeException e) {
		log.debug("Rule Chain is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId);
	}
}

We have already seen pushMsgToNode. With that, we have walked through the entire execution path of a rule chain.

Summary

Finally, a sketch of the end-to-end execution flow of a rule chain:

device publish → MqttTransportHandler.processDevicePublish → DefaultTransportService.sendToRuleEngine → rule engine queue → DefaultTbRuleEngineConsumerService.consumerLoop → AppActor → TenantActor → RuleChainActor (first node) → RuleNodeActor → TbNode.onMsg → TbContext.tellNext → RuleChainActor.onTellNext → next rule node, another rule chain, or back onto the queue.
