如何打造一个Dubbo网关--用户注入

之前也说过微服务架构下,我们通常会收获数个甚至更多的子项目,每个项目独立运行。但是用户还需要登录啊,每个子系统也要获取登录之后用户信息

将登录凭证在子系统中传递,交由子系统单独认证并不是什么好主意:不仅会增加开发复杂度,同时会增加认证服务的压力,还会造成大量浪费。子系统需要的仅仅是用户当前信息的快照

如果某个接口需要用户登录,但是登录凭证失效了、不存在或错误应该在网关直接返回。本篇内容主要讲述注入注解用户注入。以下代码片段全部来自于plume

接口白名单

我们将不需要登录的接口称为白名单接口,使用注解@Whitelist来对类或者方法进行标记

1
2
3
4
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.SOURCE)
public @interface Whitelist {
}
  • Target里设置了TYPE和METHOD,表示注解只可以标记在方法或者类上
  • Retention设置为SOURCE,表示注解只在源码上有效,编译后不会带上注解信息
  • 因为网关没有jar包,所以RUNTIME级别没有任何意义,具体如何读取这个标记在文档里详细介绍

业务方有以下这种需求:对于白名单接口如果用户登录希望拿到用户信息,未登录网关也要放行。这也是合理的

用户注入等级

具体的业务接口里,有的只需要一个userId、有的需要更多数据、有的需要全部的用户信息

我们使用注解@Injection来标记要注入的字段,定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.SOURCE)
@Documented
public @interface Injection {
/**
* 注入类型,具体格式参照 BaseInjectEnum 的实现类
*/
InjectEnum value() default InjectEnum.MEMBER_ID;

/**
* 请求IP,当 InjectEnum 不是默认值时无效
* 要求接收对象是 map 或者 bean 包含 requestIp(Integer) 参数
*/
boolean ip() default false;

/**
* 请求头,当 InjectEnum 不是默认值时无效
* 要求接收对象是 map 或者 bean 包含 requestHeaders(Map<String, String>) 参数
*/
String headers() default "";

/**
* 请求头,当 InjectEnum 不是默认值时无效
* 要求接收对象是 map 或者 bean 包含 requestCookies(Map<String, byte[]>) 参数
*/
String cookies() default "";
}

这个注解只能作用于参数,并且value是一个枚举InjectEnum,我们在这里将注入分为三个等级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public enum InjectEnum implements BaseInjectEnum {
/**
* 会员id 注解参数类型必须是 Long、Map或包含userId属性的Bean
*/
MEMBER_ID(1, "member.memberService.getUserIdByToken"),

/**
* 会员信息 注解参数类型必须是 MemberInfo
*/
MEMBER_SIMPLE(2, "member.memberService.getUserByToken"),

/**
* 会员信息 注解参数类型必须是 MemberFullInfo
*/
MEMBER_EXTEND(3, "member.memberService.getUserFullByToken");

private int level;

private String method;

......
}
  • level表示注入等级,原则上越大获取的信息越多
  • method就是之前提到过的invokerName,泛化调用用户接口时使用
  • 可以看出具体用户认证并不是网关完成的,而是交由member系统

泛化调用用户接口

泛化调用的入口在dubboInvoker里的doMember,交由gatewayCache.tokenCache去进行实际的泛化调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private Map<String, Object> doMember(String inputTenant, String inoutGroup, InvokeCache paramCache, String token, String tag) {
// 无论是不是白名单接口,都去获取一次用户信息
Map<String, Object> memberInfo = gatewayCache.tokenCache(new TokenKey(token, inputTenant, inoutGroup, paramCache.getInvokeName()));
if (!paramCache.isWhitelist()) {
if (StringUtil.isBlank(token)) {
// RefundException是一个特定异常,如果抛出前端会退出登录,常见情况时用户token失效
throw new RefundException(PlatformExceptionEnum.LOGIN_NONE);
}
// 目前的member注入相关接口设计,正确不会包含code
if (memberInfo.isEmpty() || memberInfo.containsKey("code")) {
throw new RefundException(PlatformExceptionEnum.LOGIN_EXPIRE);
}
}
return memberInfo;
}

......

@SuppressWarnings("unchecked")
public Map<String, Object> tokenCache(TokenKey key) {
// 如果token存在则进行之后的操作,在member的token设计里长度一定时64或128位
final String token = key.getToken();
if (StringUtil.isBlank(token) || (token.length() != 64 && token.length() != 128)) {
return Collections.emptyMap();
}
// 设置一个默认的注入等级
if (StringUtil.isBlank(key.getInject())) {
key.setInject(InjectEnum.MEMBER_ID.method());
}

// 缓存里如果存在则直接返回,不存在继续;这里用了两个缓存是为了解决一些问题,实际通用场景并不需要
Map<String, Object> present = tokenCache.getIfPresent(key);
if (null == present) {
present = tokenInsCache.getIfPresent(key);
}
if (null == present) {
// 先给定一个默认值
present = Collections.emptyMap();
// 日志使用
String proof = RpcContext.getContext().getAttachment(PlatformConstants.PROOF_KEY);
// 获取泛化调用的方法
String inject = key.getInject();
if (StringUtil.isBlank(inject)) {
inject = InjectEnum.MEMBER_ID.method();
}
log.info("[GATEWAY] 当前校验等级: {}", inject);

try {
// 方法信息查询
final InvokeDetailCache cache = invokeCache(new InvokeKey(key.getGroup(), inject, 1), true);
// 执行泛化调用
present = (Map<String, Object>) referenceConfig(cache.getClassName()).get()
.$invoke(StringUtil.splitLastByDot(inject), cache.getTypes(), new Object[]{token});
log.info("[GATEWAY] 用户获取完毕, 信息: {}", present);

tokenCache.put(key, present);
} catch (PassedException e) {
log.warn("[GATEWAY] {}", e.getMessage());

// PassedException是一个特定异常,表示业务执行不通过,这里通常是指用户不存在;不存在也要入缓存
tokenCache.put(key, present);
} catch (Exception e) {
log.warn("[GATEWAY] 获取用户信息出错: ", e);
} finally {
// 执行rpc后会清空上下文,重新设置
setRpcContext(key.getTenant(), key.getGroup(), proof);
}
}
return present;
}

这两段代码都挺简单易懂的,这里只说下两个特定异常

  • RefundException通常是token失效,如果抛出网关会设置特定code,前端收到后会退出登录
  • PassedException是业务异常,需要开发者手动抛出;rpc超时之类的网络错误并不是一类。所以这里只可能是因为某些条件不满足,在代码里手动抛出的。不存在也要入缓存是为了防止这种情况:拿同一个不存在的token大量调用网关,导致网关进行大量无效的rpc

附加信息注入

有时候业务方可能需要用户ip、cookie、header里的信息,这种情况同样可以通过注入完成。在上面的Injection注解里可以看到已经定义相关的字段,下面看下如何填充这些数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@SuppressWarnings({"rawtypes", "unchecked"})
private Object doInject(String getToken, Object getParam, InjectionInfo info, Map<String, Object> member,
Map<String, Object> attach, RequestInfo request) {
if (null == info) {
return null == getParam ? Collections.emptyMap() : getParam;
}

// 当注入等级不是userId时,因为要求的返回实体固定,直接返回即可
if (InjectEnum.getLevel(info.getInjectType()) > 1) {
return member;
} else {
final Object userId = member.get("userId");
// getParam是调用方法的入参,只有是map时才可以注入,否则直接返回userId
if (getParam instanceof Map) {
final HttpHeaders headers = request.getHttpHeaders();
final Map<String, Object> mapParam = (Map) getParam;
// 填充userId
if (null != userId) {
mapParam.put("userId", userId);
}
// 填充ip
if (info.getHaveAddress()) {
mapParam.put("requestIp", IPConvert.ip2Num(RequestUtil.userIp(request.getHttpHeaders())));
}
// 填充所需要的cookie
final Set<String> cookieNames = info.getCookieNames();
if (null != cookieNames && cookieNames.size() > 0) {
final MultiValueMap<String, HttpCookie> getCookies = request.getHttpCookies();
if (CollectionUtil.notEmpty(getCookies)) {
final Map<String, String> retCookies = Maps.newHashMapWithExpectedSize(cookieNames.size());
for (String cookieName : cookieNames) {
final List<HttpCookie> httpCookies = getCookies.get(cookieName);
if (null != httpCookies && !httpCookies.isEmpty()) {
retCookies.put(cookieName, httpCookies.get(0).getValue());
}
}
mapParam.put("requestCookies", retCookies);
}
}
// 填充所需要的header
final Set<String> headerNames = info.getHeaderNames();
if (null != headerNames && headerNames.size() > 0) {
final Map<String, byte[]> retHeaders = Maps.newHashMapWithExpectedSize(headerNames.size());
for (String headerName : headerNames) {
final List<String> httpHeader = headers.get(headerName);
if (CollectionUtil.notEmpty(httpHeader)) {
retHeaders.put(headerName, httpHeader.get(0).getBytes());
}
}

mapParam.put("requestHeaders", retHeaders);
}
// 填充其它附加参数
if (CollectionUtil.notEmpty(attach)) {
mapParam.put("attachParam", attach);
}
return mapParam;
} else {
return userId;
}
}
}

泛化调用的入参和返回结果都是map,这反而方便了随意定义数据进去

信息缓存和清理

泛化调用时使用的gatewayCache.tokenCache里对用户信息进行了缓存,但是我们要求用户在同一种场景只能有一个客户端在线,也就是说同一种场景下用户每次登录都会重置token,重置后要求旧token在所有地方都失效

为了满足这个场景,需要member在新token写入完成后,向所有网关发送一条消息。在当前环境中我们使用的Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
public BaseResult onMessage(TokenKey tokenKey) {
log.info("[GATEWAY] 要删除网关中用户缓存是: {}", tokenKey);
final Cache<TokenKey, Map<String, Object>> tokenCache = getTokenCache();
// 清理所有注入等级的数据
for (InjectEnum inject : InjectEnum.values()) {
tokenKey.setInject(inject.method());
final Map<String, Object> token = tokenCache.getIfPresent(tokenKey);
if (null != token) {
tokenCache.invalidate(tokenKey);
}
}
return new BaseResult();
}

如何打造一个Dubbo网关--用户注入
https://back.pub/post/hh-code-dubbo-gateway-3/
作者
Dash
发布于
2018年11月10日
许可协议