本文由 Qwen3 Max 翻译
TL;DR
使用 Guice 构建依赖图:
1 2 3 4 5 6 7 8 9 10 11
| public class DatabaseModule extends AbstractModule { @Provides @Singleton public DataSource provideDataSource() { return new HikariDataSource(); }
@Provides @Singleton public UserRepository provideUserRepository(DataSource dataSource) { return new UserRepositoryImpl(dataSource); } }
|
创建单例注入器:
1 2 3 4 5 6 7 8 9 10 11
| public class AppInjector { private static class Holder { static final Injector INJECTOR = Guice.createInjector(new DatabaseModule()); }
private AppInjector() {}
public static void injectMembers(Object instance) { Holder.INJECTOR.injectMembers(instance); } }
|
在 Flink 函数中使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class UserMapper extends RichMapFunction<Long, User> { @Inject transient UserRepository userRepository;
@Override public void open(Configuration parameters) throws Exception { AppInjector.injectMembers(this); }
@Override public User map(Long userId) throws Exception { Objects.requireNonNull(userId, "用户 ID 为空"); return userRepository.getById(userId).orElseThrow(() -> new RuntimeException("未找到用户")); } }
|
动机
依赖注入(Dependency Injection,简称 DI)是 Java 编程中的常见实践,尤其当你有 Spring 背景时更是如此。最直接的好处是可测试性,即你可以用测试桩(stub)替换类的实现。其他好处还包括关注点分离、更好的类层次结构、控制反转等。组件通过类构造函数或带注解的成员声明其依赖,而 DI 框架则创建一个容器(或上下文)来正确地连接这些组件。该上下文通常在应用启动时创建,并贯穿整个应用生命周期。一些例子包括 Spring 的 ApplicationContext
和 Guice 的 Injector
。
Flink 是一个分布式计算框架,通过依赖注入将业务逻辑与框架解耦是有利的。然而,Flink 应用由函数式类组成,这些类在驱动类(即 main
方法)中实例化、序列化后发送到分布式任务管理器。除非我们所有的组件都是可序列化的,否则无法将依赖注入到这些类中。幸运的是,Flink 提供了一个生命周期钩子 open
,它在作业启动时被调用。结合另一种常见模式——单例模式,我们可以让 DI 框架与 Flink 良好协作。
Guice 快速入门
我选择的依赖注入框架是 Guice,因为它简单、轻量且高效。通常我们通过构造函数声明类依赖,将所有组件添加到模块中,然后让 Guice 完成其余工作。
声明依赖
有三种方式为类声明依赖。推荐使用构造函数方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import com.google.inject.Inject;
public class UserRepositoryImpl implements UserRepository { private DataSource dataSource;
@Inject public UserRepositoryImpl(DataSource dataSource) { this.dataSource = dataSource; } }
class UserRepositoryImpl implements UserRepository { @Inject private DataSource dataSource; }
public class UserRepositoryImpl implements UserRepository { private DataSource dataSource;
@Inject public void setDataSource(DataSource dataSource) { this.dataSource = dataSource; } }
|
将组件添加到模块
模块是 Guice 用于配置组件的机制。例如如何初始化组件、哪个具体类实现接口、当存在多个实现时如何处理等。组件被分组到模块中,模块本身也可以组合在一起。这里涉及很多主题,可参考其官方文档,我将介绍一些基本用法。
首先,只要 Guice 能仅凭类类型和注解推断出依赖图,就可以隐式地创建组件。例如:
1 2 3 4 5 6 7 8 9 10
| @ImplementedBy(UserRepositoryImpl.class) public interface UserRepository {}
public class UserRepositoryImpl implements UserRespository { @Inject private HikariDataSource dataSource; }
var injector = Guice.createInjector(); injector.getInstance(UserRepository.class);
|
dataSource
的类型是 HikariDataSource
,这是一个具体类,因此 Guice 知道如何创建它。如果类型是 DataSource
,Guice 将抛出缺少实现的错误。但对于 UserRepository
,由于我们使用了 ImplementedBy
注解声明了实现类,Guice 知道其具体实现。否则,我们需要在模块中声明这种关系:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import com.google.inject.AbstractModule; import com.google.inject.Provides;
public class DatabaseModule extends AbstractModule { @Override protected void configure() { bind(UserRepository.class).to(UserRepositoryImpl.class); } }
public class DatabaseModule extends AbstractModule { @Provides public UserRepository provideUserRepository(UserRepositoryImpl impl) { return impl; } }
var injector = Guice.createInjector(new DatabaseModule()); injector.getInstance(UserRepository.class);
|
这两种方法是等价的。第二种方法可以这样理解:
- 用户请求一个
UserRepository
实例。
- Guice 发现带有
@Provides
注解且返回类型匹配的 provideUserRepository
方法。
- 该方法需要一个
UserRepositoryImpl
参数。
- Guice 隐式创建该实现类的实例,因为它是具体类。
- 方法获取该实例,可能对其进行修改,然后返回给用户。
第二种方法与我们之前使用的略有不同:之前方法的参数是 DataSource
,我们手动创建 UserRepositoryImpl
:
1 2 3 4
| @Provides public UserRepository provideUserRepository(DataSource dataSource) { return new UserRepositoryImpl(dataSource); }
|
在这种情况下,UserRepositoryImpl
中的 @Inject
注解可以省略,因为 Guice 不负责创建该实例,除非你显式地从 Guice 请求一个 UserRepositoryImpl
实例。
在提供方法中,我们可以配置返回的实例:
1 2 3 4 5 6 7 8
| @Provides @Singleton public DataSource provideDataSource() { var config = new HikariConfig(); config.setJdbcUrl("jdbc:mysql://localhost:3306/flink_di"); config.setUsername("root"); config.setPassword(""); return new HikariDataSource(config); }
|
最后,模块可以组合在一起:
1 2 3 4 5 6 7 8 9 10
| public class EtlModule extends AbstractModule { @Override protected void configure() { install(new ConfigModule()); install(new DatabaseModule()); install(new RedisModule()); } }
var injector = Guice.createInjector(new EtlModule());
|
命名与作用域组件
当同一类型存在多个不同配置的实例时,可使用 @Named
注解加以区分。也可以创建自定义注解,或在 AbstractModule#configure
中使用绑定,而非提供方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class DatabaseModule extends AbstractModule { @Provides @Named("customer") @Singleton public DataSource provideCustomerDataSource() { return new HikariDataSource(); }
@Provides @Named("product") @Singleton public DataSource provideProductDataSource() { return new HikariDataSource(); } }
@Singleton public class UserRepositoryImpl extends UserRepository { @Inject @Named("customer") private DataSource dataSource; }
|
数据源和实现类实例都标注了 @Singleton
,表示 Guice 在每次请求时都会返回同一个实例。否则,其行为类似于 Spring 中的原型作用域(prototype scope)。
Flink 流水线序列化
考虑以下简单流水线:将 ID 流转换为用户模型并打印到控制台。
1 2 3 4 5 6 7
| var env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> source = env.fromElements(1L); DataStream<User> users = source.map(new UserMapper()); users.print();
env.execute();
|
在底层,Flink 会将该流水线构建成作业图,进行序列化,并发送到远程任务管理器。map
算子接收一个 MapFunction
实现,在本例中是一个 UserMapper
实例。该实例被包装在 SimpleUdfStreamOperatorFactory
中,并通过 Java 对象序列化机制进行序列化。
1 2 3 4 5 6 7 8 9
| public static byte[] serializeObject(Object o) throws IOException { try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos)) { oos.writeObject(o); oos.flush(); return baos.toByteArray(); } }
|
流水线算子最终变成一系列配置的哈希映射,并通过远程调用发送给作业管理器。
1 2 3 4 5
| org.apache.flink.configuration.Configuration { operatorName=Map, serializedUdfClassName=org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory, serializedUDF=[B@6c67e137, }
|
为了让 ObjectOutputStream
正常工作,流水线中的每个类及其成员字段都必须实现 Serializable
接口。对于 UserMapper
,它继承了实现 Serializable
接口的 RichMapFunction
。然而,如果我们添加一个不可序列化的依赖对象,就会发生错误:
1 2 3 4 5 6 7 8 9 10
| public class UserMapper extends RichMapFunction<Long, User> { @Inject UserRepository userRepository; }
var injector = Guice.createInjector(new DatabaseModule()); var userMapper = injector.getInstance(UserMapper.class); DataStream<User> users = source.map(userMapper);
|
这是因为 HikariDataSource
不可序列化。因此,无法通过序列化携带 userRepository
,而应在 UserMapper
被恢复并调用 open
方法后设置它,如本文开头所示。我们添加 transient
关键字,告知 Java 在序列化时不包含该字段。
1 2 3 4 5 6 7 8 9
| public class UserMapper extends RichMapFunction<Long, User> { @Inject transient UserRepository userRepository;
@Override public void open(Configuration parameters) throws Exception { AppInjector.injectMembers(this); } }
|
在 AppInjector
中,我们使用单例模式确保只有一个 Guice 注入器,而 Guice 本身是线程安全的,因此连接池等重量级资源可以在不同的用户自定义函数之间共享。
单元测试
如前所述,依赖注入提高了可测试性。要测试 UserMapper
,我们可以模拟其依赖项,并像普通函数一样进行测试。其他测试技术可参见官方文档。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*;
public class UserMapperTest { @Test public void testMap() throws Exception { var userRepository = mock(UserRepository.class); when(userRepository.getById(1L)) .thenReturn(Optional.of(new User(1L, "jizhang", new Date())));
var userMapper = new UserMapper(); userMapper.userRepository = userRepository; assertEquals("jizhang", userMapper.map(1L).getUsername()); } }
|
参考资料