蘑菇先生学习记

基于ActiveMQ和Redis的分布式车辆实时数据处理

  本次毕业设计论文的题目是基于ActiveMQ和Redis的分布式车辆实时数据处理。阐述围绕四个方面展开,应用背景、相关技术、解决方案、设计实现。

应用背景

  车联网依赖于物联网和传感器的快速发展。物联网使得车与平台、车与人、人与平台三者的交流沟通成为可能;而传感器技术使得车辆位置、状态等数据的采集成为可能。
  数据是车联网平台的核心,车联网平台的业务依赖于对数据的分析、挖掘和展示。因此车联网的一个重要挑战在于海量数据的存储和处理,这也带来了实时性、负载能力等诸多难题。
distributed
  此次分享主要围绕数据处理展开。这也是我的毕设研究内容,基于ActiveMQ和Redis这两个中间件,探讨并构建一套实时性好、响应速度快、负载能力强的车联网分布式车辆实时数据处理解决方案。

难点

  车载终端,也就是传感器上报频率非常高。如果按照30s发送一次实时位置,假设我们支持3万辆车的接入能力,以最大负载来算,所有的车辆全部在线,那么1分钟内产生的实时位置数据为6万条,1小时产生的数据为360万条,1天按10小时算的话,也会产生3600万条数据。实际上,产生的数据远不止车辆实时位置,还包括表征车辆状态的CAN数据等,可见数据量是很惊人的。
  另外,由于我们的云服务平台地图模块是基于Web端设计的,平台的载体是浏览器客户端,浏览器的性能瓶颈使得我们不可能一次性同时将上万级别的数据发送至地图模块处理,因此必须进行数据过滤、处理,达到流量控制的目的。
  全车监控的难点在于车辆数以及产生的位置数据众多,很难实现对所有车进行实时车辆位置的定位。一旦车辆众多,既可能造成数据传输到客户端过程中出现卡死现象,也可能在对车辆进行渲染展示时造成百度地图卡死现象。上图采取聚合操作,聚合会将邻近的车辆聚合在一起,并在聚合中心上显示车辆数,一个聚合中心内可能会有上千辆车。一旦有大量新数据产生,位置移动渲染过程以及聚合计算过程将会给浏览器和地图带来很大的负载压力。假如采取轮询的方式轮询这上万辆车,进行实时位置获取,那么势必更会造成客户端性能下降。如下图所示:
distributed
  而单车监控的难点在于实时性的控制。如下图所示,会放大到最细粒度图层,并实时绘制轨迹。用户可能同时对多辆车进行实时监控。实时监控的难点在于数据的及时性。轮询的方式首先及时性不够,会受到轮询时间间隔的影响,同时仍然会加大客户端的负载,造成用户体验下降。
distributed
  因此针对大数据量导致的客户端负载难点,本文需要考虑对大数据量进行多次的过滤和处理,根据一定的策略实现流量控制的目的。针对实时性要求,本文需要从多方面进行设计,既包括数据采集架构来加快数据采集的及时性;又包括读写吞吐量的提升来加快数据快速处理;同时还包括数据流转的方式设计,抛弃传统的轮询方式,构建消息驱动的方式来监听数据;最后需要设计复杂的地图监控的逻辑,包括车辆移动、聚合、轨迹绘制、全车监控和实时监控切换逻辑等。
  因此,这里的核心问题在于,海量数据实时性处理需求与地图和浏览器客户端性能瓶颈之间的矛盾。本文的目标不仅是能够支持大数据量的处理、存储,还需要实现实时性、地图模块稳定性、平台运行流畅、用户体验好等目标。

核心组件

  为了解决这样的难题,我们的核心组件或需求应该至少包括如下几个部分,
  首先需要一个高效、响应速度快、高可用的数据采集架构,能够应付车载终端源源不断采集的数据。其次需要实现海量的数据存储,一边在不断采集数据,一边就需要保存这些数据。保存数据的目的在于进行离线分析,例如历史轨迹回放,驾驶行为分析等。再进一步,如果要进行在线分析的话,就需要实时性数据处理架构,例如车辆实时监控,远程诊断。中间两个步骤分别对应于离线分析和在线分析需求。处理完之后的数据,需要传输到表现层进行渲染,因此需要一个数据推送便捷、高效的架构,尤其是针对在线分析的功能。最后一步就是Web端渲染展示,例如基于百度地图实现车辆实时监控功能。
distributed

核心技术

  这里涉及两种技术。

ActiveMQ

  ActiveMQ是Apache出品,最流行,能力最强劲的开源消息总线,有两种消息模型,P2P点对点模式和发布订阅模式。
  MQ核心应用场景包括,异步处理、应用解耦、流量削峰、消息通讯。

  • 其中异步处理能够提升系统的响应速度,处在业务处理前端的生产者服务器在处理完业务请求后,将数据写入消息队列,不需要等待消费者服务器处理就可以直接返回,响应延迟减少。
  • 应用解耦能够提高系统的高可用性,即使消费者服务器发生故障,数据也可以在消息队列服务器中存储堆积,生产者服务器可以继续处理业务请求,系统整体表现无障碍,消费者服务器恢复正常后,继续处理消息队列中的数据。
  • 流量削峰能够消除并发访问高峰,它是将突然增加的访问请求数据放入消息队列中,等待消费者服务器依次处理,不会对整个系统负载造成太大压力。
  • 而消息通讯则经常用在广播、通知、聊天等业务中,对于新增的用户,不需要额外增加一个接口来通知,只需要使用发布订阅模型就能轻松扩展。

  在车联网项目中,使用ActiveMQ搭建消息队列服务器,通过异步的方式处理车载终端采集的数据,大幅度提高车载终端的响应能力,随着接入终端数的不断增加,可通过横向扩展,快速提升吞吐量。

Redis

  Redis是一个开源、内存存储的数据结构服务器,可用作数据库,高速缓存、消息队列代理等。

  • 在易用性方面,Redis支持包括字符串,Hash表,链表,集合,有序结合在内的多种数据结构。
  • 读写吞吐量方面,Redis基于内存存储,支持主从模式减少单机器并发量,支持分布式读写分离模型,提供基于管理批量读写。
  • 高可用方面,Redis支持分布式集群部署、Redis主从复制、Sentinel哨兵支持单点故障转移。
  • 最后,拥有众多的优秀客户端,例如Spring-data-redis、jedis等。

  在车联网项目中,将使用Redis构建分布式缓存集群。Redis提供了业务数据的内存存储和批量读写功能,可提高关键数据的快速读写。并提供主从热备、主从热切换功能,保证数据的高可靠性。

解决方案

  该解决方案会围绕此前介绍的核心架构展开。

数据采集架构

  前置机承担数据采集、初步校验解析、下发控制指令等工作,并将数据转发至消息队列服务器。
  而消息队列服务器通过异步处理可提高前置机的响应速度和数据采集能力,随着接入终端数的不断增加,可通过横向扩展,快速的提升吞吐量,因为松耦合的软件体系结构,所以不同的业务服务器只需要采用订阅/发布的方式获取相关数据,并与数据层进行交互。
distributed

数据存储架构

  结合不同的数据特点,采用不同的方式进行存储,如MySql、mongoDB、redis,并且结合不同的使用场景进行存储设计。
  如图MYSQL可用于存储用户数据、车辆数据等平台中比较固定的一些业务数据。Redis缓存使用读写分离模型,适用于少量写,大量读的数据,加快业务的处理。MongoDB采用分表分片模式,用于存储海量的数据,将一些大表根据需要进行拆分,例如车辆状态数据就可能有上百/上千个属性。
  因为异构数据库管理的复杂度较高,所以封装统一的访问层,从外部看,业务层仅连接一个单一的数据源。也就是统一数据访问模块。
distributed

数据处理架构

  传统的方案是基于存储+轮询。构建DBP数据处理模块监听接收MQ中的数据,存储到非关系型数据库MongoDB,WEB端地图模块再通过轮询的方式读取MongoDB中的数据,再进行渲染处理。
  该方案存在着诸多弊端。首先,实时性方面,存储到数据库中,再从数据库中取数据,本身步骤是繁琐多余的,比直接将数据传送到地图模块进行处理慢的多,因此会造成实时性的降低;其次,在性能方面,由于是采用轮询机制,并且需要监控每一辆在线的车,正常情况下是需要为每辆车都进行轮询查看有没有最新消息,因此会受到车辆数的影响,随着车辆的增加,性能越来越低。另外,性能还受到网络的影响,大量的数据库连接请求会降低数据库的吞吐能力。最后,稳定性方面,地图模块的稳定性会受到轮询方式的影响,由于JS是单线程的,随着车辆数的增加,轮询势必会影响地图模块的正常操作,造成用户体验下降。
  本文的方案是基于缓存+消息中间件+websocket技术,能够兼顾负载能力、实时性、稳定性、流畅度等多项目标。
  核心部分包括:

  • 基于Redis构建分布式缓存集群能够加快数据处理。
  • 基于ActiveMQ构建数据处理模块,以消息驱动的方式对数据进行监听处理。
  • 基于Websocket、百度地图实现实时监控,Websocket能够实现数据的主动推送,不需要前端进行轮询。

物理结构

  前置机模块采集车辆的位置、终端告警、CAN数据、行为数据、状态等数据,发送到消息服务器集群的全局队列MQ。这部分对应于前文提到的数据采集架构。
  后台数据处理机部分的数据处理模块以异步的方式监听该队列中的数据,接收下来进行数据解析、过滤、清洗后,调用前文提到的数据存储架构中的统一数据访问模块存储到MongoDB、MYSQL或Redis当中。
  对于在线分析功能,也就是实时监控的位置、告警、CAN状态数据,数据处理模块处理完后,会进行二次转发至ActiveMQ消息服务器的Location、Alarm、CAN主题。Web服务器会以发布订阅的方式监听获取主题里的数据,进行二次处理后,通过Websocket主动将数据推送至地图模块,进而进行监控或渲染。
  我们可以看到这里面数据监听和处理有两处地方,
  第一处是数据处理机中的数据处理模块,该模块承担着数据消费处理的核心任务,必须保证效率,所做的事情很单纯,接收解析数据后,保存到数据库当中。
  第二处是Web服务器的监听模块,这部分主要用于数据的在线分析功能,也就是车辆实时监控、告警、CAN数据等,这里面主要是对数据进行二次处理。
  为什么不直接在数据处理模块进行数据二次处理,并将这些数据使用WEBSOCKET推送到地图模块呢?原因在于效率,如果让数据处理模块承担这么多工作,这部分的消费就会很慢,这是数据处理的第一道门,这部分的效率慢的话,会直接导致后续业务应用的障碍。基于单一职责原则,这部分只负责数据的二次转发,数据二次处理放到WEB服务器进行处理。
distributed

数据处理流程图

  车载终端采集车辆数据第一次转发至MQ,数据处理模块进行第一次流量控制。
  第一次流量控制主要工作是,会排除那些不存在在线监控用户的车辆数据,也就是说该车辆数据不存在监控它的在线用户,那就没必要发送到地图了。
  如何判断某个数据是否存在在线用户监控它呢?这部分就需要Redis来提供大量快速的读操作,用户登录的时候会在车辆和在线用户归属关系的缓存中记录。排除后的数据只剩下存在在线用户监控的车辆数据,数据处理模块会将这部分剩余的数据二次转发到消息服务器中另外的Topic中。WEB应用模块接收下来后会进行第二次流量控制。
  为了介绍第二次流量控制,这里首先介绍一下两种监控场景,全车监控和实时监控。
全车监控在地图级别比较大的时候,邻近的车辆会聚合起来。聚合物上面数字代表这里面聚合的车辆数目。地图放大的时候,车辆会显示出来。实时监控会实时绘制车辆的轨迹图。由于全车监控和单车监控粒度不同,全车监控会因为车辆数众多,导致地图聚合操作,而单车监控会放大到最细粒度图层,并实时绘制轨迹,故二者在粒度、精度、及时性方面要求不同。为了区分这两种方式的监控,在ActiveMQ中专门设置了两种主题,MonitorLocation对应全车监控,RealLocation对应单车实时监控。同样,用户在打开实时监控某辆车时,会在实时监控缓存中记录,因此数据处理模块在进行二次转发的时候,能够将这两种数据区分开来,发送到不同的主题当中。
  在进行第二次流量控制的时候,针对全车监控,根据用户目前的地图级别、用户所处的视野、车辆位移来进行流量控制。首先是用户的视野,比如用户目前视野在福建,其他省份看不清,此时其他省份车辆位置就不需要发送过来进行监控。其次是地图级别,地图级别越大,地图图层越细,车辆移动的效果也就更明显,反之,如果地图级别缩小至全国级别,那么车辆移动很不明显,基本会处在聚合物当中,只有当位移比较大的时候,地图上才会有效果。因此需要根据地图级别以及车辆的位移来进行流量控制。整体策略是,只发送处在用户视野当中,并且位移能够明显被感知的情况。
  而针对实时监控,我们不进行过滤和流量控制,只要存在监控该车的在线用户,则立即发送至前端地图模块进行实时轨迹绘制。
  这样的区分能够完美的支持全车监控和单车监控场景。
  紧接着,设计数据推送模块。对于处理完的数据,通过websocket和stomp.js并集成Spring,实现主动推送至前端地图模块的目的。抛弃了传统的轮询机制,能够大幅提高系统负载能力和性能,满足实时性要求。
  最后,需要基于百度地图实现车辆实时监控,实现整个监控流程,包括车辆位置移动、车辆轨迹绘制、轨迹点保存、车辆聚合、全车监控和单车监控以及相互间的切换逻辑,其中聚合部分,对百度地图中的聚合操作进行优化。
distributed

实现细节

  接着介绍一下实现细节。

基于Redis构建分布式缓存

  首先是基于Redis构建分布式缓存。从缓存集群、高可用架构、缓存API封装、缓存数据结构设计入手。

缓存集群

  首先是缓存集群,Redis提供了主从热备机制,主服务器的数据同步到从服务器,具体而言,采用一主多从的方式,主从之间进行数据同步,主节点接收到写操作后,直接返回成功,然后在后台用异步方式把数据同步到从节点上。主从另一个目的是进行读写分离,这是当单机读写压力过高的一种通用型解决方案。其主机的角色只提供写操作或少量的读,把多余读请求通过负载均衡算法分流到单个或多个slave服务器上。
  Redis支持主从热切换。通过Sentinel哨兵组件实时监控主服务器状态并负责选举主服务器。当发现主服务器异常时根据一定的算法重新选举主服务器,并将问题服务器从可用列表中去除,原来的主节点恢复后以slave的身份重新加入。由于主节点IP发生变化,故在主节点地址发生变化后要及时通知到客户端,客户端收到新地址后,使用新地址继续发送新请求。
distributed

高可用方案

  接着是高可用方案设计。本文高可用方案包括: 构建Redis主从模式集群和Sentinel哨兵集群。使用1个主节点,3个从节点。主节点提供读写操作,从节点只提供读操作。因为数据处理模块是数据采集模块,存在着大量的读写操作,故基于本地化策略,减少网络传输消耗的时间,选择主节点Master安装在dbp模块,其余3个从节点,前期可以安装在单台机器的3个端口,后期测试稳定后,可以将其中一个安装在web端模块所在的服务器,基于本地化策略提供大量读操作,其余两个从节点单独使用两台服务器安装。另外,还需要配置3个哨兵,主节点dbp安装1个哨兵,另外3台从服务器选其中两台各安装一个,构建一个Sentinel集群,作为HA高可用方案,防止主节点单点失败。
distributed

缓存API的封装

  我们的目标是针对Redis缓存,定义一个通用的、轻量级的客户端访问框架,能够提供给业务层不同业务进行调用。
  选择Redis的Java版本客户端Jedis进行API封装,Jedis提供了丰富的Redis底层数据结构操作方法并且Jedis能够配置连接池,也能够使用sentinel机制,通过连接sentinel集群支持HA高可用方案。
  为了进一步简化API的封装以及集成Spring应用,我们选择组件spring data redis作为更上层的接口封装工具。并和jedis配套使用,spring-data-redis使用jredis作为底层连接Redis集群的工具。简化了大量复杂的数据操作方法。
  更高层次的抽象,需要我们自己来定义。本文针对Redis不同的数据结构分别定制了RedisStringUtil,RedisSetUtil,RedisMapUtil,RedisListUtil四种工具类的封装,能够实现单条或批量操作。并支持自定义序列化方法。
  这是部分API工具。RedisBaseUtil定义一些通用的操作。
distributed

缓存数据结构

  缓存数据结构强调一点。对于那些频繁读写某个类的部分字段的缓存。推荐使用hash结构。例如终端快照,,可支持单独获取和修改某个或某些属性。如果大家听说过memcache的话,会发现在memcahche如果要实现这点,必须要把整个对象取出来,反序列化后,获取某个字段,再修改对象字段,序列化后存到缓存中,效率极其低下。Redis只需要一步,单独修改该字段即可。

基于ActiveMQ构建实时数据处理模块

  该部分是针对二次转发的主题中的数据进行接收处理,包括消息驱动实现数据监听,数据处理类图设计,全车监控流量控制。

消息驱动

  消息驱动主要是介绍Spring和MQ,Websocket集成。整体流程如图所示。具体配置代码也很简单。
distributed

数据处理类图设计

  数据处理部分主要介绍使用设计模式来设计类之间的关系。为了实现低耦合、高内聚的设计,我们将数据接收、数据处理、数据发送逻辑独立出来。
  其中最后一步数据发送逻辑是由上文中提到的spring-websocket组件SimpMessageSendingOperations来充当,只需要将其注入到数据处理类中,就可以直接调用方法convertAndSendToUser发送到前端进行渲染展示。
  数据接收方面,我们使用MessageListenerAdapter来作为数据监听器代理类,然后委托给我们自定义的不同监听器类来执行,不同的监听器监听不同的主题,在这里,我们使用VehicleTrackListener来监听全车监控队列MonitorLocation,使用RealLocationListener来监听单车实时监控队列RealLocation,使用CanListener来监听CAN数据队列,AlarmListener来监听Alarm告警数据队列。监听器只做一件最简单的事情,就是接收数据。后续扩展非常方便,自定义监听器就可以。
  数据处理方面,我们定义数据处理抽象父类MessageDispatcher,以及不同的子类,VehicleTrackDispatcher对应于全车监控数据处理,RealLocationDispatcher对应于单车监控数据处理,CanDispathcer对应于CAN数据处理,AlarmDispatcher对应于告警数据处理。
其中,MessageDispatcher父类定义了数据处理模板方法sendMessage,sendMessage定义了整个数据处理框架,首先获取所有的在线用户列表,然后遍历每条数据,获取有权限查看该车辆数据的所有在线用户,如果存在这样的在线用户,并且isDispatcher方法返回True,则进行发送。
  isDispatcher是抽象方法,用于判断是否需要发送该数据,不同子类进行重载,根据不同数据各自的处理细节进行定制,例如全车监控需要根据用户视野和地图级别进行监控,单车监控则不进行拦截,默认返回True即可。
  这部分扩展也非常方便,假如需要处理新的类型的数据。定义新的子类继承父类,如果处理逻辑类似,那么就可以共用sendMessage逻辑。重载抽象方法isDispatcher方法即可。如果不同逻辑的话,直接覆盖父类的sendMessage方法,自定义即可。
distributed

全车监控流量控制

  接下来是全车流量控制,其中重点部分是视野和位移逻辑。全车监控流量控制的方法是综合考虑用户所在的地图视野、地图级别、地图聚合功能、前后两次车辆位移。首先是判断车辆位置是否处在用户视野范围,进一步判断移动位移是否足够大。
distributed

基于Websocket/百度地图实现车辆监控

websocket

  使用Websocket推送数据到前端,前端使用Stomp.js、Socket.js实现数据接收。这里面是使用Stomp协议进行数据传输的,Spring很容易进行配置,参考下官方文档。

初次加载性能优化

  接着介绍下初次加载性能优化,我们的需求是在用户登录时,显示其视野内,有权限查看的所有车辆的最后一次位置。由于当时开发时MongoDB不支持这样复杂的批量查询,至于现在支不支持,没有查过。如果要查询的话,只能一条一条查询,性能很差。全车监控在实时性方面,只要用户在线的时候,视野内的、位移明显的最新数据能够实时展示即可,而对于初次登录的最后一次视野范围内车辆位置的精度要求不是非常高。
  我们的解决方案是借助MYSQL批量查询功能,使用Quartz定时任务,在后台每30分钟将MongoDB中所有车辆的最后一次位置保存到MySQL终端快照表中,这样终端快照表中的数据就是最新的位置数据。初次加载地图时,我们只需要将车辆用户归属关系的表和终端快照表做一次连接,加上视野范围的限制条件,就能一次性批量查询出用户有权限查看的、最后一次视野范围内的车辆位置。经过这样的设计,能够实现秒级的响应性能。

地图聚合优化

  聚合功能是将邻近的车辆图标聚合在一起,减少地图上的图标数量,从而减轻地图操作,最终减轻负载的一种功能。
  百度地图提供了相应的聚合功能,但是在我们实际测试过程中,发现百度地图聚合功能性能极差,在最新版的谷歌浏览器中测试,5000个点左右,对地图进行放大缩小操作,响应时间在3s左右,10000点响应时间在7s左右,30000点响应时间在20s左右,性能表现极差。我们平台目前拥有3万辆车,我们的目标是在3万辆车同时展示的最大负载情况下,依然能够实现秒级以内的性能体验,也就是说用户对地图的基础操作,应该在1s左右就能够得到响应,这样才能提高用户的体验。为了实现这样的性能提升,我们基于百度聚合功能进行性能优化。

总结

  本文重点研究基于ActiveMQ和Redis构建分布式车辆实时数据处理环境,并实现车联网平台的核心业务,实时监控模块。
  车联网平台大数据实时处理是一种很棘手的挑战,本文中提到的实时数据处理方法是众多应对方法中的一种。我认为本课题可以通过尝试更多方法对大数据实时处理方法进行深入探讨和研究具体方法包括:

  • 基于Storm实现实时流数据处理。Storm是一个在线实时、分布式以及具备高容错的计算系统。随着车联网平台数据进一步增加,我们可以通过搭建Storm集群,基于分布式算法来对车辆数据进行实时处理分析。
  • 基于Spark构建大数据实时处理环境。Spark实现了内存级别的分布式处理模式,使用户无需关注复杂的内部工作机制,无需具备丰富的分布式系统知识及开发经验,即可实现大规模分布式系统的部署与大数据的并行处理。
    具体改进上,可以将本文中提到的,基于web服务器监听方式的数据处理方法单独隔离出来,使用storm或spark分布式集群来处理,这样能够减轻web服务器的压力。

  总之,希望通过本文的研究,能够提供一种车联网实时数据处理的解决方案。

坚持原创技术分享,您的支持将鼓励我继续创作!