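The analysis below assumes the in-memory message queue, no service registry (Zookeeper is not configured), and default settings for everything else.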

Clustering mode

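The official documentation includes a short introduction to clustering mode, which helps explain why the code is written the way it is: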

ThingsBoard adopts consistent hashing to ensure scalability and availability. Message from Device A that is received on a particular node may be forwarded to the other node based on the hash of the device ID. Although this introduces certain networking overhead, it allows to process all messages from a particular device using corresponding device actor on a determined server, which introduces the following advantages:
improve cache hit rate. Device attributes and other device related data are fetched by device actor on a specific server.
avoid race conditions. All messages for a particular device are processed on a determined server.
allows targeting server-side api calls based on the device id.
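
To make the quoted idea concrete, here is a minimal sketch of routing by device ID. It is not ThingsBoard's code: the class name DeviceRouterSketch, the node names, and the use of CRC32 as a stand-in for the configurable hash function are all assumptions made for illustration, and it uses plain hash-modulo routing rather than a full consistent-hash ring. It only demonstrates that the same device ID always resolves to the same partition, and therefore to the same node.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.zip.CRC32;

class DeviceRouterSketch {
    /** Maps a device ID to a partition index in [0, partitions). */
    static int partitionFor(UUID deviceId, int partitions) {
        CRC32 crc = new CRC32(); // stand-in hash, chosen only because it ships with the JDK
        crc.update(deviceId.toString().getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % partitions); // getValue() is never negative
    }

    /** Picks the node that owns the device's partition (hypothetical node list). */
    static String nodeFor(UUID deviceId, int partitions, List<String> nodes) {
        return nodes.get(partitionFor(deviceId, partitions) % nodes.size());
    }

    public static void main(String[] args) {
        UUID deviceA = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        List<String> nodes = Arrays.asList("node-1", "node-2");
        // Repeated calls give the same answer, so every message from deviceA
        // ends up on the same node regardless of which node received it first.
        System.out.println("partition = " + partitionFor(deviceA, 10));
        System.out.println("node      = " + nodeFor(deviceA, 10, nodes));
    }
}
```

Because the mapping depends only on the device ID and the partition count, any node can compute the target without coordination, which is what makes forwarding possible.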

TbServiceInfoProvider

The init() method of DefaultTbServiceInfoProvider is annotated with @PostConstruct, so Spring calls it automatically once the bean has been constructed during startup:

public void init() {
        if (StringUtils.isEmpty(serviceId)) {
            try {
                // Use the local host name as the serviceId
                serviceId = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                serviceId = org.apache.commons.lang3.RandomStringUtils.randomAlphabetic(10);
            }
        }
        log.info("Current Service ID: {}", serviceId);
        // serviceType is the value of service.type in the config file; the default is monolith
        // In that case serviceTypes is a List containing TB_CORE, TB_RULE_ENGINE, TB_TRANSPORT, JS_EXECUTOR ①
        if (serviceType.equalsIgnoreCase("monolith")) { 
            serviceTypes = Collections.unmodifiableList(Arrays.asList(ServiceType.values()));
        } else {
            serviceTypes = Collections.singletonList(ServiceType.of(serviceType));
        }
        ServiceInfo.Builder builder = ServiceInfo.newBuilder()
                .setServiceId(serviceId)
                .addAllServiceTypes(serviceTypes.stream().map(ServiceType::name).collect(Collectors.toList()));
        UUID tenantId;
        // tenantIdStr is the value of service.tenant_id in the config file; it is empty by default, so isolatedTenant stays unset
        if (!StringUtils.isEmpty(tenantIdStr)) {
            tenantId = UUID.fromString(tenantIdStr);
            isolatedTenant = new TenantId(tenantId);
        } else {
            tenantId = TenantId.NULL_UUID;
        }
        // Split the 128-bit UUID into its most significant and least significant 64 bits
        builder.setTenantIdMSB(tenantId.getMostSignificantBits());
        builder.setTenantIdLSB(tenantId.getLeastSignificantBits());
        // ruleEngineSettings is an instance of TbQueueRuleEngineSettings, bound to the values under queue.rule-engine
        // Its topic is tb_rule_engine, and it defines three queues ②:
        // 1. name: Main                   topic: tb_rule_engine.main  partitions: 10
        // 2. name: HighPriority           topic: tb_rule_engine.hp    partitions: 10
        // 3. name: SequentialByOriginator topic: tb_rule_engine.sq    partitions: 10
        if (serviceTypes.contains(ServiceType.TB_RULE_ENGINE) && ruleEngineSettings != null) {
            for (TbRuleEngineQueueConfiguration queue : ruleEngineSettings.getQueues()) {
                TransportProtos.QueueInfo queueInfo = TransportProtos.QueueInfo.newBuilder()
                        .setName(queue.getName())
                        .setTopic(queue.getTopic())
                        .setPartitions(queue.getPartitions()).build();
                builder.addRuleEngineQueues(queueInfo);
            }
        }
        serviceInfo = builder.build();
    }
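
The branch on service.type is the part of init() that decides which roles a node plays, so a small standalone sketch may help. The enum below is a local stand-in that mirrors the four values named in comment ①, and the class name ServiceTypeSketch is hypothetical. How ServiceType.of converts a value such as "tb-core" is an assumption here (upper-casing and replacing '-' with '_'); check the real enum before relying on it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

class ServiceTypeSketch {
    // Local stand-in for the real ServiceType enum.
    enum ServiceType { TB_CORE, TB_RULE_ENGINE, TB_TRANSPORT, JS_EXECUTOR }

    /** Returns every type for "monolith", otherwise the single matching type. */
    static List<ServiceType> resolve(String serviceType) {
        if ("monolith".equalsIgnoreCase(serviceType)) {
            return Collections.unmodifiableList(Arrays.asList(ServiceType.values()));
        }
        // Assumption: "tb-core" -> "TB_CORE"
        String enumName = serviceType.replace('-', '_').toUpperCase(Locale.ROOT);
        return Collections.singletonList(ServiceType.valueOf(enumName));
    }

    public static void main(String[] args) {
        System.out.println(resolve("monolith")); // [TB_CORE, TB_RULE_ENGINE, TB_TRANSPORT, JS_EXECUTOR]
        System.out.println(resolve("tb-core"));  // [TB_CORE]
    }
}
```

In the default monolith setup the node therefore advertises all four service types, which is why the rule engine queues are also added to its ServiceInfo.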

PartitionService

The default implementation of PartitionService is HashPartitionService:

    @PostConstruct
    public void init() {
        // Pick the hash function used for partitioning from queue.partitions.hash_function_name; the default is murmur3_128
        this.hashFunction = forName(hashFunctionName);
        // partitionSizes is a ConcurrentMap<ServiceQueue, Integer>
        // ServiceQueue holds a ServiceType and a String queue name; if the constructor gets no queue name, or null, the queue name is "Main"
        // corePartitions comes from queue.core.partitions; the default is 10
        partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions);
        // coreTopic comes from queue.core.topic; the default is tb_core
        partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);
        // Given the queue list from ② in DefaultTbServiceInfoProvider, the contents of partitionTopics and partitionSizes follow directly
        tbQueueRuleEngineSettings.getQueues().forEach(queueConfiguration -> {
            partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getTopic());
            partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getPartitions());
        });
        });
    }
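
With the default values quoted in the comments above, the two maps end up with four entries each. The sketch below spells them out. It simplifies ServiceQueue to a "<serviceType>:<queueName>" string key, and the class name PartitionMapsSketch is hypothetical; the topics and partition counts are the defaults listed earlier.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionMapsSketch {
    /** Simplified ServiceQueue key; a missing queue name falls back to "Main". */
    static String key(String serviceType, String queueName) {
        return serviceType + ":" + (queueName == null ? "Main" : queueName);
    }

    /** Contents of partitionTopics under the default configuration. */
    static Map<String, String> defaultTopics() {
        Map<String, String> topics = new LinkedHashMap<>();
        topics.put(key("TB_CORE", null), "tb_core");
        topics.put(key("TB_RULE_ENGINE", "Main"), "tb_rule_engine.main");
        topics.put(key("TB_RULE_ENGINE", "HighPriority"), "tb_rule_engine.hp");
        topics.put(key("TB_RULE_ENGINE", "SequentialByOriginator"), "tb_rule_engine.sq");
        return topics;
    }

    /** Contents of partitionSizes under the default configuration (10 partitions each). */
    static Map<String, Integer> defaultSizes() {
        Map<String, Integer> sizes = new LinkedHashMap<>();
        for (String k : defaultTopics().keySet()) {
            sizes.put(k, 10);
        }
        return sizes;
    }

    public static void main(String[] args) {
        defaultTopics().forEach((k, topic) ->
                System.out.println(k + " -> " + topic + " x " + defaultSizes().get(k)));
    }
}
```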

DiscoveryService

Because Zookeeper is not used as the service registry, DiscoveryService is implemented by DummyDiscoveryService. When it receives Spring's ApplicationReadyEvent, it calls partitionService.recalculatePartitions:

public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
    // Log the service info
    logServiceInfo(currentService);
    // With DummyDiscoveryService, otherServices is empty; the Zookeeper-based implementation supplies the other nodes
    otherServices.forEach(this::logServiceInfo);
    Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
    // Expand each ServiceInfo's serviceTypes and rule engine queues, and add them to queueServicesMap
    addNode(queueServicesMap, currentService);
    for (ServiceInfo other : otherServices) {
        addNode(queueServicesMap, other);
    }
    queueServicesMap.values().forEach(list -> list.sort((a, b) -> a.getServiceId().compareTo(b.getServiceId())));

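    ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
    TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
    myPartitions = new ConcurrentHashMap<>();
    // Build, for each ServiceQueueKey, the list of partition indexes owned by this node
    partitionSizes.forEach((serviceQueue, size) -> {
        ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(serviceQueue, myIsolatedOrSystemTenantId);
        for (int i = 0; i < size; i++) {
            // Assumed from the upstream HashPartitionService: resolve the owner of partition i and keep it only if it is this node
            ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(myServiceQueueKey), i);
            if (currentService.equals(serviceInfo)) {
                ServiceQueueKey serviceQueueKey = new ServiceQueueKey(serviceQueue, getSystemOrIsolatedTenantId(serviceInfo));
                myPartitions.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(i);
            }
        }
    });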

    oldPartitions.forEach((serviceQueueKey, partitions) -> {
        if (!myPartitions.containsKey(serviceQueueKey)) {
            log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey);
            applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, Collections.emptySet()));
        }
    });
    // Publish PartitionChangeEvent. Each TopicPartitionInfo takes its topic from partitionTopics, and its fullTopicName is the topic plus the partition index.
    // With DummyDiscoveryService, tpiList always contains every partition index (0-9 here), e.g. fullTopicName values tb_core.9 and tb_rule_engine.main.3
    myPartitions.forEach((serviceQueueKey, partitions) -> {
        if (!partitions.equals(oldPartitions.get(serviceQueueKey))) {
            log.info("[{}] NEW PARTITIONS: {}", serviceQueueKey, partitions);
            Set<TopicPartitionInfo> tpiList = partitions.stream()
                    .map(partition -> buildTopicPartitionInfo(serviceQueueKey, partition))
                    .collect(Collectors.toSet());
            applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, tpiList));
        }
    });
    tpiCache.clear();

    if (currentOtherServices == null) {
        currentOtherServices = new ArrayList<>(otherServices);
    } else {
        Set<ServiceQueueKey> changes = new HashSet<>();
        Map<ServiceQueueKey, List<ServiceInfo>> currentMap = getServiceKeyListMap(currentOtherServices);
        Map<ServiceQueueKey, List<ServiceInfo>> newMap = getServiceKeyListMap(otherServices);
        currentOtherServices = otherServices;
        currentMap.forEach((key, list) -> {
            if (!list.equals(newMap.get(key))) {
                changes.add(key);
            }
        });
        currentMap.keySet().forEach(newMap::remove);
        changes.addAll(newMap.keySet());
        if (!changes.isEmpty()) {
            applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes));
        }
    }
}
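
The core of recalculatePartitions is deciding which partition indexes belong to the current node. The sketch below isolates that step. It assumes partitions are dealt round-robin over the service IDs after sorting (index modulo node count), which is an assumption about the real resolution logic, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PartitionAssignmentSketch {
    /** Returns the partition indexes owned by self among all known services. */
    static List<Integer> myPartitions(String self, List<String> others, int partitions) {
        List<String> all = new ArrayList<>(others);
        all.add(self);
        Collections.sort(all); // every node sorts identically, so all nodes agree on the result
        List<Integer> mine = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            if (all.get(i % all.size()).equals(self)) {
                mine.add(i);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Single node, as with DummyDiscoveryService: it owns every partition.
        System.out.println(myPartitions("tb-a", Collections.emptyList(), 10));
        // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

        // Two nodes: the partitions are split between them.
        System.out.println(myPartitions("tb-a", Arrays.asList("tb-b"), 10));
        // prints [0, 2, 4, 6, 8]
    }
}
```

This also shows why the dummy discovery case always yields partitions 0-9 for every queue: with no other services, the current node is the only candidate owner.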

DefaultTbCoreConsumerService

[Figure: ThingsBoard secondary development, source code analysis 2: startup analysis]

DefaultTbRuleEngineConsumerService

public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
    // Here serviceType is ServiceType.TB_RULE_ENGINE; events for other types are filtered out
    if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
        ServiceQueue serviceQueue = partitionChangeEvent.getServiceQueueKey().getServiceQueue();
        log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), partitionChangeEvent.getPartitions());
        // Dispatch by queue name, so the three consumers subscribe to three topic lists: tb_rule_engine.main.0-9, tb_rule_engine.sq.0-9, tb_rule_engine.hp.0-9
        consumers.get(serviceQueue.getQueue()).subscribe(partitionChangeEvent.getPartitions());
    }
}
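
The handler above can be modelled without Spring or a real queue. The following sketch, with the hypothetical class RuleEngineSubscriptionSketch, keeps one subscription set per queue name and ignores events for other service types. Strings stand in for ServiceType and TopicPartitionInfo, and the "<topic>.<index>" naming follows the fullTopicName examples given earlier.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class RuleEngineSubscriptionSketch {
    // queue name -> full topic names that queue's consumer is subscribed to
    private final Map<String, Set<String>> subscriptions = new HashMap<>();

    /** Builds "<topic>.<index>" names for indexes 0..partitions-1. */
    static Set<String> fullTopicNames(String topic, int partitions) {
        Set<String> names = new LinkedHashSet<>();
        for (int i = 0; i < partitions; i++) {
            names.add(topic + "." + i);
        }
        return names;
    }

    /** Ignores events for other service types; otherwise replaces the queue's subscription. */
    void onPartitionChange(String serviceType, String queueName, Set<String> topics) {
        if (!"TB_RULE_ENGINE".equals(serviceType)) {
            return;
        }
        subscriptions.put(queueName, topics);
    }

    Set<String> subscribedTo(String queueName) {
        return subscriptions.getOrDefault(queueName, Collections.emptySet());
    }

    public static void main(String[] args) {
        RuleEngineSubscriptionSketch service = new RuleEngineSubscriptionSketch();
        service.onPartitionChange("TB_RULE_ENGINE", "Main", fullTopicNames("tb_rule_engine.main", 10));
        service.onPartitionChange("TB_CORE", "Main", fullTopicNames("tb_core", 10)); // filtered out
        System.out.println(service.subscribedTo("Main"));
    }
}
```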

Having walked through the initialization of the classes above, we now have a rough picture of the startup sequence.
