Casdoor的权限检查

Casdoor 的权限检查机制是基于 Casbin 的,参考文档:

简要介绍

所有在同一个组织的用户可以访问该组织下的所有应用,但是有时候需要做一些限制,这里就要用到 权限(permission) 了。

那么 权限(permission) 的作用就是用于控制 用户 能否对 应用 进行某些 操作

最简单的方式就是在 权限(permission) 配置页面新建一个权限,如下图所示:

image.png

作为三个要素,用户,应用和操作,在这里是需要配置的:

  • 包含用户
  • 资源
  • 动作

意义也比较简单,就是哪些 用户 可以对哪些 应用 进行哪些 操作

包含角色 也容易理解,一个角色对应多个人,所以这里就是用来控制一组人的。

那上面的模型是什么意思呢,Casdoor 是如何根据这个配置进行权限校验的呢?

ACL 权限控制模型

ACL 权限控制模型应该很熟悉了吧,Linux 上就在使用,这里就不再赘述,参考:

Casbin 的权限控制

Casbin 是一个开源的权限控制库,它支持了多种权限控制模型

Casbin 实施权限规则比较简单,管理员需要列出 主体(subject)对象(object) 和期望允许的 操作(action) 即可:

  • subject,主体,比如用户
  • object,对象,需要被权限控制的对象
  • action,动作,read,write,delete或者其他定义的动作(操作)

管理员需要定义 模型(model) 文件来确定校验条件,Casbin 提供了一个 执行器(Enforcer) 来根据规则定义和模型文件来校验请求。

也就是说 Casbin 权限校验需要三个部分:模型(Model), 校验规则(Policy)和执行器(Enforcer)。

执行器是 Casbin 自带的,可以不必特别关注,只需要知道这件事情就行。下来着重研究模型和校验规则。

哦,对了进行权限校验还需要一个 请求(request)

模型定义

Casbin 里的权限控制模型被抽象成了一个 CONF 文件,基于 PERM metamodel (Policy, Effect, Request, Matchers),这个 PERM metamodel 基于 4 个部分 Policy, Effect, Request, Matchers,其描述了 资源 和 用户 之间的关系。

Casbin 里最简单的一个模型,也是默认模型:

1
2
3
4
5
6
7
8
9
10
11
[request_definition]
r = sub, obj, act

[policy_definition]
p = permission, sub, obj, act

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act
Request

r = sub, obj, act

定义了请求参数,也就是说请求需要三个部分,subject,object 和 action,通俗点讲就是 访问的用户,访问的对象,访问的方法。

这里其实定义了我们应该提供给权限控制匹配函数的参数名和顺序。

Policy

p = permission, sub, obj, act

定义了模型的访问策略,其实是定义了在策略规则文档中字段名和顺序

这里还有一种定义:p={sub, obj, act, eft}

eft是可以被省略的,eft代表的是策略的结果,比如就是 allow 或者是 deny,如果没有定义的话,读取策略文件的时候就会被忽略,默认会返回 allow

Matcher

m = r.sub == p.sub && r.obj == p.obj && r.act == p.act

请求和策略的匹配规则,这里 r 就是请求(request),p 就是策略(policy)

Effect

e = some(where (p.eft == allow))

就是对策略的结果再次进行条件判断,这里的意思就是只要有一条规则返回的是 allow,那么整个就是 allow

还有这样的定义:e = some(where (p.eft == allow)) && !some(where (p.eft == deny))

就是说有一条是 allow 而且 没有 deny 的情况,整个结果是 allow

这里主要是针对多条匹配规则都匹配上的情况做处理的

在 Casdoor 定义权限控制

定义模型

我们已经了解了模型的定义规则,那么就开始在 Casdoor 上面实际操作一番吧。

模型定义用来针对要控制的 object,访问的 subject 能允许怎样的 action

首先在页面顶部导航栏点击 模型(Model),点击新增:

image.png

模型文本这里就填写上面介绍过的模型定义字符串,点击保存&退出

定义权限

在页面顶部导航栏点击 权限(permission),点击新增:

image.png

permission 其实就是具体的控制权限设置,可以针对多个资源进行控制

其实这里的配置界面只是简化了规则的配置,这个页面的权限其实会转换成类似如下的数据:

1
2
p	alice	data1	read
p bob data2 write

代码上

image.png

permission 里定义的规则会先转换成 CasbinRule 对象,每一个对象就包含了 Policy 定义的每个字段,之后就会转成如下的字符串:

p, built-in/permission-built-in, built-in/app-built-in/admin, app-built-in, read

执行校验

模型和权限定义好之后,对应被控制的用户访问被控制的应用时,Casdoor 就会进行加载对应模型和权限进行校验,执行这一工作的叫做 Enforcer

关于 Enforcer 的文档:New a Casbin enforcer

代码分析

流程已经分析差不多了,让我们看看代码上是怎么处理的

Casdoor 是在这个函数做权限校验的。用户登陆时候就会调用,当然在任何你想进行权限校验的地方,都可以进行调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func CheckAccessPermission(userId string, application *Application) (bool, error) {
permissions := GetPermissions(application.Organization)
allowed := true
var err error
for _, permission := range permissions {
if !permission.IsEnabled || len(permission.Users) == 0 {
continue
}

isHit := false
for _, resource := range permission.Resources {
if application.Name == resource {
isHit = true
break
}
}

if isHit {
enforcer := getEnforcer(permission) // v2.Enforcer
allowed, err = enforcer.Enforce(userId, application.Name, "read")
break
}
}
return allowed, err
}

逻辑就是通过 organization 来获取所有的 permission 配置,接下来遍历 permission 中定义的 resource,如果跟当前的应用匹配,那么用这条权限去检查用户是否有对应的权限。

匹配成功之后,调用 getEnforcer 来获取一个执行权限校验的 enforcer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func getEnforcer(permission *Permission) *casbin.Enforcer {
tableNamePrefix := conf.GetConfigString("tableNamePrefix")
adapter, err := xormadapter.NewAdapterWithTableName(conf.GetConfigString("driverName"), conf.GetBeegoConfDataSourceName()+conf.GetConfigString("dbName"), "permission_rule", tableNamePrefix, true)
if err != nil {
panic(err)
}

modelText := `
[request_definition]
r = sub, obj, act

[policy_definition]
p = permission, sub, obj, act

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act`

permissionModel := getModel(permission.Owner, permission.Model)
if permissionModel != nil {
modelText = permissionModel.ModelText
}
m, err := model.NewModelFromString(modelText)
if err != nil {
panic(err)
}

enforcer, err := casbin.NewEnforcer(m, adapter)
if err != nil {
panic(err)
}

err = enforcer.LoadFilteredPolicy(xormadapter.Filter{V0: []string{permission.GetId()}})
if err != nil {
panic(err)
}

return enforcer
}

getEnforcer 做的工作就是根据 权限(permission) 数据去看有没有定义好的model,有的话使用定义好的模型文本来加载 enforcer,否则使用默认的模型文本:

1
2
3
4
5
6
7
8
9
10
11
[request_definition]
r = sub, obj, act

[policy_definition]
p = permission, sub, obj, act

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act

加载模型文本之后,enforcer 就会加载对应 权限(permission) 所对应定义的规则,对应 LoadFilteredPolicy 函数。

这些数据加载完成之后,enforcer 会根据传入的 subject,entity,action,即 用户,应用,操作,来检查权限是否允许

allowed, err = enforcer.Enforce(userId, application.Name, "read")

总结

总结一下,主要有以下核心:

  1. 理解 Casdoor 的权限是做什么的
  2. 理解 模型(Model) 以及 模型文本 是如何定义的
  3. 理解 模型,权限是如何配置的,以及是如何对用户,应用进行控制的
  4. 理解代码上是如何调用的,可以扩展到其他需要进行权限校验的地方

注意

在代码上新建或者编辑权限(permission) 之后,记得要重新生成一下规则(policy),因为不论在界面上是怎样的表现形式,最后都是enforcer通过model textpolicy来检查request

代码如下:

1
2
3
4
5
6
7
8
9
10
// 更新permission之后需要调用
if affected != 0 {
removePolicies(oldPermission)
addPolicies(permission)
}

// 新建permission之后需要调用
if affected != 0 {
addPolicies(permission)
}

尘埃落定

生活逐渐回归正轨,一切都开始尘埃落定。这两个月事情实在是太多,而且很多事情不受自己控制,我又是那种想的很多,容易焦虑的人,所以搞的很疲惫。现在终于清净下来了。

  • 换了个新房子住,和之前的房东和房子彻底Say Goodbye,新房子设施出的各种问题都已妥善解决。
  • 装修也基本上快要到尾声了,也顺利通过物业验收,现在还差开关,灯具,厨电,门锁,开荒保洁,窗帘,家具。只要柜子进场就行,可以开始散味,其他的也不是很着急。

除了这两件长时间困扰人的事情,中间还穿插了车在车库被撞,修了2个星期。。

现在慢慢觉得越长大,要处理的事情,要担的事情就越多,为什么不想长大,不就是因为小孩子无忧无虑,不需要扛这些事情,有父母大人护着你。

说起父母,家庭,我还是不知道怎么解决,从今年3月到现在,我没回过老家,也没有跟父母联系过,感觉像是3月份之后,大家不约而同的有一点不好意思联系对方一样,另外我也不知道怎么面对,我想起来就头脑如麻,真的是没有办法。

算了,还是先不想这些事情了,因为想也没有办法解决,还是先把眼前的事情解决好吧。

Casdoor 部署问题

后端运行时 timeout

运行go run main.go,时候报 get 包 timeout,应该是代理问题,解决办法就是:go env -w GOPROXY=https://goproxy.cn,direct

如下:

1
2
3
4
5
6
7
8
duguangting@c123:~/casdoor$ go get github.com/RobotsAndPencils/go-saml@v0.0.0-20170520135329-fb13cb52a46b
go: github.com/RobotsAndPencils/go-saml@v0.0.0-20170520135329-fb13cb52a46b: Get "https://proxy.golang.org/github.com/%21robot
s%21and%21pencils/go-saml/@v/v0.0.0-20170520135329-fb13cb52a46b.mod": dial tcp 172.217.163.49:443: i/o timeout
duguangting@c123:~/casdoor$ go env -w GOPROXY=https://goproxy.cn,direct
duguangting@c123:~/casdoor$ go get github.com/RobotsAndPencils/go-saml@v0.0.0-20170520135329-fb13cb52a46b
go: downloading github.com/RobotsAndPencils/go-saml v0.0.0-20170520135329-fb13cb52a46b
go: downloading github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
go: downloading github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0

前端运行报错

后端跑通之后,前端运行yarn install的时候报:

The engine "node" is incompatible with this module

解决办法:

yarn install --ignore-engines

Goland 方法没法跳转

在Goland里所有引入的外部包无法跳转,搜了一下,大概这样:

  • step 1

在 Goland 的 go modules 设置勾选,然后设置 GOPROXY

image.png

  • step2

然后因为项目已经有 go.mod 文件了,那么直接执行 go mod tidy,这个命令的作用就是:add missing and remove unused modules

然后就 OK 了

FIRE基金策略调整

最近FIRE基金刷新了从2022年2月以来的最大回撤,目前为止为6.18%。

究其原因,并不是操作的问题,而是系统性原因,8月和9月市场行情不太行,导致每次上车都是白给,而中秋节后更是如此,在节后第一天上证50,深红利,银行等诸多标的提示买入信号,最后竟然买入到了满仓,其实那时候觉得应该也会继续上涨,没想到后面连续4天回撤了快3%!

image.png

从9月份系统回测图可以看出来,确实有巨大的回撤。

但是我的目标是最小回撤,稳定复利,高达6%的回撤可是不能接受的,而且这个回撤还不一定会结束 orz…

但是我发现回撤的品种全是之前定位打野的品种,然后我依稀记得之前得出过一个结论,打野基本上不赚钱,赚了亏,亏了赚,对净值只会起波动作用,而不是起增长作用。于是我便去掉打野标的进行了回撤,从2021年3月1日开始:

image.png

我惊呆了,标准的上楼梯曲线,一直保持东北角的上升曲线,几乎没有回撤,这不就是我追求的完美资金曲线嘛!17个月的时间,快60%的收益率,一年大概40%的收益率。

再对比下有打野标的的曲线(从2021年12月1日开始)

image.png

而对应的2021年12月1日开始的去除打野标的的曲线

image.png

两相对比,含打野的在总收益率上略胜一筹,其实也差不多,但是不含打野的避免了大的回撤,减少了波动。最重要的是,省心省力,诸多打野来回操作也挺消耗精力,一直持仓总会不可避免的关注股市。

所以我决定,++以后FIRE基金的策略调整为不做打野标的,只做满仓标的。++

其实可以想到,如果满仓标的有回撤,那么不论采用哪种都会有回撤,而根据打野的历史经验,打野基本不赚钱,那何必劳心费神呢。

现在基金只有20%的仓位,持仓有两个打野标的,这两个又是买入后就跌,目前亏损幅度还不小,= =!。下来就是等创业板转强,梭哈就完事了。

如何去除数学表达式中的无用括号

最近工作上遇到了这个问题,挺有意思,终于遇到了一些有点意思的东西,于是深入研究了下。

问题背景

问题大概背景是 系统中会展示数学公式,公式是用树来表达的,以运算符号为节点,比如 3=1+2,首先根节点是=,然后左子节点为3,右子节点是+,接下来+的左子节点是1,右子节点是2,就是以这种方式来表示一个数学公式。

然后展示的时候并不知道哪些地方要加括号,因为括号这个信息隐藏在树里了,展示的时候只能是每个操作符的两边都加上括号,这样展示出来就非常繁琐,那么就得想一个算法来将这些无用括号去除掉。

算法描述

算法大概是这样的,只要括号内的非括号部分的数学符号的最小优先级小于括号任意一边数学符号的优先级,那么括号就是必须的,否则就是无用的括号。

比如q * (a * b + c * d) + c,括号内部的+*,最小优先级是+,括号两边是*+,小于其中一个*的优先级,那么这个括号就是必须的。

同理,q * (a * b * c * d) + c,这个括号就是无用的。

原理就是括号里面的优先级如果都是高于括号两边,那么不加括号,也是会先计算括号里面的运算。

另外为啥是非括号部分,因为内部如果已经有括号了,那么子括号肯定是先运算了,可以把这个子括号当一个整体对待。

当然目前还有一个问题,就是遇到/-的时候会有一些问题,比如a/(b/c)或者a-(b-c),主要原因是/-有取反的作用,所以需要特殊处理:也就是如果/-右侧的括号需要保留。

代码实现

回到问题背景,按照这样的数据结构,其实算是简化了。结点都是运算符,那么如果把结点左侧作为括号包裹的内容,只需要比较结点左侧的非括号部分最小优先级和结点符号优先级就可以了。同理,右侧也是一样的。

而且针对/-的特殊处理也更简单了,只需要判断结点运算符是不是/-,如果是,那么右侧部分需要加括号,当然如果需要的话。

另外代码还实现了一个功能:小括号,中括号,大括号的嵌套。主要原理就是如果某个结点需要加括号,那么递归的去找一下左节点和右节点目前使用的最大括号,在此基础上再用更高一级的括号。

实现代码大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
class BracketProcessor:
small_bracket = 'small'
middle_bracket = 'middle'
big_bracket = 'big'
priority_default = 3
operator_priority = {
'+': 1,
'-': 1,
'*': 2,
'/': 2,
}

def __init__(self, formula_items, expression):
self.formula_items = formula_items
self.expression = expression
self.formula_dict = self.get_formula_dict(formula_items)
self.start_node = self.get_start_node(self.formula_dict)
self.brackets = {
self.small_bracket: 0,
self.middle_bracket: 1,
self.big_bracket: 2
}

@classmethod
def get_formula_dict(cls, formula_items):
formula_dict = {}
for item in formula_items:
origin = item.data['origin']
if 'left' not in origin and 'node' in origin:
origin.update(origin['node'])
if 'name' not in origin:
origin['name'] = origin['operator']
formula_dict[origin['index']] = item
return formula_dict

@classmethod
def get_start_node(cls, formula_dict):
for key, value in formula_dict.items():
if value.data['origin']['name'] == '=':
left_node = formula_dict.get(value.data['origin']['left'])
right_node = formula_dict.get(value.data['origin']['right'])
return left_node if left_node.data['origin'].get('operator') else right_node
return None

def process(self):
if self.expression and '(' not in self.expression:
return
# 搜索 left 和 right 的最低优先级 X,Y,
# 如果 X 优先级小于 start_node 的优先级,那么 X 需要括号
# 否则,不需要括号
# 对于 Y 同理
# 只在一个分支上检查level
# 特例 / - 右边分支如果有同级的 需要带括号
# 括号内部 非括号部分的优先级
self.fill_node_bracket(self.start_node)

def ensure_bracket(self, child_bracket):
if not child_bracket:
return self.small_bracket
child_bracket_order = self.brackets[child_bracket]
for bracket, order in self.brackets.items():
if order > child_bracket_order:
return bracket
return None

def find_child_bracket(self, node):
if not node:
return
bracket = node.data['origin'].get('bracket')
if node.data['origin']['left'] != -1:
left_bracket = self.find_child_bracket(self.formula_dict.get(node.data['origin']['left']))
if left_bracket:
if not bracket:
bracket = left_bracket
else:
bracket = left_bracket if self.brackets[left_bracket] > self.brackets[bracket] else bracket
if node.data['origin']['right'] != -1:
right_bracket = self.find_child_bracket(self.formula_dict.get(node.data['origin']['right']))
if right_bracket:
if not bracket:
bracket = right_bracket
else:
bracket = right_bracket if self.brackets[right_bracket] > self.brackets[bracket] else bracket
return bracket

def fill_node_bracket(self, start_node, level=1):
start_node_priority = self.operator_priority.get(start_node.data['origin']['name'], self.priority_default)

left = self.formula_dict.get(start_node.data['origin']['left'])
if left:
self.fill_node_bracket(left, level=level + 1)
left_lowest_priority = self.get_lowest_priority(left)
if left_lowest_priority < start_node_priority:
child_bracket = self.find_child_bracket(left)
left.data['origin']['bracket'] = self.ensure_bracket(child_bracket)

right = self.formula_dict.get(start_node.data['origin']['right'])
if right:
self.fill_node_bracket(right, level=level + 1)
right_lowest_priority = self.get_lowest_priority(right)
if right_lowest_priority < start_node_priority or (start_node.data['origin']['name'] in ['/', '-'] and right_lowest_priority == start_node_priority):
child_bracket = self.find_child_bracket(right)
right.data['origin']['bracket'] = self.ensure_bracket(child_bracket)

def get_lowest_priority(self, start_node):
if not start_node or start_node.data['origin'].get('bracket'):
return self.priority_default
node_priority = self.operator_priority.get(start_node.data['origin']['name'], self.priority_default)
if start_node.data['origin']['left'] != -1:
left_priority = self.get_lowest_priority(self.formula_dict.get(start_node.data['origin']['left']))
node_priority = min(node_priority, left_priority)
if start_node.data['origin']['right'] != -1:
right_priority = self.get_lowest_priority(self.formula_dict.get(start_node.data['origin']['right']))
node_priority = min(node_priority, right_priority)
return node_priority

Get新技能——换锁芯

租了个新房子,这个房子之前住的人比较杂,所以需要换一个锁芯。

看了个视频觉得挺简单的,但是实际拆了之后出现了一些问题,因为不是很了解,所以小心翼翼的,现在想来也挺简单的。

首先是用螺丝刀将门锁内侧的两个螺丝拧开,把内侧把手拆掉,然后把门外侧把手拆掉。我这里内侧很好拆,但是外侧不好弄,刚开始以为有什么玄机,后来发现还是要大力出奇迹,来回扭一扭就拆掉了。其实外侧把手相当于一个螺母,内侧把手穿了一个螺丝,通过这种方式连接到一起了。

接下来是把门锁中间固定锁芯的螺丝拆掉,拆掉之后有的锁芯可以直接拔下来,我这边拔不掉,当时我就卡在这里了,后来在网上搜了一下说是插入钥匙拧一下,拧到合适的角度就能拔下来了。拔下来之后才发现,锁芯中间有个小塑料环,拧到一定角度才能和锁芯主体平齐,这样才能拔出来。

锁芯拔出来之后,量一下中间螺丝孔到锁芯两边的距离,然后网上下单,完了把旧锁芯再装回去。

新锁芯回来,按照上面的步骤把旧锁芯替换为新锁芯,装回去,万事大吉。

搬家了

中秋节那天开始打包搬家,我真是低估了家里的东西多少,高估了自己的搬运能力。

幸亏把好朋友官哥叫来帮忙了,否则我中秋节三天累死都不可能搬完。

当天打包完成,吃个火锅喝个小酒,吃饱喝足之后晚上干活!

image.png

从晚上8点开始,搬到晚上12点多。

image.png

第二天从早上9点搬到中午12点。

真的太累了。

搬完家,开始新生活,加油~

人生的松弛感

最近看到这样一条微博,很有意思。

image.png

我想大多数人因为证件过期登不了机,都会生气懊恼大崩溃吧,毕竟从小到大,我们都习惯了掌控生活。

任何计划被打乱,没有按照原本的想象进行,我们都会表现出深深的失望和恐慌,整个人活得特别像刺猬,每根刺都竖立着蓄势待发。

与之相反,面对小孩证件过期的这个突发事故,这个家庭没有揪着不放,没有放大这件事情对行程的影响。

他们很从容地应对当下,没有急切,没有压力,非常轻松地接受着这突如其来的变故,放任其发展,好像一切的发生本应如此,没有什么好气愤,好纠缠。

那一家人的那种人生的松弛感,让博主感动到想哭,我想,这是因为它是这个时代所稀缺的状态,怎能不让人羡慕呢?!

那到底什么是人生的松弛感呢?

人生的松弛感,就是面对世界的任何变化和不如意,你都完全接受的状态,爱咋样咋样!

这个世界的真相,就是所有事情都不是你能控制的。

每一次发生在你身上的事情,有时候在你的预期内,有时候出乎你的意料,但本质上,它的发生都有很多的偶然性和随机性,有很多的影响因素是大脑意识感知不到的。

如果你时刻紧张兮兮地面对生活,期待生活按照你所期望的那样在你眼前展开,那你注定要失望,而当你持续地因此而不开心,患得患失智只会让你的人生走下坡路。

相反,那种人生的松弛感,不是摆烂,不是无所谓,而是与世界的变化和解,跟自己的执念和解,不浪费心力与它纠缠,潇洒转身继续轻松地迎接人生的下一站。

所以,你要转变思路,我们来到这个世界,是来玩的,不设预期,不过多地计划,而是随着变化调整自己,用心体验生活,不急不躁地顺着生命之流而下。

拿工作来说,工作重要吗?当然重要,但是更重要的是我自己开心,所以你不用过度在意别人是怎么看你,不用忧心自己能不能升职加薪,不用过度思考自己怎样才能脱颖而出。

你真正要做的,是追求那种人生的松弛感,让自己用一个放松的心态去工作,不用跟别人比较,不用纠缠于当下的不顺,不用计较付出和回报,而是投入到当下真正该做的事情中去。

很神奇的是,当你的心态彻底转变之后,一切开始变得顺利起来,你在工作中更游刃有余,成就感暴增,瞎逼逼的同事也不常出现在你面前,连老板对你也开始赞赏有加,认可度提升。

这些都是我的亲身体验——

当你松弛了,你就没有太多得失心,也没有太多的焦虑内耗,你所有的能量都集中在你最需要实现的价值上。

人生的松弛感太难得了,它来自于你对自我的认可,对世界的不期待,以及对人生的负责。

底层的逻辑可能就是“爱谁谁,爱咋地咋地,我只要能自洽通透地过好自己的人生就好!”

为了获得人生的松弛感,我有如下建议:

  1. 临在当下,专注事情本身

你需要建立“成长型思维”和“体验者思维”。

“成长型思维”让你在面对过去的事情时认识到自己的不足,看到自身改进的契机,进而不断迭代精进自己。

“体验者思维”则让我们面对未来的生活时,不抱期待,不设预期,而是抱有更多的好奇和探索欲,让自己沉浸在那些该做的事情中。

抱持人生松弛感的人,总是能在过去的坏事中看到好的一面,也不会因为未来的预期而作茧自缚,相反,他们总是可以但行好事,莫问前程。

  1. 臣服生活,接纳一切

不拧巴,不较劲,生活不过就是见招拆招。

在《清醒地活》这本书中,作者指出,那些引发我们的执念的人事物,都是束缚在我们心灵的能量。这股被束缚的能量,因为我们的不接纳不臣服,而淤积于我们内心,成为阻挡我们成长的力量。

我们之所以有纠结有内耗,都是因为我们不去放手那些执着的人事物。

如果你放手了,臣服生活,接纳一切,这股被束缚的能量才能被释放。反过来,这股被释放的能量会重新流经你,成为支撑你往上走的意志力。

这时候,你就找到了人生的松弛感。

换句话说,当你不再执着,而是臣服生活,接纳一切,你会有更多的意志力去做好当下该做的事情。

很多人抱怨自己没有意志力去早起,去健身,不妨去审视你是否有太多对生活的执念,以至于你时刻恐慌焦虑,意志力缺失。

  1. 了解自己,接纳自己

回归到自身,为了获得人生的松弛感,你需要去了解自己,探索自己。

只有了解自己的人,才会真正地接纳自己,获得人生的松弛感。

你可以看看那些具有松弛感的明星,比如王菲,她应该很清楚自己要什么,很清楚什么对自己最重要,所以当她接纳自己真实的样子的时候,她就可以不惧外界的评判而真实地表达自己,所以我们总是可以看到她呈现给我们的那种淡定从容和那种“爱谁谁”的坦然。

我们对自己人生的定义,不来自于外界的评价体系,而是深深地来自于对自我的了解和接纳,那我们就不会过度地焦虑迷茫,就不会急于自证,不会操之过急,因为只有耐得住漫长的时间线,水到渠成才会是一件非常自然的事情。

人生的松弛感,是生活方式和人生态度由内而外地发散。

它应该成为我们人生的底色,让我们从容淡定地历经岁月的洗礼,并对于所有的体验都保持开放而随性的态度。

人生漫长,一路上会有高潮,也会有低谷。

慢慢地,你会发现,松弛感才是人生的王炸。

共勉~

雨天漫游

周日无事,跟媳妇开着车随便逛

image.png

image.png

我们老家的洋芋片夹馍,香太太。

馍中有乾坤,一馍一世界~

image.png

一个子查询优化

问题背景

同事写了一个接口,这个接口涉及到下面三个表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
User:
- id
- username
- created_utc
- ...

UserLoginInfo
- id
- user_id
- created_utc
- ...

Project
- id
- user_id
- name
- ...

需求是查询出:用户,最近用户登录时间,以及创建的项目数。这里的一条UserLoginInfo记录,代表一次登录,Project就是用户创建的项目记录,可能有多条。

同事实现是用子查询,Python里实现大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
user_projects_query = (
db_session.query(func.count(Project.id).label("cnt"))
.filter(
Project.user_id == User.id,
Project.deleted == 0,
)
.label("user_projects_cnt")
)
login_utc_query = (
db_session.query(UserLoginInfo.created_utc)
.filter(UserLoginInfo.user_id == User.id)
.order_by(UserLoginInfo.id.desc())
.limit(1)
.label("latest_login_time")
)
default_login_query = func.COALESCE(login_utc_query, User.created_utc).label("default_login_time")

order_mapping = {
"login_time": lambda desc: (default_login_query.desc(), User.id.desc()) if desc else (default_login_query, User.id),
"user_projects": lambda desc: (user_projects_query.desc(), User.id.desc()) if desc else (user_projects_query, User.id),
"created_utc": lambda desc: (User.created_utc.desc(), User.id.desc()) if desc else (User.created_utc, User.id)
}
query = db_session.query(User, user_projects_query, default_login_query).filter_by(deleted=0).order_by(*condition)

这里的condition是在order_mapping中根据desc的值进行选择的

上面用Python写的代码翻译成raw sql就是

1
2
3
4
5
6
7
SELECT 
"user".id as user_id, "user".username as user_name,
(SELECT count(project.id) AS cnt FROM project WHERE project.user_id = "user".id AND project.deleted = 0) AS user_projects_cnt,
coalesce((SELECT user_login_info.created_utc FROM user_login_info WHERE user_login_info.user_id = "user".id ORDER BY user_login_info.id DESC LIMIT 1), "user".id) AS default_login_time
FROM "user"
WHERE "user".deleted = 0 ORDER BY default_login_time DESC, "user".ext_id DESC
LIMIT 10 OFFSET 0

分析

这一实现一打眼就知道肯定会有问题,因为有很多关联子查询,在数据集小的情况下体现不出来,一旦数据集稍有增大,速度就会大幅下降。因为一般来说,关联子查询在查询优化器里是不好进行优化的,最后出来的算法大概率是Nested loop

下面是在测试环境用explain执行后的结果,果然是Nested Loop占了相当长的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Sort  (cost=750000347885.10..750000347885.29 rows=75 width=566)
Sort Key: ((SubPlan 2)) DESC, "user".id DESC
-> Bitmap Heap Scan on "user" (cost=8.75..750000347882.77 rows=75 width=566)
Recheck Cond: (ext_sys = 1)
Filter: (deleted = 0)
-> Bitmap Index Scan on idx_user_ext_sys (cost=0.00..8.73 rows=78 width=0)
Index Cond: (ext_sys = 1)
SubPlan 1
-> Aggregate (cost=19.27..19.28 rows=1 width=8)
-> Bitmap Heap Scan on document (cost=4.21..19.25 rows=5 width=4)
Recheck Cond: (user_id = "user".ext_id)
Filter: ((user_sys = 1) AND (deleted = 0))
-> Bitmap Index Scan on idx_user_id (cost=0.00..4.21 rows=8 width=0)
Index Cond: (user_id = "user".ext_id)
SubPlan 2
-> Limit (cost=10000004618.93..10000004618.94 rows=1 width=4)
-> Sort (cost=10000004618.93..10000005141.43 rows=209000 width=4)
Sort Key: user_login_info.created_utc DESC
-> Nested Loop (cost=10000000000.15..10000003573.93 rows=209000 width=4)
-> Seq Scan on user_login_info (cost=10000000000.00..10000000942.95 rows=1045 width=4)
Filter: (user_id = "user".id)
-> Materialize (cost=0.15..18.98 rows=200 width=0)
-> Index Only Scan using idx_document_user_sys on document document_1 (cost=0.15..17.98 rows=200 width=$
)
Index Cond: (user_sys = 1)

Planning time: 0.212 ms
Execution time: 229.674 ms

解决

那么现在首要问题就是如何避免子查询,可以看到需求里是需要最近用户登录时间用户的项目数,那么一个很自然的思路就是先把这两个数据查出来,然后再和 User Join到一起进行分页即可,这样就可以避免子查询嵌套到父查询里了,这里涉及到一个子查询的优化方法,尽量将关联子查询上推,上推到和父查询一个层级以避免 Nested Loop。

要实现先查出来某些数据,然后在后面的查询中使用,那么就是 CTE(公用表表达式) 了!公用表表达式,本质是允许用户通过一个子查询语句定义出一个临时表,然后在各个地方都可以使用这个临时表。

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
user_projects_query = db_session.query(
Project.user_id, func.count(Project.id).label("cnt")
).filter(
Project.user_id == User.id,
Project.deleted == 0,
).group_by(Project.user_id).cte('user_projects_query')
project_count = func.COALESCE(user_projects_query.c.cnt, 0)

login_utc_query = db_session.query(
UserLoginInfo.user_id, func.max(UserLoginInfo.created_utc).label('login_utc')
).group_by(UserLoginInfo.user_id).cte('login_utc_query')
login_utc = func.COALESCE(login_utc_query.c.login_utc, User.created_utc).label('login_utc')

order_mapping = {
"login_time": lambda desc: (login_utc.desc(), User.id.desc()) if desc else (login_utc, User.id),
"user_docs": lambda desc: (document_count.desc(), User.id.desc()) if desc else (document_count, User.id),
"created_utc": lambda desc: (User.created_utc.desc(), User.id.desc()) if desc else (User.created_utc, User.id)
}

query = db_session.query(
User, document_count, login_utc
).join(
user_projects_query, user_projects_query.c.user_id == User.id, isouter=True
).join(
login_utc_query, login_utc_query.c.user_id == User.id, isouter=True
).filter(
User.deleted == 0,
).order_by(*condition)

对应的raw sql如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
WITH user_projects_query AS 
(SELECT project.user_id AS user_id, count(project.id) AS cnt
FROM project
WHERE project.deleted = 0 GROUP BY document.user_id
),
login_utc_query AS
(SELECT user_login_info.user_id AS user_id, max(user_login_info.created_utc) AS login_utc
FROM user_login_info GROUP BY user_login_info.user_id
)
SELECT "user".id AS user_id, "user".username AS username, user_projects_query.cnt AS user_projects_query_cnt, coalesce(login_utc_query.login_utc, "user".created_utc) AS login_utc
FROM "user" LEFT OUTER JOIN user_projects_query ON user_projects_query.user_id = "user".id LEFT OUTER JOIN login_utc_query ON login_utc_query.user_id = "user".id
WHERE "user".deleted = 0
ORDER BY login_utc DESC, "user".ext_id DESC
LIMIT 20 OFFSET 0

在测试环境跑一下 EXPLAIN ANALYSE:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
 Limit  (cost=2832.33..2832.38 rows=20 width=28) (actual time=11.156..11.162 rows=20 loops=1)
CTE user_projects_query
-> HashAggregate (cost=31.53..31.88 rows=35 width=12) (actual time=0.103..0.108 rows=26 loops=1)
Group Key: document.user_id
-> Bitmap Heap Scan on document (cost=9.69..30.69 rows=168 width=8) (actual time=0.018..0.074 rows=181 loops=1)
Recheck Cond: (user_sys = 1)
Filter: (deleted = 0)
Rows Removed by Filter: 29
Heap Blocks: exact=18
-> Bitmap Index Scan on idx_document_user_sys (cost=0.00..9.65 rows=200 width=0) (actual time=0.013..0.013 rows=211 loops=
1)
Index Cond: (user_sys = 1)
CTE login_utc_query
-> GroupAggregate (cost=0.29..2760.73 rows=46 width=8) (actual time=1.135..10.853 rows=70 loops=1)
Group Key: user_login_info.user_id
-> Index Scan using idx_user_login_info_user_id on user_login_info (cost=0.29..2515.61 rows=48932 width=8) (actual time=0.005..5
.915 rows=48804 loops=1)
-> Sort (cost=39.73..39.99 rows=107 width=28) (actual time=11.155..11.158 rows=20 loops=1)
Sort Key: (COALESCE(login_utc_query.login_utc, "user".created_utc)) DESC, "user".ext_id DESC
Sort Method: top-N heapsort Memory: 27kB
-> Hash Left Join (cost=2.78..36.88 rows=107 width=28) (actual time=11.028..11.130 rows=107 loops=1)
Hash Cond: ("user".id = login_utc_query.user_id)
-> Hash Left Join (cost=1.28..34.54 rows=107 width=28) (actual time=0.137..0.210 rows=107 loops=1)
Hash Cond: ("user".id = user_projects_query.user_id)
-> Index Scan using user_pkey on "user" (cost=0.14..32.67 rows=107 width=20) (actual time=0.014..0.066 rows=107 loops=
1)
Filter: (deleted = 0)
Rows Removed by Filter: 5
-> Hash (cost=0.70..0.70 rows=35 width=12) (actual time=0.119..0.119 rows=26 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 10kB
-> CTE Scan on user_projects_query (cost=0.00..0.70 rows=35 width=12) (actual time=0.104..0.115 rows=26 loops=1)
-> Hash (cost=0.92..0.92 rows=46 width=8) (actual time=10.887..10.887 rows=70 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 11kB
-> CTE Scan on login_utc_query (cost=0.00..0.92 rows=46 width=8) (actual time=1.136..10.875 rows=70 loops=1)
Planning time: 0.218 ms
Execution time: 11.211 ms

可以看到相比之前的子查询的229ms,使用cte的sql已经降到了11ms了

总结

不要在SELECT语句中滥用子查询

子查询只有必要的时候再用,使用时候应该注意考虑如何将子查询与SQL语句的主干进行融合,子查询不是独立的黑盒数据块,应该与主语句通盘考虑后再结合使用。

在使用子查询的地方,往往可以通过CTE表达式进行优化,核心思路就是将子查询提升到和父查询一样的层级,避免Nested Loop,而且CTE表达式可读性更好。

总之在一个SQL有比较多的子查询时候一定要小心,记得 EXPLAIN ANALYSE