一、背景
目前实时计算的流程是通过把业务表的 binlog 同步 kafka,flink job 根据具体逻辑对数据流进行处理,并 sink 到统计结果表。 那么,在产品调整统计逻辑或者代码有 bug 需要修改 flink job 逻辑时,如果修改逻辑需要影响到历史数据时,需要对 sink 表进行清空,重新把 flink job 消费位点重置进行重跑获得准确的统计结果。 业务是直接通过 jdbc 读取 sink 表的数据,如果清空 sink 表会影响业务的查询结果。
二、方案设计
为了减小对业务的影响,可以通过降低数据的一致性来提高可用性,具体的做法有几个方案
方案一
找一个夜深人静的时间段,把旧 flink job 下线,再把 sink 表 truncate,启动新 flink job,等数据追上来,在 truncate 表到数据追上来这段时间业务不可用。
方案二
先走流水线,让 DBA 把需要调整的统计表dump下来,生成对应的 backup table,实现一个通过配置文件动态切换 sql 表名的拦截器,在需要更新 flink job 逻辑的时候,启动拦截器的开关(配置文件热更新),
在 mybatis 执行 sql 前的 prepare 阶段把代理的 sql 改成基于对应的备份表,从而让业务无感的升级,下一步把旧 flink job 停止,把旧 sink 表进行 truncate,启动新 flink job,等待新 flink job 数据追上来,关掉拦截器的开关,业务无感的切换的新逻辑的数据。
方案三
大体同方案二,具体区别是把切换表名的拦截器换成配置文件,每次查询统计表的 sql from 的表名通过配置文件获取,从而达到和方案二一样的效果。
缺点:需要在每个查询 sql 提供参数的方式传入表名(重复开发)。
方案四
跟业务方无关的一个方案,首先这是一个低频是事情,业务方不做设计,只在变动 flink job 的时候做一些流程上的设计,原来的 sink A 表先不动,启动一个新逻辑的 flink job,输出到新的 sink B 表(等数据追上来),停调原来的 job 和新的 job,把 sink B 表 rename 为 sinkA 表,修改新逻辑的 sink 数据源表为(sinkB->sinkA),启动新逻辑的 job
三、落地方案
综合上面的方案对比和分析,选择方案二进行落地,
具体拦截器的实现为
const val FLINK_SINK_BACKUP_ENABLED = "flink.sink.backup.enabled"
const val FLINK_SINK_BACKUP_TABLE_REL = "flink.sink.backup.tableRel"
/**
* 使用备份表拦截器
*
* 在开关开启时,通过配置文件获取需要替换的表名,替换sql
*
* @author wangchang on 2022/03/24
*/
@Intercepts(Signature(type = StatementHandler::class, method = "prepare", args = [Connection::class, Integer::class]))
class UsingBackupTableInterceptor : Interceptor {
private val logger = LoggerFactory.getLogger(UsingBackupTableInterceptor::class.java)
override fun intercept(invocation: Invocation): Any {
// 读备份表开关
val backupEnabled: Boolean = ConfigService.getAppConfig().getBooleanProperty(FLINK_SINK_BACKUP_ENABLED, false)
if (!backupEnabled) {
return invocation.proceed()
}
// 获取配置的备份表映射
val backupTableRel: String = ConfigService.getAppConfig().getProperty(FLINK_SINK_BACKUP_TABLE_REL, "")
val backupTableRelMap = getBackupTableRelMap(backupTableRel)
if (backupTableRelMap.isEmpty()) {
return invocation.proceed()
}
val statementHandler = invocation.target as StatementHandler
// 保存会话信息
var metaStatementHandler = SystemMetaObject.forObject(statementHandler)
// 分离代理对象链(由于目标类可能被多个插件拦截,从而形成多次代理,通过下面的两次循环,可以分离出最原始的的目标类)
while (metaStatementHandler.hasGetter("h")) {
val h = metaStatementHandler.getValue("h")
metaStatementHandler = SystemMetaObject.forObject(h)
}
// 分离最后一个代理对象的目标类
while (metaStatementHandler.hasGetter("target")) {
val target = metaStatementHandler.getValue("target")
metaStatementHandler = SystemMetaObject.forObject(target)
}
// 获取原sql
val boundSql = metaStatementHandler.getValue("delegate.boundSql") as BoundSql
// 获取原表&备份表的map 遍历replace
val newSql = getSql(boundSql, backupTableRelMap)
metaStatementHandler.setValue("delegate.boundSql.sql", newSql)
return invocation.proceed()
}
/**
* 原表名#备份表名 逗号隔开
*/
private fun getBackupTableRelMap(backupTableRel: String): Map<String, String> {
val backupTableNameRelMap = mutableMapOf<String, String>()
if (backupTableRel.isNullOrBlank()) {
return emptyMap()
}
backupTableRel.split(",").forEach {
val rel = it.split("#")
if (rel.isNotEmpty() && rel.size == 2) {
backupTableNameRelMap[rel.first()] = rel.last()
}
}
return backupTableNameRelMap
}
/**
* 获取替换后的sql
*
* @param boundSql
* @return
*/
private fun getSql(boundSql: BoundSql, backupTableNameRelMap: Map<String, String>): String {
var sql = boundSql.sql.trim().toLowerCase()
backupTableNameRelMap.forEach {
if (sql.contains(it.key)) {
sql = sql.replace(it.key, it.value)
}
}
return sql
}
override fun plugin(target: Any): Any {
return if (target is StatementHandler) {
Plugin.wrap(target, this)
} else {
target
}
}
}
Q.E.D.