# 如何实现注解 RPC Consumer属性动态注入

作者:Tom哥
公众号:微观技术
博客:https://offercome.cn (opens new window)
人生理念:知道的越多,不知道的越多,努力去学


分布式系统架构时代,RPC框架你一定不会陌生。目前主流的RPC框架有 dubbo、thrift、motan、grpc等。

消费端(RPC Consumer)通常只有服务接口定义,接口的业务逻辑实现部署在生产端(RPC Provider),服务调用一般是采用动态代理方式,通过Proxy创建一个代理类,借助增强方式完成网络的远程调用,获取执行结果。

# 两个关键点

1、如何实现一个通用的代理类?

2、如何在消费端动态注入接口的代理对象?

# 如何实现一个通用的代理类?

目前动态代理的实现方案有很多种,如JDK 动态代理、Cglib、Javassist、ASM、Byte Buddy等

JDK 动态代理的代理类是运行时通过字节码生成的,我们通过Proxy.newProxyInstance方法获取的接口实现类就是这个字节码生成的代理类

定义代理类RpcInvocationHandler,继承InvocationHandler接口,并重写invoke()方法。

public class RpcInvocationHandler implements InvocationHandler {

    private final String serviceVersion;
    private final long timeout;

    public RpcInvocationHandler(String serviceVersion, long timeout) {
        this.serviceVersion = serviceVersion;
        this.timeout = timeout;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // todo
        // 1、封装RpcProtocol对象
        // 2、对象编码
        // 3、发送请求到服务端
        // 4、获取返回结果

        // 模拟生成一个订单号
        Long orderId = Long.valueOf(new Random().nextInt(100));

        String s = String.format("【RpcInvocationHandler】 调用方法:%s , 参数:%s ,订单id:%d", method.getName(), JSON.toJSONString(args), orderId);
        System.out.println(s);
        return orderId;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 如何在消费端动态注入接口的代理对象?

构造一个自定义Bean,并对该Bean下执行的所有方法拦截,增加额外处理逻辑。

OrderService是一个订单接口类,client端没有该接口的实现类。

定义注解@RpcReference,用于描述代理类的参数信息。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface RpcReference {
    String serviceVersion() default "1.0";
    long timeout() default 5000;
}
1
2
3
4
5
6
7

Spring 的 FactoryBean 接口可以帮助我们实现自定义的 Bean,FactoryBean 是一种特殊的工厂 Bean,通过 getObject() 方法返回对象,而并不是 FactoryBean 本身。

public class RpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;

    private String serviceVersion;

    private long timeout;

    private Object object;

    @Override
    public Object getObject() throws Exception {
        return object;
    }

    @Override
    public Class<?> getObjectType() {
        return interfaceClass;
    }

    public void init() throws Exception {
        object = Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new RpcInvocationHandler(serviceVersion, timeout));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

但是 RpcReferenceBean如何被spring容器识别并加载呢?需要借助Spring的其他扩展点:

1、BeanFactoryPostProcessor,在Spring 容器加载 Bean 的定义之后以及 Bean 实例化之前执行,方便用户对 Bean 的配置元数据进行二次修改。

2、ApplicationContextAware,通过它Spring容器会自动把上下文环境对象调用ApplicationContextAware接口中的setApplicationContext方法。通过ApplicationContext可以查找Spring容器中的Bean对象。

3、BeanClassLoaderAware, 获取 Bean 的类加载器

public class RpcConsumerPostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

    private ApplicationContext applicationContext;
    private ClassLoader classLoader;
    private final Map<String, BeanDefinition> rpcBeanDefinitions = new LinkedHashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (beanClassName != null) {
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
            }
        }

        // 自定义bean注册到容器中
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
        this.rpcBeanDefinitions.forEach((beanName, beanDefinition) -> {
            if (applicationContext.containsBean(beanName)) {
                throw new IllegalArgumentException("spring context already has a bean named " + beanName);
            }
            registry.registerBeanDefinition(beanName, beanDefinition);
            System.out.println(String.format("registered RpcReferenceBean %s success!", beanName));
        });
    }

    private void parseRpcReference(Field field) {
        RpcReference annotation = AnnotationUtils.getAnnotation(field, RpcReference.class);
        if (annotation != null) {
            // 创建RpcReferenceBean类的Bean定义
            BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(RpcReferenceBean.class);
            beanDefinitionBuilder.setInitMethodName("init");
            beanDefinitionBuilder.addPropertyValue("interfaceClass", field.getType());
            beanDefinitionBuilder.addPropertyValue("serviceVersion", annotation.serviceVersion());
            beanDefinitionBuilder.addPropertyValue("timeout", annotation.timeout());

            BeanDefinition beanDefinition = beanDefinitionBuilder.getBeanDefinition();
            rpcBeanDefinitions.put(field.getName(), beanDefinition);
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

RpcConsumerPostProcessor 从 beanFactory 中获取所有 Bean 的定义信息,然后对每个Bean下的field检测,如果field被声明了@RpcReference注解,通过BeanDefinitionBuilder重新构造 RpcReferenceBean的定义,并为成员变量赋值。

最后借助BeanDefinitionRegistry将新定义的Bean重新注册到Spring容器中。由容器来实例化Bean对象,并完成IOC依赖注入

# 项目源码

https://github.com/aalansehaiyang/spring-boot-example/tree/master/spring-rpc-reference

上次更新: 2023/3/4