ELT和ETL是数据集成的两种基本方式。前者专注于大数据的实时抽取和可靠传输,后者则包含了更丰富的数据转换功能。由于今天是和AI前线的朋友们一起探讨数据集成,我主要结合AI应用的场景谈谈:为什么ELT是更适合AI应用场景的数据集成方案、采用Kafka技术栈来构建ELT平台所具备的优势和问题以及我们所做的一些优化工作。希望能够对大家的工作和学习有所帮助。

首先,我会介绍一下AI应用中数据集成的典型场景,ETL和ELT两种数据集成模式的异同点,以及为什么AI应用下更适合采用ELT模式。然后,我会花一些篇幅介绍数据集成中需要重点考虑的基本问题,以及我们所采用的底层平台——KafkaConnect在解决这些问题上的优势和局限。
接下来,我会介绍DataPipeline对于KafkaConnect一些优化。有的是从底层做的优化,例如线程池的优化。有的则是从产品特性上的优化,例如错误数据队列。
最后,我们谈一谈KafkaConnect和KafkaStream的结合,以及我们用KafkaStream做数据质量预警方面的一个应用Case。
一、AI应用场景下的数据集成
数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,为企业提供全面的数据共享。AI是典型的数据驱动应用,数据集成在其中起着关键的基础性作用。
以一个大家所熟悉的在线推荐服务为例,通常需要依赖三类数据:用户的属性(年龄、性别、地域、注册时间等)、商品的属性(分类、价格、描述等)、用户产生的各类行为(登录、点击、搜索、加购物车、购买、评论、点赞、收藏、加好友、发私信等)事件数据。
随着微服务框架的流行,这三类数据通常会存在于不同的微服务中:“用户管理服务”储存着用户的属性、好友关系、登录等数据;“商品管理服务”存储的商品信息;“订单服务”存储着用户的订单数据;“支付服务”存储用户的支付数据;“评论服务”记录着用户的评论和点赞数据。为了实现一个推荐服务,我们首先需要让服务能够访问到这些数据。这种数据访问应该是非侵入式的,也就是说不能对原有系统的性能、稳定性、安全性造成额外的负担。因此,推荐服务不应当直接访问这些分散的数据源,而是应该通过某种方式将这些数据从各个业务子系统中提取出来,汇集到一个逻辑上集中的数据库/仓库,然后才能方便地使用机器学习框架(例如SparkMLlib)来读取数据、训练和更新模型。
ETL和ELT的区别与联系
数据集成包含三个基本的环节:Extract(抽取)、Transform(转换)、Load(加载)。
抽取是将数据从已有的数据源中提取出来,例如通过JDBC/Binlog方式获取MySQL数据库的增量数据;转换是对原始数据进行处理,例如将用户属性中的手机号替换为匿名的唯一ID、计算每个用户对商品的平均打分、计算每个商品的购买数量、将B表的数据填充到A表中形成新的宽表等;加载是将数据写入目的地。
根据转换转换发生的顺序和位置,数据集成可以分为ETL和ELT两种模式。ETL在数据源抽取后首先进行转换,然后将转换的结果写入目的地。ELT则是在抽取后将结果先写入目的地,然后由下游应用利用数据库的聚合分析能力或者外部计算框架,例如Spark来完成转换的步骤。
为什么ELT更适合AI应用场景
为什么说ELT更适合AI的应用场景呢?
首先这是由AI应用对数据转换的高度灵活性需求决定的。绝大多数AI应用使用的算法模型都包括一个特征提取和变换的过程。根据算法的不同,这个特征提取可能是特征矩阵的简单的归一化或平滑处理,也可以是用Aggregation函数或One-Hot编码进行维度特征的扩充,甚至特征提取本身也需要用到其它模型的输出结果。这使得AI模型很难直接利用ETL工具内建的转换功能,来完成特征提取步骤。此外,企业现在很少会从零构建AI应用。当应用包括Spark/FlinkMLlib在内的机器学习框架时,内建的模型库本身往往包含了特征提取和变换的逻辑,这使得在数据提取阶段就做复杂变换的必要性进一步降低;
其次,企业经常会基于同样的数据构建不同应用。以我之前所在的一家在线教育公司为例,我们构建了两个AI的应用:其中一个是针对各类课程的推荐应用,主要用于增加用户的购买转化率。另外一个是自适应学习系统,用于评估用户的知识掌握程度和题目的难度和区分度,从而为用户动态地规划学习路径。两个应用都需要用户属性、做题记录、点击行为以及学习资料文本,但采用的具体模型的特征提取和处理方式完全不同。如果用ETL模式,我们需要从源端抽取两遍数据。而采用ELT模式,所有数据存储在HBase中,不同的应用根据模型需要过滤提取出所需的数据子集,在Spark集群完成相应的特征提取和模型计算,降低了对源端的依赖和访问频次;
最后,主流的机器学习框架,例如SparkMLlib和FlinkMLlib,对于分布式、并行化和容错都有良好的支持,并且易于进行节点扩容。采用ELT模式,我们可以避免构建一个专有数据转换集群(可能还伴随着昂贵的ETL产品License费用),而是用一个通用的、易于创建和维护的分布式计算集群来完成所有的工作,有利于降低总体拥有成本,同时提升系统的可维护性和扩展性。
二、从ETL和ELT面临的主要问题
采用ELT模式,意味着可以较少的关注数据集成过程中的复杂转换,而将重点放在让数据尽快地传输上。然而,一些共性的问题依然需要得到解决:
1.数据源的异构性:传统ETL方案中,企业要通过ETL工具或者编写脚本的方式来完成数据源到目的地同步工作。当数据源异构的时候,需要特别考虑Schema(可以简单理解为数据字段类型)兼容性带来的影响。无论是ETL还是ELT,都需要解决这一问题。
2.数据源的动态性:动态性有两方面含义。一是如何获取数据源的增量;二是如何应对数据源端的Schema变化,例如增加列和删除列。
3.任务的可伸缩性:当面对少量几个数据源,数据增量不过每日几百MB的时候,ELT平台的可伸缩性不是什么大问题。当ELT面对的是成百上千个数据源,或者数据源数据增速很快时,ELT平台的任务水平切分和多任务并行处理就成为一个必备的要求。平台不仅要支持单节点的多任务并行,还需要支持节点的水平扩展。此外,ELT的上游通常会遇到一些吞吐能力较差的数据源,需要能够对读取进行限速,避免对现有业务产生影响。
4.任务的容错性:ELT平台某些节点出现故障的时候,失败的作业必须能够迁移到健康的节点上继续工作。同时,作业的恢复需要实现断点重传,至少不能出现丢失数据,最好能够做到不产生重复的数据。
三、KafkaConnect的架构
KafkaConnect:基于Kafka的ELT框架
可用于构建ELT的开源数据集成平台方案不止一种,较广泛采用的包括KafkaConnect、DataX等,也有公司直接采用Flink等流式计算框架。DataPipeline作为一家提供企业数据集成产品的公司,我们在KafkaConnect之上踩了许多坑并且也做了许多优化。
四、踩过的坑与优化的点
KafkaConnect应用于ELT的关键问题1
下面我们聊一聊KafkaConnect应用过程中的几个关键问题。
首先是任务的限速和数据缓存问题。从KafkaConnect设计之初,就遵从从源端到目的地解耦性。当Source的写入速度长时间大于Sink端的消费速度时,就会产生Kafka队列中消息的堆积。如果Kafka的TopicRetention参数设置不当,有可能会造成数据在消费前被回收,造成数据丢失。KafkaConnect框架本身并没有提供Connector级别的限速措施,需要进行二次开发。
KafkaConnect应用于ELT的关键问题2
当用户有多个数据源,或者单一数据源中有大量的表需要进行并行同步时,任务的并行化问题就产生了。KafkaConnect的rebalance是牵一发动全身,一个新任务的开始和停止都会导致所有任务的reload。当任务数很多的时候,整个KafkaConnect集群可能陷入长达数分钟的rebalance过程。
解决的方法,一是用CDC(ChangeDataCapture)来捕获全局的数据增量;二是在任务内部引入多线程轮询机制,减少任务数量并提高资源利用率。
KafkaConnect应用于ELT的关键问题3
异构数据源同步会遇到Schema不匹配的问题。在需要精确同步的场景下(例如金融机构的异构数据库同步),通常需要CasebyCase的去定义映射规则。而在AI应用场景下,这个问题并不是很突出,模型训练对于损失一点精度通常是可容忍的,一些数据库独有的类型也不常用。
KafkaConnect应用于ELT的关键问题4
Source端需要能够检测到Schema的变化,从而生成具有正确Schema格式的SourceRecord。CDC模式下,通过解析DDL语句可以获取到。非CDC模式下,需要保存一个快照才能够获取到这种变化。
下面我用一些时间对DataPipeline所做的优化和产品特性方面的工作。
DataPipeline是一个底层使用KafkaConnect框架的ELT产品。首先,我们在底层上引入了Manager来进行全局化的任务管理。Manager负责管理SourceConnector和SinkConnector的生命周期,与KafkaConnect的管理API通过REST进行交互。
系统的任何运行异常,都会进行统一的处理,并由通知中心发送给任务的负责人和运维工程师。我们还提供了一个Dashboard,用于图形化方式对任务进行生命周期管理、检索和状态监控。用户可以告别KafkaConnect的命令行。
DataPipeline的任务并行模型
DataPipeline在任务并行方面做了一些加强。在DataPipelineConnector中,我们在每个Task内部定义和维护一个线程池,从而能够用较少的Task数量达到比较高的并行度,降低了rebalance的开销。而对于JDBC类型的Connector,我们额外允许配置连接池的大小,减少上游和下游资源的开销。此外,每个Connector还可以定义自己限速策略,以适应不同的应用环境需求。
DataPipeline的错误队列机制
通过产品中错误队列预警功能,用户可以指定面对错误数据暂存和处理逻辑,比如错误队列达到某个百分比的时候任务会暂停,这样的设置可以保证任务不会因少量异常数据而中断,被完整记录下来的异常数据可以被管理员非常方便地进行追踪、排查和处理。
相比以前通过日志来筛查异常数据,这种错误队列可视化功能能够大大提升管理员的工作效率。
DataPipeline的数据转换
DataPipeline实现了自己的动态加载机制。提供了两种可视化的转换器:基本转换器和高级转换器。前者提供包括字段过滤、字段替换和字段忽略等功能;后者基于Java,可以更加灵活地对数据处理,并且校验处理结果的Schema一致性。DataPipeline还提供了数据采样和动态调试能力,方便用户进行表级别的转换规则开发。
值得注意的是,Kafka不仅仅是一个消息队列系统,本身也提供了持久化能力。一个很自然的问题就是:能否不额外引入Sink端的外部存储,直接从Kafka中获取训练数据?
如果模型本身要用到某个Topic的全量数据或者最近一段时间的数据,那么通过设置合适的retention参数,可以直接将Kafka作为训练数据的来源。Kafka的顺序读模式可以提供非常高的读取速度;如果模型要根据消息的内容做数据筛选,那么由于Kafka本身并不提供检索能力,需要遍历所有消息,这样就显得比较低效了。
当模型用于线上时,可能还需要引入流式计算来完成实时特征的提取工作。Kafka本身就提供了这种流式计算能力。
流式计算在ELT中的作用-数据质量预警
DataPipeline也将流式计算引入到平台的质量预警功能中。在我们的未来版本中,用户可以定义Topic级别的质量预警规则模型,例如“在5分钟时间内,数据记录的字段1均值超过历史均值记录的比率超过70%”为异常,采取策略为“告警并暂停同步”。通过这种方式,可以在ELT的过程中,尽早发现数据中的异常现象,避免大量异常数据进入数据目的地。
五、总结与展望
最后总结一下。数据集成并不是什么新的概念,在过去二十多年间已经广泛应用于各个行业的信息系统。ELT和ETL相比,最大的区别是“重抽取和加载,轻转换”,从而可以用更简单的技术栈、更轻量的方案搭建起一个满足现代企业应用的数据集成平台。AI应用内在的特点也使得ELT特别适合这个场景。

KafkaConnect本身是一个业界被广泛采用的ELT框架,针对容错、分布式、Schema一致性等方面都提供了良好的支持,同时有大量的社区和商业资源可供参考和选择。DataPipeline基于KafkaConnect做了大量数据集成场景下的优化,与KafkaStream相结合,能够为包括AI在内的各种应用场景构建起一个完整的数据层支撑方案。
(本文来源于网络,由千家智客进行整理编辑,如有侵权,请联系删除。)
猜你喜欢:
参与评论 (0)