npx创建react应用出错

最近准备学一学前端的东西,也不是为了什么,主要是之前一直想做一做前端,但是不知道从何开始下手,刚好这里有一份react的完整教程,那就下定决心学一遍吧。

npx create-react-app demo-react 初始化项目时候就报错了。。

1
2
3
4
5
6
npm ERR! code CERT_HAS_EXPIRED
npm ERR! errno CERT_HAS_EXPIRED
npm ERR! request to https://registry.npm.taobao.org/create-react-app failed, reason: certificate has expired

npm ERR! A complete log of this run can be found in:
npm ERR! /Users/skyrover/.npm/_logs/2024-05-30T13_27_38_262Z-debug-0.log

一开始没有好好看错误信息,光去设置软件源了。。

npm config set registry https://registry.npm.taobao.org

后来才发现是证书过期了,解决办法:

1
2
npm cache clean --force
npm config set strict-ssl false

然后再安装就OK了。

手把手教你如何在Ubuntu上安装TA-Lib

前言

做量化的服务器快到期了,准备迁移到一个新服务器上,然后就是拉代码,建环境一系列操作,不过在装这个TA-Lib的时候卡住了,百度之,一堆辣鸡信息,没有一个有效的。

然后谷歌了一下,第一篇文章就是How to install TA-Lib on Ubuntu 22.04 - Step by Step,这也太简单了,很快就装好了。

无语了,对于国内这种互联网环境。。

原文是英文的,这里大概翻译一下,权当记录。

什么是TA-Lib

TA-Lib是一个被广泛应用的金融市场数据分析软件。包含了150+的指标比如ADX,EMA,RSI等等。也适合加密货币比如比特币等市场数据分析。

TA-Lib发布于2001年,被广泛应用了20多年。代码经过了时间的检验,非常稳定。

手把手教你如何在Ubuntu上安装TA-Lib

  • 安装构建工具
1
2
sudo apt update
sudo apt install build-essential wget -y
  • 下载和解压源码
1
2
3
wget http://prdownloads.sourceforge.net/ta-lib/ta-lib-0.4.0-src.tar.gz
tar -xzf ta-lib-0.4.0-src.tar.gz
cd ta-lib/
  • 配置以及从源码构建
1
2
./configure -prefix=/usr
make
  • 安装到系统
1
sudo make install
  • 安装Python包
1
pip install ta-lib
  • 测试

在Python中import talib

如果没有错误,表示安装成功。

我这里遇到这个错误:ValueError: numpy.ndarray size changed, may indicate binary incompatibility. Expected 96 from C header, got 80 from PyObject

只需要升级下numpy即可:pip install --upgrade numpy

PhotoPrism——搭建家庭相册

之前用退休的笔记本电脑搭建了一个NAS服务,主要就是用来备份手机照片的,在使用过程中发现了一个问题:照片越存越多但是丝毫没有再打开浏览的时候,因为照片实在太多了,浩如烟海,让人没有勇气打开。。。另外一个就是手机上的软件都不太好用,特别是安卓,es文件浏览器广告实在是太多了。

偶然的机会在知乎上看到有人推荐在NAS上安装照片管理软件——PhotoPrism,于是搞来试一试,这个也是开源软件,在GitHub上可以查看源代码。

我按照文档推荐的Docker容器安装,不过装Docker的时候费了点功夫。。

因为必须用root用户启动PhotoPrism,所以直接切到root用户

大概要做以下事情:

Quick Start

更新源:apt-get update

安装python3:apt-get install python3-pip

安装docker:apt-get install docker.io

安装docker-compose:pip3 install docker-compose

然后在docker-compose配置文件所在目录运行:docker-compose up -d就可以运行了。

配置

更改Volumes配置:

1
2
3
4
volumes:
# "/host/folder:/photoprism/folder" # Example
# - "~/Pictures:/photoprism/originals" # Original media files (DO NOT REMOVE)
- "/home/skyroverb/nas-data/back-up-data/照片:/photoprism/originals"

冒号前面的就是你照片所在的文件夹,包含所有照片,下面可以包含各种层级的目录。冒号后面的/photoprism/originals相当于是固定的,当然后面可以增加子目录,相当于一个虚拟映射吧。

其他也没有啥特别需要关注的配置了。

启动

启动之后页面是空的,那是因为第一次使用需要索引。

image.png

点击开始索引之后,等待一段时间之后页面上就有照片显示了。

使用

索引好之后的页面长这样:

image.png

这里面我最喜欢的就是按地点分类,一个个地方打卡,很有意思。

然后日历是按月份分类的,这样看起来就很清晰。

人物这里是使用了人工智能识别了不同人脸,每个人脸下面就是所有关于这个人的照片。

最后

现在手机里都会有这种照片管理,苹果手机还会隔段时间会帮你做一个照片视频。这个软件的优点就是帮你把所有照片都做了个管理,毕竟一个家庭会有很多台手机,把这些照片统一管理起来,家庭生活就有很多不同的角度,甚至是每个人不同的时间线融合在一起,这就是整个家庭的生活记录,很棒!

破解蹭网限制

哈哈,越来越不地道了!

前段时间我发现家里的监控自动离线了,因为我在月子中心,所以一直没时间回家看看是咋回事了。

这几天回来后发现是邻居把我的设备限速了,下载速度只有10kb/s。。。而我的手机连接网络是正常的,网速杠杠的,所以肯定是把我的电脑给限速了。

那如何破解呢?只要学过一点计算机网络,就知道:

首先邻居肯定是黑名单限制的,因为我的手机可以正常联网,要是用白名单,那手机也无法正常访问网络的。

其次在路由器上设置黑名单来限制网速的原理就是根据要限制设备的MAC地址。

MAC地址(英语:Media Access Control Address),直译为媒体存取控制位址,也称为局域网地址(LAN Address),MAC位址,以太网地址(Ethernet Address)或物理地址(Physical Address),它是一个用来确认网络设备位置的位址。在OSI模型中,第三层网络层负责IP地址,第二层数据链路层则负责MAC位址 [1] 。MAC地址用于在网络中唯一标示一个网卡,一台设备若有一或多个网卡,则每个网卡都需要并会有一个唯一的MAC地址 [2] 。

那可不可以我替换一下设备的MAC地址,通过欺骗的方式绕过路由器的黑名单限制呢。

那肯定是可以的,MAC地址是在数据链路层写入到数据包上,形成一个个帧的,替换一下就行了呗。

于是在网上找了些教程:苹果电脑MAC地址修改步骤

其实挺麻烦的,有没有一键修改的呢,于是搜了下Github,果然有:

就是这个项目:SpoofMAC

还是个Python库,使用方法非常简单:pip install SpoofMAC

然后就可以change了:sudo spoof-mac.py randomize wi-fi

查看修改后的状态:spoof-mac.py list --wifi

显示:

1
- "Wi-Fi" on device "en0" with MAC address 90:9C:4A:B6:1F:75 currently set to 00:05:69:33:51:F4

注意这里只是currently set,重启电脑后就会失效,MAC地址仍然是本身的地址,需要再次运行命令来修改MAC地址。

而我的电脑是作为网关使用,所以一直启动不会重启,除非邻居再把我这个MAC地址给禁掉。。。那重新randomize下就行了

嘿嘿,又可以愉快的蹭网了。

Aurora 代理设置

跟网站服务器一样,代理这个现在也懒得折腾,直接用现成免费的,虽然慢点,但是只搜索看文本信息是足够的了。

Aurora 之前是通过代理服务器访问,有个 list 记录了网址规则,然后搭配谷歌插件 SwitchyOmega 来判断不同网址是要用代理还是直接连接,还是蛮香的。

Aurora 最近一次更新改成了 PAC 情景模式,PAC 情景模式就是主要根据 PAC 文件里面的规则来访问网络,所以在 SwitchyOmega 也需要改一下配置

首先到电脑设置->网络->高级->代理->自动代理配置,查看系统自动识别出来的 PAC 代理配置,我的这里是:http://localhost:51076/FfDwKP81YnT9sXpoe3vXON0C3WJcQvGC/fei.pac

image.png

然后到 SwitchyOmega 新增一种情景模式,选择 PAC,填写 PAC 网址,点击立即更新情景模式

image.png

应用选项之后,前往 auto switch,把之前需要代理访问的情景模式都改成新的 PAC 模式即可。

蹭网成功🤪

现在住的这个房子是去年9月租的,家里的宽带应该是上一个租客的网,一直能用,所以我就没再拉网,毕竟一年后就搬到新家了,再折腾网络有点麻烦,所以就一直用下去了。

没想到从上个月最后一天开始断网了,一直到现在😅,昨天没事干,下载了个wifi万能钥匙,一打开,我去,有一个信号还不错的有钥匙的网,连上去,能上网!哈哈,这下子就能蹭网了。

打开设置选择分享wifi,出现一个二维码,然后用微信扫一下,wifi密码就出来了,找到wifi密码就好说了。

然后我拿着手机在屋子里转了一圈,发现在客厅信号最好,然后我把手机放到地板上,发现没有变化,然后举高往天花板放,果然信号满格了,那基本上确定了这是楼上邻居的网络。

确定了哪里信号最强后,开始着手构建一个无线mesh!

手机无线mesh

家里有一个没有用的新手机,一开始想的是拿这个手机当作一个跳板,先连上邻居的网,然后开一个网络共享,其他设备连这个手机,理论上应该没啥问题。

但是这样搞了之后,我发现:

  1. 这个网络的5G wifi信号在手机上比正常2.4G的信号要弱,连5G的根本没法上网,连2.4G的也不太稳定,动不动就上不了网了。
  2. 其他设备连接共享网络基本上只能浏览网页,看不了视频,虽然作为跳板的手机可以正常看视频。

所以我猜手机的天线功率还是太小了,所以打算用笔记本电脑试一下。

笔记本电脑无线mesh

用笔记本电脑打开wifi,果然笔记本的天线要好多了,2.4G的网络是满格,5G的网络也只少了一格。

连上5G,在电脑上看视频打游戏那就跟自家网络一样,没问题。

那怎么共享网络呢,查了下,苹果电脑在设置-共享-互联网共享里进行设置

image.png

但是我发现mac电脑里没有从wifi入,然后从wifi共享的选项,而手机可以这样操作。

这下有点麻烦了,但是我看有一个蓝牙选项,我就试着打开蓝牙。但是这个功能只有苹果手机能连上,其他手机不行,而且苹果手机连上之后,网速也不太行,看视频看不了,看网页也要卡好久。

那这个方案也作罢。。

笔记本替换光猫做网关

在上面共享设置里有一个选项是usb以太网,突然想起来我之前买过一个usb网卡,那怎么用呢?

家里的路由器连上光猫就可以上网,也就是说光猫及以上的部分的作用就是提供了互联网接入。

那和我现在的场景岂不是一样:光猫以及光猫以上的部分相当于笔记本连上wifi了么!

于是打开usb以太网共享,将网线一头插到usb网卡上,一头插到路由器的wan口上,啥都不用改。

果然通过原来的wifi信号可以上网了!

这样那太爽了,家里的所有设备都不用动,因为之前的wifi就能上网。

哈哈,我简直太高兴了,虽然说蹭的邻居的网有点不地道,但是我管不了那么多了,先让我玩几局游戏再说,憋了一个月了🤣

crontab的一个小问题

最近家里网断了,交易系统想要正常运行,就得用手机开热点,但是偶尔要出门,就没办法了,而且有时候也会忘记开热点,导致不能及时更新数据,为了摆脱这个烦恼,趁着昨晚失眠的时间,我把交易系统给上云了。

之前的log日志是直接append到一个文件里的,在本地好说,用电脑查看,也没什么不妥。上云后,为了能方便的通过移动设备来访问,于是将日志按照时间来做了切分:

/path/to/bin/update_stock_data.sh >> /path/to/logs/update-$(date +%Y%m%d).log 2>&1

然后将这个命令加入了云服务器的crontab,但是怎么都不运行。。

刚开始以为是cron服务没有启动,检查之:service cron status,没问题,active

查看日志:tail -200 /var/log/syslog,发现cron确实执行了,但是没执行成功,显示这么一句:

CRON[5629]: (root) CMD ("/root/quantization/bin/update_stock_data.sh >> /root/matrix/logs/update-$(date +)

乍一看,我以为是系统没显示完全,但是命令执行没问题,那是不是这个$()需要双引号引起来呀,改了之后,仍然没有作用,

后来网上搜了一番,才发现,我去,%在cron里被解释成了换行。。。

需要加上\来进行转义,这样:

/path/to/bin/update_stock_data.sh >> /path/to/logs/update-$(date +\%Y\%m\%d).log 2>&1

这样就能执行成功了。

Python 里的 random 模块

最近工作上遇到了一个比较有意思的问题:在生成邮箱验证码的时候,居然会出现重复,而且是可复现的重复,后来去另一个环境试了下,发现居然不同环境都会重复!

这种情况,粗略判断,应该是随机数生成的问题,但是奇怪的是,代码也重置了 seed,这个 seed 也是个随机的:

1
2
3
4
5
6
7
8
import random

random.seed(time.time() * 1000)


def get_capture(length=6):
all_letters = string.ascii_uppercase + string.digits
return ''.join(random.sample(all_letters, length))

经过调试,发现不论 seed 设置什么值,生成的随机字符串都是一样的,但是如果把 seed 设置放在函数内,就又都是随机的了。

最开始以为是 seed 设置会有作用域区分,但是经过查看代码,发现 random 居然是在内部生成了一个 Random 类的实例,从外部导入的 random.seedrandom.sample 都是在这个实例上调用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Create one instance, seeded from current time, and export its methods
# as module-level functions. The functions share state across all uses
#(both in the user's code and in the Python libraries), but that's fine
# for most programs and is easier for the casual user than making them
# instantiate their own Random() instance.

_inst = Random()
seed = _inst.seed
random = _inst.random
uniform = _inst.uniform
triangular = _inst.triangular
randint = _inst.randint
choice = _inst.choice
randrange = _inst.randrange
sample = _inst.sample
shuffle = _inst.shuffle
choices = _inst.choices
normalvariate = _inst.normalvariate
lognormvariate = _inst.lognormvariate
expovariate = _inst.expovariate
vonmisesvariate = _inst.vonmisesvariate
gammavariate = _inst.gammavariate
gauss = _inst.gauss
betavariate = _inst.betavariate
paretovariate = _inst.paretovariate
weibullvariate = _inst.weibullvariate
getstate = _inst.getstate
setstate = _inst.setstate
getrandbits = _inst.getrandbits

那既然不论在全局 seed,还是在函数内 seed,都基于同一个实例,那应该都起作用才对。现在的状态是全局调用 seed 函数,不论传入什么值产生的结果都是一样的。而在函数内调用 seed,如果输入是随机值,那么输出也是随机的,如果输入是个固定的,那么输出也会是固定的随机字符串。

那么这样看来,似乎只有一种可能————有其他地方在这个全局实例上也调用了 seed,于是我把断点打到这个 seed 函数里,然后重启 web,果然发现有很多次调用。

顺藤摸瓜,找到了其他在全局调用 seed 的地方,案子终于破了:其他地方调用 seed 的时机比这个地方迟,而且又都是在一个实例上调用的,那肯定都覆盖了,而且其他地方都是以固定数字调用 seed 的,所以最后产生的随机字符串都是以固定的顺序生成的。

那么如何修改代码呢?

在调试的时候,除了上面的那些全局调用 seed 的地方,还有一些其他写法,那就是新实例一个 Random 实例,让它和 random 模块里内置的那个默认实例区分开,然后后面调用 random 方法的地方,都基于这个新实例。妙啊!

那么 random 库使用的最佳实践应该是 在用到 random 的时候,搞一个自己的 Random 实例,这样就分开了。

代码修改如下:

1
2
3
4
5
6
7
8
import random

_random = random.Random(time.time() * 1000)


def get_capture(length=6):
all_letters = string.ascii_uppercase + string.digits
return ''.join(_random.sample(all_letters, length))

学习 React

以前也写过一些前端,比如我自己的博客,就是用 JQuery+HTML+CSS 实现的,但是大部分都是抄的,特别是 CSS 样式,真让人头大。至于 JS 部分,也是现学现撸,很不成体系,更不用说更深入的语言细节了,那就是一塌糊涂。

最近浏览博客,发现之前关注的博主 miguelgrinberg.com 开始在写 React 的教程,所以打算跟进学习一下,正好了解一下现代的 JS 框架是怎么样的,学习下前端工程是怎么构建的。

听说成为全栈工程师能更容易拿到远程工作的 offer,嗯不错不错,开始搞吧,学习使我快乐~

教程地址:Introducing the React Mega-Tutorial

我会在博客里记录一下有意思和对我来说重要的点,主要就是个笔记吧。

ES5 vs. ES6

ECMA, ECMAScript

transpiling:which converts modern JavaScript source code into functionally equivalent ES5 code that runs everywhere.

Summary of Recent JavaScript Features

Trailing Commas

很有帮助的观点:

1
2
3
4
5
const myArray = [
1,
3,
5,
];

在最后面一个元素加上逗号有两点好处,一个是上下挪动比较方便,另一个是新加元素比较方便。

Imports and Exports

这里其实没有完全理解,default export到底和其他export有什么意思,文中说了一句话:When using default exports, the name of the exported symbol does not really matter.

后面举的例子,说 import myReallyCoolFunction from './cool'; 也是有效的,那和 import myCoolFunction from './cool'; 是一个意思?

也就是说 default export 的东西,因为一个模块只能有一个,所以在其他模块引用的时候可以随便用任何名字?

先保留这个疑惑,继续往下看。

另外导入 non-default export 也有一点不一样,需要加大括号 import { SQRT2 } from './cool';

Variables and Constants

使用 let 来声明变量,const 来声明常量。

1
2
let a = 1;
const c = 3;

常量就是赋值之后不能有新的赋值了,而且也必须是在声明的时候赋值。

Equality and Inequality Comparisons

===!==

String Interpolation
1
2
const name = 'susan';
let greeting = `Hello, ${name}!`; // "Hello, susan!"
For-Of Loops
1
2
3
4
const allTheNames = ['susan', 'john', 'alice'];
for (name of allTheNames) {
console.log(name);
}
Arrow Functions

之前 code review 的时候看过这种写法,觉得很神秘,现在详细了解之后,其实也没啥,正如 Python 里的匿名函数,lambda 表达式一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
function mult(x, y) {
const result = x * y;
return result;
}

// 函数用箭头函数可以这样写
const mult = (x, y) => {
const result = x * y;
return result;
};

// 可以这样
const mult = (x, y) => x * y;

// 如果没有 y 参数 可以这样
const square = x => x * x;

// 回调函数
longTask(result => console.log(result));

在 Python 里就会这么写

1
lambda x, y: x * y
Promises
1
2
3
4
fetch('http://example.com/data.json')
.then(r => r.json())
.then(data => console.log(data))
.catch(error => console.log(`Error: ${error}`));
Async and Await

上面的写法可以改成这样,更易读

1
2
3
4
5
async function f() {
const r = await fetch('https://example.com/data.json');
const data = await r.json();
console.log(data);
}

加上 async 的函数会自动返回一个 promise

箭头函数也可以使用 async

1
2
3
4
const g = async () => {
await f();
console.log('done!');
};
Spread Operator

Using column alias in a WHERE clause doesn't work

工作中需要写一个 SQL 进行查询,本来我是这样写的:

1
2
3
4
5
SELECT document.id, (COALESCE(CAST(document.internal_info->>'proofread_time' AS int), 0) - p500_end.end_utc) / 60 AS proofread_time
FROM document
JOIN p500_end ON p500_end.doc_id = document.id
WHERE document.created_utc > 1658592000 AND document.deleted = 0 AND proofread_time > 60
ORDER BY id DESC

但是发现报这样的错误:

ERROR: column "proofread_time" does not exist

然后我把 WHERE 条件里的 proofread_time 替换成 (COALESCE(CAST(document.internal_info->>'proofread_time' AS int), 0) - p500_end.end_utc) / 60,就正常了。

咦,好奇怪,难道 alias 不能在 WHERE 条件里用吗,有点反直觉,于是我去查了下文档:

An output column’s name can be used to refer to the column’s value in ORDER BY and GROUP BY clauses, but not in the WHERE or HAVING clauses; there you must write out the expression instead.

原因就是 WHERE 语句和 HAVING 语句是在 column aliases 之前做的,所以没法引用,而 ORDER BY 和 GROUP BY 是在其之后,所以可以使用 aliased column

很古怪吧,反直觉!

但是把这么一长串的表达式写两遍真的很难受,所以我使用了 WITH 表达式来解决这个问题:

1
2
3
4
5
6
7
8
WITH results AS (
SELECT document.id, (COALESCE(CAST(document.internal_info->>'proofread_time' as int), 0) - p500_end.end_utc) / 60 AS proofread_time
FROM document
JOIN p500_end ON p500_end.doc_id = document.id
WHERE document.created_utc > 1658592000 AND document.deleted = 0
ORDER BY id DESC
)
SELECT id, proofread_time FROM results WHERE proofread_time > 60;

这样看起来就清晰多了。

Casdoor的权限检查

Casdoor 的权限检查机制是基于 Casbin 的,参考文档:

简要介绍

所有在同一个组织的用户可以访问该组织下的所有应用,但是有时候需要做一些限制,这里就要用到 权限(permission) 了。

那么 权限(permission) 的作用就是用于控制 用户 能否对 应用 进行某些 操作

最简单的方式就是在 权限(permission) 配置页面新建一个权限,如下图所示:

image.png

作为三个要素,用户,应用和操作,在这里是需要配置的:

  • 包含用户
  • 资源
  • 动作

意义也比较简单,就是哪些 用户 可以对哪些 应用 进行哪些 操作

包含角色 也容易理解,一个角色对应多个人,所以这里就是用来控制一组人的。

那上面的模型是什么意思呢,Casdoor 是如何根据这个配置进行权限校验的呢?

ACL 权限控制模型

ACL 权限控制模型应该很熟悉了吧,Linux 上就在使用,这里就不再赘述,参考:

Casbin 的权限控制

Casbin 是一个开源的权限控制库,它支持了多种权限控制模型

Casbin 实施权限规则比较简单,管理员需要列出 主体(subject)对象(object) 和期望允许的 操作(action) 即可:

  • subject,主体,比如用户
  • object,对象,需要被权限控制的对象
  • action,动作,read,write,delete或者其他定义的动作(操作)

管理员需要定义 模型(model) 文件来确定校验条件,Casbin 提供了一个 执行器(Enforcer) 来根据规则定义和模型文件来校验请求。

也就是说 Casbin 权限校验需要三个部分:模型(Model), 校验规则(Policy)和执行器(Enforcer)。

执行器是 Casbin 自带的,可以不必特别关注,只需要知道这件事情就行。下来着重研究模型和校验规则。

哦,对了进行权限校验还需要一个 请求(request)

模型定义

Casbin 里的权限控制模型被抽象成了一个 CONF 文件,基于 PERM metamodel (Policy, Effect, Request, Matchers),这个 PERM metamodel 基于 4 个部分 Policy, Effect, Request, Matchers,其描述了 资源 和 用户 之间的关系。

Casbin 里最简单的一个模型,也是默认模型:

1
2
3
4
5
6
7
8
9
10
11
[request_definition]
r = sub, obj, act

[policy_definition]
p = permission, sub, obj, act

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act
Request

r = sub, obj, act

定义了请求参数,也就是说请求需要三个部分,subject,object 和 action,通俗点讲就是 访问的用户,访问的对象,访问的方法。

这里其实定义了我们应该提供给权限控制匹配函数的参数名和顺序。

Policy

p = permission, sub, obj, act

定义了模型的访问策略,其实是定义了在策略规则文档中字段名和顺序

这里还有一种定义:p={sub, obj, act, eft}

eft是可以被省略的,eft代表的是策略的结果,比如就是 allow 或者是 deny,如果没有定义的话,读取策略文件的时候就会被忽略,默认会返回 allow

Matcher

m = r.sub == p.sub && r.obj == p.obj && r.act == p.act

请求和策略的匹配规则,这里 r 就是请求(request),p 就是策略(policy)

Effect

e = some(where (p.eft == allow))

就是对策略的结果再次进行条件判断,这里的意思就是只要有一条规则返回的是 allow,那么整个就是 allow

还有这样的定义:e = some(where (p.eft == allow)) && !some(where (p.eft == deny))

就是说有一条是 allow 而且 没有 deny 的情况,整个结果是 allow

这里主要是针对多条匹配规则都匹配上的情况做处理的

在 Casdoor 定义权限控制

定义模型

我们已经了解了模型的定义规则,那么就开始在 Casdoor 上面实际操作一番吧。

模型定义用来针对要控制的 object,访问的 subject 能允许怎样的 action

首先在页面顶部导航栏点击 模型(Model),点击新增:

image.png

模型文本这里就填写上面介绍过的模型定义字符串,点击保存&退出

定义权限

在页面顶部导航栏点击 权限(permission),点击新增:

image.png

permission 其实就是具体的控制权限设置,可以针对多个资源进行控制

其实这里的配置界面只是简化了规则的配置,这个页面的权限其实会转换成类似如下的数据:

1
2
p	alice	data1	read
p bob data2 write

代码上

image.png

permission 里定义的规则会先转换成 CasbinRule 对象,每一个对象就包含了 Policy 定义的每个字段,之后就会转成如下的字符串:

p, built-in/permission-built-in, built-in/app-built-in/admin, app-built-in, read

执行校验

模型和权限定义好之后,对应被控制的用户访问被控制的应用时,Casdoor 就会进行加载对应模型和权限进行校验,执行这一工作的叫做 Enforcer

关于 Enforcer 的文档:New a Casbin enforcer

代码分析

流程已经分析差不多了,让我们看看代码上是怎么处理的

Casdoor 是在这个函数做权限校验的。用户登陆时候就会调用,当然在任何你想进行权限校验的地方,都可以进行调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func CheckAccessPermission(userId string, application *Application) (bool, error) {
permissions := GetPermissions(application.Organization)
allowed := true
var err error
for _, permission := range permissions {
if !permission.IsEnabled || len(permission.Users) == 0 {
continue
}

isHit := false
for _, resource := range permission.Resources {
if application.Name == resource {
isHit = true
break
}
}

if isHit {
enforcer := getEnforcer(permission) // v2.Enforcer
allowed, err = enforcer.Enforce(userId, application.Name, "read")
break
}
}
return allowed, err
}

逻辑就是通过 organization 来获取所有的 permission 配置,接下来遍历 permission 中定义的 resource,如果跟当前的应用匹配,那么用这条权限去检查用户是否有对应的权限。

匹配成功之后,调用 getEnforcer 来获取一个执行权限校验的 enforcer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func getEnforcer(permission *Permission) *casbin.Enforcer {
tableNamePrefix := conf.GetConfigString("tableNamePrefix")
adapter, err := xormadapter.NewAdapterWithTableName(conf.GetConfigString("driverName"), conf.GetBeegoConfDataSourceName()+conf.GetConfigString("dbName"), "permission_rule", tableNamePrefix, true)
if err != nil {
panic(err)
}

modelText := `
[request_definition]
r = sub, obj, act

[policy_definition]
p = permission, sub, obj, act

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act`

permissionModel := getModel(permission.Owner, permission.Model)
if permissionModel != nil {
modelText = permissionModel.ModelText
}
m, err := model.NewModelFromString(modelText)
if err != nil {
panic(err)
}

enforcer, err := casbin.NewEnforcer(m, adapter)
if err != nil {
panic(err)
}

err = enforcer.LoadFilteredPolicy(xormadapter.Filter{V0: []string{permission.GetId()}})
if err != nil {
panic(err)
}

return enforcer
}

getEnforcer 做的工作就是根据 权限(permission) 数据去看有没有定义好的model,有的话使用定义好的模型文本来加载 enforcer,否则使用默认的模型文本:

1
2
3
4
5
6
7
8
9
10
11
[request_definition]
r = sub, obj, act

[policy_definition]
p = permission, sub, obj, act

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act

加载模型文本之后,enforcer 就会加载对应 权限(permission) 所对应定义的规则,对应 LoadFilteredPolicy 函数。

这些数据加载完成之后,enforcer 会根据传入的 subject,entity,action,即 用户,应用,操作,来检查权限是否允许

allowed, err = enforcer.Enforce(userId, application.Name, "read")

总结

总结一下,主要有以下核心:

  1. 理解 Casdoor 的权限是做什么的
  2. 理解 模型(Model) 以及 模型文本 是如何定义的
  3. 理解 模型,权限是如何配置的,以及是如何对用户,应用进行控制的
  4. 理解代码上是如何调用的,可以扩展到其他需要进行权限校验的地方

注意

在代码上新建或者编辑权限(permission) 之后,记得要重新生成一下规则(policy),因为不论在界面上是怎样的表现形式,最后都是enforcer通过model textpolicy来检查request

代码如下:

1
2
3
4
5
6
7
8
9
10
// 更新permission之后需要调用
if affected != 0 {
removePolicies(oldPermission)
addPolicies(permission)
}

// 新建permission之后需要调用
if affected != 0 {
addPolicies(permission)
}

Casdoor 部署问题

后端运行时 timeout

运行go run main.go,时候报 get 包 timeout,应该是代理问题,解决办法就是:go env -w GOPROXY=https://goproxy.cn,direct

如下:

1
2
3
4
5
6
7
8
duguangting@c123:~/casdoor$ go get github.com/RobotsAndPencils/go-saml@v0.0.0-20170520135329-fb13cb52a46b
go: github.com/RobotsAndPencils/go-saml@v0.0.0-20170520135329-fb13cb52a46b: Get "https://proxy.golang.org/github.com/%21robot
s%21and%21pencils/go-saml/@v/v0.0.0-20170520135329-fb13cb52a46b.mod": dial tcp 172.217.163.49:443: i/o timeout
duguangting@c123:~/casdoor$ go env -w GOPROXY=https://goproxy.cn,direct
duguangting@c123:~/casdoor$ go get github.com/RobotsAndPencils/go-saml@v0.0.0-20170520135329-fb13cb52a46b
go: downloading github.com/RobotsAndPencils/go-saml v0.0.0-20170520135329-fb13cb52a46b
go: downloading github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
go: downloading github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0

前端运行报错

后端跑通之后,前端运行yarn install的时候报:

The engine "node" is incompatible with this module

解决办法:

yarn install --ignore-engines

Goland 方法没法跳转

在Goland里所有引入的外部包无法跳转,搜了一下,大概这样:

  • step 1

在 Goland 的 go modules 设置勾选,然后设置 GOPROXY

image.png

  • step2

然后因为项目已经有 go.mod 文件了,那么直接执行 go mod tidy,这个命令的作用就是:add missing and remove unused modules

然后就 OK 了

如何去除数学表达式中的无用括号

最近工作上遇到了这个问题,挺有意思,终于遇到了一些有点意思的东西,于是深入研究了下。

问题背景

问题大概背景是 系统中会展示数学公式,公式是用树来表达的,以运算符号为节点,比如 3=1+2,首先根节点是=,然后左子节点为3,右子节点是+,接下来+的左子节点是1,右子节点是2,就是以这种方式来表示一个数学公式。

然后展示的时候并不知道哪些地方要加括号,因为括号这个信息隐藏在树里了,展示的时候只能是每个操作符的两边都加上括号,这样展示出来就非常繁琐,那么就得想一个算法来将这些无用括号去除掉。

算法描述

算法大概是这样的,只要括号内的非括号部分的数学符号的最小优先级小于括号任意一边数学符号的优先级,那么括号就是必须的,否则就是无用的括号。

比如q * (a * b + c * d) + c,括号内部的+*,最小优先级是+,括号两边是*+,小于其中一个*的优先级,那么这个括号就是必须的。

同理,q * (a * b * c * d) + c,这个括号就是无用的。

原理就是括号里面的优先级如果都是高于括号两边,那么不加括号,也是会先计算括号里面的运算。

另外为啥是非括号部分,因为内部如果已经有括号了,那么子括号肯定是先运算了,可以把这个子括号当一个整体对待。

当然目前还有一个问题,就是遇到/-的时候会有一些问题,比如a/(b/c)或者a-(b-c),主要原因是/-有取反的作用,所以需要特殊处理:也就是如果/-右侧的括号需要保留。

代码实现

回到问题背景,按照这样的数据结构,其实算是简化了。结点都是运算符,那么如果把结点左侧作为括号包裹的内容,只需要比较结点左侧的非括号部分最小优先级和结点符号优先级就可以了。同理,右侧也是一样的。

而且针对/-的特殊处理也更简单了,只需要判断结点运算符是不是/-,如果是,那么右侧部分需要加括号,当然如果需要的话。

另外代码还实现了一个功能:小括号,中括号,大括号的嵌套。主要原理就是如果某个结点需要加括号,那么递归的去找一下左节点和右节点目前使用的最大括号,在此基础上再用更高一级的括号。

实现代码大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
class BracketProcessor:
small_bracket = 'small'
middle_bracket = 'middle'
big_bracket = 'big'
priority_default = 3
operator_priority = {
'+': 1,
'-': 1,
'*': 2,
'/': 2,
}

def __init__(self, formula_items, expression):
self.formula_items = formula_items
self.expression = expression
self.formula_dict = self.get_formula_dict(formula_items)
self.start_node = self.get_start_node(self.formula_dict)
self.brackets = {
self.small_bracket: 0,
self.middle_bracket: 1,
self.big_bracket: 2
}

@classmethod
def get_formula_dict(cls, formula_items):
formula_dict = {}
for item in formula_items:
origin = item.data['origin']
if 'left' not in origin and 'node' in origin:
origin.update(origin['node'])
if 'name' not in origin:
origin['name'] = origin['operator']
formula_dict[origin['index']] = item
return formula_dict

@classmethod
def get_start_node(cls, formula_dict):
for key, value in formula_dict.items():
if value.data['origin']['name'] == '=':
left_node = formula_dict.get(value.data['origin']['left'])
right_node = formula_dict.get(value.data['origin']['right'])
return left_node if left_node.data['origin'].get('operator') else right_node
return None

def process(self):
if self.expression and '(' not in self.expression:
return
# 搜索 left 和 right 的最低优先级 X,Y,
# 如果 X 优先级小于 start_node 的优先级,那么 X 需要括号
# 否则,不需要括号
# 对于 Y 同理
# 只在一个分支上检查level
# 特例 / - 右边分支如果有同级的 需要带括号
# 括号内部 非括号部分的优先级
self.fill_node_bracket(self.start_node)

def ensure_bracket(self, child_bracket):
if not child_bracket:
return self.small_bracket
child_bracket_order = self.brackets[child_bracket]
for bracket, order in self.brackets.items():
if order > child_bracket_order:
return bracket
return None

def find_child_bracket(self, node):
if not node:
return
bracket = node.data['origin'].get('bracket')
if node.data['origin']['left'] != -1:
left_bracket = self.find_child_bracket(self.formula_dict.get(node.data['origin']['left']))
if left_bracket:
if not bracket:
bracket = left_bracket
else:
bracket = left_bracket if self.brackets[left_bracket] > self.brackets[bracket] else bracket
if node.data['origin']['right'] != -1:
right_bracket = self.find_child_bracket(self.formula_dict.get(node.data['origin']['right']))
if right_bracket:
if not bracket:
bracket = right_bracket
else:
bracket = right_bracket if self.brackets[right_bracket] > self.brackets[bracket] else bracket
return bracket

def fill_node_bracket(self, start_node, level=1):
start_node_priority = self.operator_priority.get(start_node.data['origin']['name'], self.priority_default)

left = self.formula_dict.get(start_node.data['origin']['left'])
if left:
self.fill_node_bracket(left, level=level + 1)
left_lowest_priority = self.get_lowest_priority(left)
if left_lowest_priority < start_node_priority:
child_bracket = self.find_child_bracket(left)
left.data['origin']['bracket'] = self.ensure_bracket(child_bracket)

right = self.formula_dict.get(start_node.data['origin']['right'])
if right:
self.fill_node_bracket(right, level=level + 1)
right_lowest_priority = self.get_lowest_priority(right)
if right_lowest_priority < start_node_priority or (start_node.data['origin']['name'] in ['/', '-'] and right_lowest_priority == start_node_priority):
child_bracket = self.find_child_bracket(right)
right.data['origin']['bracket'] = self.ensure_bracket(child_bracket)

def get_lowest_priority(self, start_node):
if not start_node or start_node.data['origin'].get('bracket'):
return self.priority_default
node_priority = self.operator_priority.get(start_node.data['origin']['name'], self.priority_default)
if start_node.data['origin']['left'] != -1:
left_priority = self.get_lowest_priority(self.formula_dict.get(start_node.data['origin']['left']))
node_priority = min(node_priority, left_priority)
if start_node.data['origin']['right'] != -1:
right_priority = self.get_lowest_priority(self.formula_dict.get(start_node.data['origin']['right']))
node_priority = min(node_priority, right_priority)
return node_priority

一个子查询优化

问题背景

同事写了一个接口,这个接口涉及到下面三个表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
User:
- id
- username
- created_utc
- ...

UserLoginInfo
- id
- user_id
- created_utc
- ...

Project
- id
- user_id
- name
- ...

需求是查询出:用户,最近用户登录时间,以及创建的项目数。这里的一条UserLoginInfo记录,代表一次登录,Project就是用户创建的项目记录,可能有多条。

同事实现是用子查询,Python里实现大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
user_projects_query = (
db_session.query(func.count(Project.id).label("cnt"))
.filter(
Project.user_id == User.id,
Project.deleted == 0,
)
.label("user_projects_cnt")
)
login_utc_query = (
db_session.query(UserLoginInfo.created_utc)
.filter(UserLoginInfo.user_id == User.id)
.order_by(UserLoginInfo.id.desc())
.limit(1)
.label("latest_login_time")
)
default_login_query = func.COALESCE(login_utc_query, User.created_utc).label("default_login_time")

order_mapping = {
"login_time": lambda desc: (default_login_query.desc(), User.id.desc()) if desc else (default_login_query, User.id),
"user_projects": lambda desc: (user_projects_query.desc(), User.id.desc()) if desc else (user_projects_query, User.id),
"created_utc": lambda desc: (User.created_utc.desc(), User.id.desc()) if desc else (User.created_utc, User.id)
}
query = db_session.query(User, user_projects_query, default_login_query).filter_by(deleted=0).order_by(*condition)

这里的condition是在order_mapping中根据desc的值进行选择的

上面用Python写的代码翻译成raw sql就是

1
2
3
4
5
6
7
SELECT 
"user".id as user_id, "user".username as user_name,
(SELECT count(project.id) AS cnt FROM project WHERE project.user_id = "user".id AND project.deleted = 0) AS user_projects_cnt,
coalesce((SELECT user_login_info.created_utc FROM user_login_info WHERE user_login_info.user_id = "user".id ORDER BY user_login_info.id DESC LIMIT 1), "user".id) AS default_login_time
FROM "user"
WHERE "user".deleted = 0 ORDER BY default_login_time DESC, "user".ext_id DESC
LIMIT 10 OFFSET 0

分析

这一实现一打眼就知道肯定会有问题,因为有很多关联子查询,在数据集小的情况下体现不出来,一旦数据集稍有增大,速度就会大幅下降。因为一般来说,关联子查询在查询优化器里是不好进行优化的,最后出来的算法大概率是Nested loop

下面是在测试环境用explain执行后的结果,果然是Nested Loop占了相当长的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Sort  (cost=750000347885.10..750000347885.29 rows=75 width=566)
Sort Key: ((SubPlan 2)) DESC, "user".id DESC
-> Bitmap Heap Scan on "user" (cost=8.75..750000347882.77 rows=75 width=566)
Recheck Cond: (ext_sys = 1)
Filter: (deleted = 0)
-> Bitmap Index Scan on idx_user_ext_sys (cost=0.00..8.73 rows=78 width=0)
Index Cond: (ext_sys = 1)
SubPlan 1
-> Aggregate (cost=19.27..19.28 rows=1 width=8)
-> Bitmap Heap Scan on document (cost=4.21..19.25 rows=5 width=4)
Recheck Cond: (user_id = "user".ext_id)
Filter: ((user_sys = 1) AND (deleted = 0))
-> Bitmap Index Scan on idx_user_id (cost=0.00..4.21 rows=8 width=0)
Index Cond: (user_id = "user".ext_id)
SubPlan 2
-> Limit (cost=10000004618.93..10000004618.94 rows=1 width=4)
-> Sort (cost=10000004618.93..10000005141.43 rows=209000 width=4)
Sort Key: user_login_info.created_utc DESC
-> Nested Loop (cost=10000000000.15..10000003573.93 rows=209000 width=4)
-> Seq Scan on user_login_info (cost=10000000000.00..10000000942.95 rows=1045 width=4)
Filter: (user_id = "user".id)
-> Materialize (cost=0.15..18.98 rows=200 width=0)
-> Index Only Scan using idx_document_user_sys on document document_1 (cost=0.15..17.98 rows=200 width=$
)
Index Cond: (user_sys = 1)

Planning time: 0.212 ms
Execution time: 229.674 ms

解决

那么现在首要问题就是如何避免子查询,可以看到需求里是需要最近用户登录时间用户的项目数,那么一个很自然的思路就是先把这两个数据查出来,然后再和 User Join到一起进行分页即可,这样就可以避免子查询嵌套到父查询里了,这里涉及到一个子查询的优化方法,尽量将关联子查询上推,上推到和父查询一个层级以避免 Nested Loop。

要实现先查出来某些数据,然后在后面的查询中使用,那么就是 CTE(公用表表达式) 了!公用表表达式,本质是允许用户通过一个子查询语句定义出一个临时表,然后在各个地方都可以使用这个临时表。

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
user_projects_query = db_session.query(
Project.user_id, func.count(Project.id).label("cnt")
).filter(
Project.user_id == User.id,
Project.deleted == 0,
).group_by(Project.user_id).cte('user_projects_query')
project_count = func.COALESCE(user_projects_query.c.cnt, 0)

login_utc_query = db_session.query(
UserLoginInfo.user_id, func.max(UserLoginInfo.created_utc).label('login_utc')
).group_by(UserLoginInfo.user_id).cte('login_utc_query')
login_utc = func.COALESCE(login_utc_query.c.login_utc, User.created_utc).label('login_utc')

order_mapping = {
"login_time": lambda desc: (login_utc.desc(), User.id.desc()) if desc else (login_utc, User.id),
"user_docs": lambda desc: (document_count.desc(), User.id.desc()) if desc else (document_count, User.id),
"created_utc": lambda desc: (User.created_utc.desc(), User.id.desc()) if desc else (User.created_utc, User.id)
}

query = db_session.query(
User, document_count, login_utc
).join(
user_projects_query, user_projects_query.c.user_id == User.id, isouter=True
).join(
login_utc_query, login_utc_query.c.user_id == User.id, isouter=True
).filter(
User.deleted == 0,
).order_by(*condition)

对应的raw sql如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
WITH user_projects_query AS 
(SELECT project.user_id AS user_id, count(project.id) AS cnt
FROM project
WHERE project.deleted = 0 GROUP BY document.user_id
),
login_utc_query AS
(SELECT user_login_info.user_id AS user_id, max(user_login_info.created_utc) AS login_utc
FROM user_login_info GROUP BY user_login_info.user_id
)
SELECT "user".id AS user_id, "user".username AS username, user_projects_query.cnt AS user_projects_query_cnt, coalesce(login_utc_query.login_utc, "user".created_utc) AS login_utc
FROM "user" LEFT OUTER JOIN user_projects_query ON user_projects_query.user_id = "user".id LEFT OUTER JOIN login_utc_query ON login_utc_query.user_id = "user".id
WHERE "user".deleted = 0
ORDER BY login_utc DESC, "user".ext_id DESC
LIMIT 20 OFFSET 0

在测试环境跑一下 EXPLAIN ANALYSE:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
 Limit  (cost=2832.33..2832.38 rows=20 width=28) (actual time=11.156..11.162 rows=20 loops=1)
CTE user_projects_query
-> HashAggregate (cost=31.53..31.88 rows=35 width=12) (actual time=0.103..0.108 rows=26 loops=1)
Group Key: document.user_id
-> Bitmap Heap Scan on document (cost=9.69..30.69 rows=168 width=8) (actual time=0.018..0.074 rows=181 loops=1)
Recheck Cond: (user_sys = 1)
Filter: (deleted = 0)
Rows Removed by Filter: 29
Heap Blocks: exact=18
-> Bitmap Index Scan on idx_document_user_sys (cost=0.00..9.65 rows=200 width=0) (actual time=0.013..0.013 rows=211 loops=
1)
Index Cond: (user_sys = 1)
CTE login_utc_query
-> GroupAggregate (cost=0.29..2760.73 rows=46 width=8) (actual time=1.135..10.853 rows=70 loops=1)
Group Key: user_login_info.user_id
-> Index Scan using idx_user_login_info_user_id on user_login_info (cost=0.29..2515.61 rows=48932 width=8) (actual time=0.005..5
.915 rows=48804 loops=1)
-> Sort (cost=39.73..39.99 rows=107 width=28) (actual time=11.155..11.158 rows=20 loops=1)
Sort Key: (COALESCE(login_utc_query.login_utc, "user".created_utc)) DESC, "user".ext_id DESC
Sort Method: top-N heapsort Memory: 27kB
-> Hash Left Join (cost=2.78..36.88 rows=107 width=28) (actual time=11.028..11.130 rows=107 loops=1)
Hash Cond: ("user".id = login_utc_query.user_id)
-> Hash Left Join (cost=1.28..34.54 rows=107 width=28) (actual time=0.137..0.210 rows=107 loops=1)
Hash Cond: ("user".id = user_projects_query.user_id)
-> Index Scan using user_pkey on "user" (cost=0.14..32.67 rows=107 width=20) (actual time=0.014..0.066 rows=107 loops=
1)
Filter: (deleted = 0)
Rows Removed by Filter: 5
-> Hash (cost=0.70..0.70 rows=35 width=12) (actual time=0.119..0.119 rows=26 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 10kB
-> CTE Scan on user_projects_query (cost=0.00..0.70 rows=35 width=12) (actual time=0.104..0.115 rows=26 loops=1)
-> Hash (cost=0.92..0.92 rows=46 width=8) (actual time=10.887..10.887 rows=70 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 11kB
-> CTE Scan on login_utc_query (cost=0.00..0.92 rows=46 width=8) (actual time=1.136..10.875 rows=70 loops=1)
Planning time: 0.218 ms
Execution time: 11.211 ms

可以看到相比之前的子查询的229ms,使用cte的sql已经降到了11ms了

总结

不要在SELECT语句中滥用子查询

子查询只有必要的时候再用,使用时候应该注意考虑如何将子查询与SQL语句的主干进行融合,子查询不是独立的黑盒数据块,应该与主语句通盘考虑后再结合使用。

在使用子查询的地方,往往可以通过CTE表达式进行优化,核心思路就是将子查询提升到和父查询一样的层级,避免Nested Loop,而且CTE表达式可读性更好。

总之在一个SQL有比较多的子查询时候一定要小心,记得 EXPLAIN ANALYSE

在 Hexo 博客里插入图表

用到了 Chartjs 的插件:hexo-tag-chart

用法总结如下:

首先在 hexo 博客的目录里运行:npm install hexo-tag-chart --save

然后在文章中就可以使用 chart 的 tag 了

1
2
3
{% chart 90% 300 %}
\\TODO option goes here
{% endchart %}

其中 chart 是标签名,endchart 是结束标签,不需要更改,90% 是图表容器的相对宽度,默认是 100%,300 是图表容器的高度,默认是按正常比例缩放的,你可以通过设置 options 里面的 aspectRatio 属性来调整宽高比例,另外还有许多属性可以自定义,你可以查看 官方文档。在标签之间的部分,需要自己填充的图表数据和属性。

我自己使用图表的页面:FIRE 基金

参考:

构建基于 Python 的单元测试

这篇文章记录了如何基于 Python 构建一个 Web 系统的单元测试,涉及一些基本和高级用法。

测试分类

  • 单元测试:单个模块的测试
  • 集成测试:多个模块的测试
  • 功能测试:项目的功能测试

其实就是范围不同,单元测试仅是系统特定一部分的测试,功能测试是将系统作为整体进行测试,集成测试介于两者之间。

单元测试库

最常用的是 unittest 和 pytest

  • 继承 unittest 的 TestCase 类来组织单元测试
  • assert 语句用来检测是否符合预期,而 pytest 提供了一些更强大的 assert 方法
  • pytest 用来运行测试,它可以使用加强版的 assert,并且它完全支持 unittest

一个简单的单元测试

1
2
3
4
5
6
7
8
9
import unittest
from fizzbuzz import fizzbuzz


class TestFizzBuzz(unittest.TestCase):
def test_fizz(self):
for i in [3, 6, 9, 18]:
print('testing', i)
assert fizzbuzz(i) == 'Fizz'

运行:

1
2
3
4
5
6
7
8
9
(venv) $ pytest
========================== test session starts ===========================
platform darwin -- Python 3.8.6, pytest-6.1.2, py-1.9.0, pluggy-0.13.1
rootdir: /Users/miguel/testing
collected 1 items

test_fizzbuzz.py . [100%]

=========================== 1 passed in 0.03s ============================

pytest命令比较智能,它会自动识别单元测试,它假定以这样的名字:test_[something].py 或者 [something]_test.py 命名的模块都包含单元测试。同时它也会搜索子目录。

一般来说,单元测试统一放到 tests 目录下,和应用目录隔离开。

测试覆盖率

安装:pip install pytest-cov

运行 pytest --cov=fizzbuzz,可以针对 fizzbuzz 模块运行单元测试以及覆盖率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
(venv) $ pytest --cov=fizzbuzz
========================== test session starts ===========================
platform darwin -- Python 3.8.6, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/miguel/testing
plugins: cov-2.11.1
collected 3 items

test_fizzbuzz.py ... [100%]

---------- coverage: platform darwin, python 3.8.6-final-0 -----------
Name Stmts Miss Cover
---------------------------------
fizzbuzz.py 13 4 69%
---------------------------------
TOTAL 13 4 69%


=========================== 3 passed in 0.07s ============================

还有以下参数:

  • --cov-branch 针对分支处理,有多少个分支就统计多少次
  • --cov-report=term-missing 表示以何种方式展示报告,term-missing表示在terminal上展示,并且会额外加上缺少测试覆盖的代码行数,另外一个常用选项是html 在html上展示报告,很清晰,常用。

可以添加注释 pragma: no cover 来跳过该块代码的覆盖率检测

测试参数化

使用库 parameterized: pip install parameterized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from parameterized import parameterized

# ...

class TestLife(unittest.TestCase):
# ...

@parameterized.expand([('pattern1.txt',), ('pattern2.txt',)])
def test_load(self, pattern):
life = Life()
life.load(pattern)
assert life.survival == [2, 3]
assert life.birth == [3]
assert set(life.living_cells()) == {
(10, 10), (11, 11), (15, 10), (17, 10)}
assert life.bounding_box() == (10, 10, 17, 11)

也可以使用列表推导式:

1
2
3
4
5
class TestLife(unittest.TestCase):
# ...

@parameterized.expand([(n,) for n in range(9)])
def test_advance_cell(self, num_neighbors):

支持多参数:

1
2
3
4
5
6
7
import itertools

class TestLife(unittest.TestCase):
# ...

@parameterized.expand(itertools.product([True, False], range(9)))
def test_advance_cell(self, alive, num_neighbors):

测试异常

1
2
3
4
5
6
7
8
9
10
11
import pytest

# ...

class TestLife(unittest.TestCase):
# ...

def test_load_invalid(self):
life = Life()
with pytest.raises(RuntimeError):
life.load('pattern4.txt')

Mocking

mocking 就是劫持函数或者功能,可以控制返回值或者其他东西的一种功能。在测试中如果对某个函数已经有了详尽的测试,那么在这个函数被调用的地方,就可以用mocking功能,节约资源。

unittest 里的 mock 模块,可以使用 mock.patch_object() 来替换函数或者方法

1
2
3
4
5
6
7
8
9
from unittest import mock

class TestLife(unittest.TestCase):
# ...

@mock.patch.object(Life, '_advance_cell')
def test_advance_false(self, mock_advance_cell):
mock_advance_cell.return_value = False
# ...

测试 Web 应用

最好将测试归集到一个继承 unittest.TestCase 的类里,这样可以公用 setUp 和 tearDown 方法,会有更好的性能,以及更方便。

WSGI 和 ASGI 都有特定的规则用于服务器如何传递到应用的请求。所以我们可以注入假的请求到应用上来模拟,而不用启动真正的服务器。这些 Web 框架都有所谓的测试客户端(test clients)来帮助实现单元测试,不需要任何网络,会向应用传递假的请求。如果 Web 框架没有提供的话,WSGI 应用可以使用 Werkzeug 库,ASGI 应用可以使用 async-asgi-testclient

比如,Flask 框架可以直接使用自带的 test client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class TestWebApp(unittest.TestCase):
def setUp(self):
self.app = create_app()
self.appctx = self.app.app_context()
self.appctx.push()
db.create_all()
self.client = self.app.test_client()

def tearDown(self):
db.drop_all()
self.appctx.pop()
self.app = None
self.appctx = None
self.client = None

Tornado 框架可以继承 HTTPTestCase or AsyncHTTPTestCase 类来实现,其中它自带了 HTTPClient 和 AsyncHTTPClient,可以直接使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

class BaseTestCase(AsyncHTTPTestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self.db_session = test_session
self.db_session.commit()
self.cookie = SimpleCookie()

def get_app(self):
test_app = Application()
return test_app

def get_new_ioloop(self):
return IOLoop.current()

def get_url(self, path):
full_path = super(BaseTestCase, self).get_url('/api/v1{}'.format(path))
return full_path

def _update_cookies(self, headers):
try:
cookies = escape.native_str(headers['Set-Cookie'])
self.cookie.update(SimpleCookie(cookies))
except KeyError:
return

def make_response(self, req, resp):
response = Response()
response.status_code = getattr(resp, 'code', None)
response.headers = {k: v for k, v in list(resp.headers.items())}
response.encoding = get_encoding_from_headers(response.headers)
response.raw = resp
response.reason = response.raw.reason
response._content = resp.body

if isinstance(req.url, bytes):
response.url = req.url.decode('utf-8')
else:
response.url = req.url
extract_cookies_to_jar(response.cookies, req, resp)
response.request = req
return response

def send(self, url, method='GET', data=None, json_data=None, files=None, headers=None, **kwargs):
if 'follow_redirects' not in kwargs:
kwargs['follow_redirects'] = False
request = Request(url=self.get_url(url), files=files, data=data, json=json_data)
request_data = request.prepare()
if headers is None:
headers = {}
headers.update(request_data.headers)
cookie_sting = '; '.join([f'{key}={morsel.value}' for key, morsel in self.cookie.items()])
if cookie_sting != '':
headers.update({'Cookie': cookie_sting})
resp = self.fetch(url, method=method, headers=headers, body=request_data.body, allow_nonstandard_methods=True, **kwargs)
self._update_cookies(resp.headers)
response = self.make_response(request, resp)
self.db_session.rollback()
return response

def get(self, url, **kwargs):
response = self.send(url, method='GET', **kwargs)
return response

def patch(self, url, files=None, data=None, json_data=None):
response = self.send(url, method='PATCH', files=files, data=data, json_data=json_data)
return response

def post(self, url, files=None, data=None, json_data=None, **kwargs):
response = self.send(url, method='POST', files=files, data=data, json_data=json_data, **kwargs)
return response

def put(self, url, files=None, data=None, json_data=None):
response = self.send(url, method='PUT', files=files, data=data, json_data=json_data)
return response

测试 html 内容

没必要全部 match 去做测试,而是可以检查一部分内容是否存在,比如提交按钮是否存在于 html 中,而忽略其顺序等无关信息。

1
2
3
4
5
6
7
8
9
10
11
def test_registration_form(self):
response = self.client.get('/auth/register')
assert response.status_code == 200
html = response.get_data(as_text=True)

# make sure all the fields are included
assert 'name="username"' in html
assert 'name="email"' in html
assert 'name="password"' in html
assert 'name="password2"' in html
assert 'name="submit"' in html

这样的方式也适合于其他数据量比较大的测试,只需要测试关键部分即可。

提交表单

主要问题在于 CSRF token 怎么处理,可以先发一个 GET 请求,然后拿到 token,再去提交表单,这是一种方法。另一种方法就是在测试中禁掉 CSRF 的保护。

1
2
3
4
5
6
7
def setUp(self):
self.app = create_app()
self.app.config['WTF_CSRF_ENABLED'] = False # no CSRF during tests
self.appctx = self.app.app_context()
self.appctx.push()
db.create_all()
self.client = self.app.test_client()

测试表单验证

根据表单验证失败返回的语句进行判断

1
2
3
4
5
6
7
8
9
10
def test_register_user_mismatched_passwords(self):
response = self.client.post('/auth/register', data={
'username': 'alice',
'email': 'alice@example.com',
'password': 'foo',
'password2': 'bar',
})
assert response.status_code == 200
html = response.get_data(as_text=True)
assert 'Field must be equal to password.' in html

测试需要登陆验证的页面

有以下几点:

  1. setUp 方法初始化用户
  2. login 方法
  3. 完成对应测试

Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# ...
import re

class TestWebApp(unittest.TestCase):
# ...

def setUp(self):
self.app = create_app()
self.app.config['WTF_CSRF_ENABLED'] = False # no CSRF during tests
self.appctx = self.app.app_context()
self.appctx.push()
db.create_all()
self.populate_db()
self.client = self.app.test_client()

def populate_db(self):
user = User(username='susan', email='susan@example.com')
user.set_password('foo')
db.session.add(user)
db.session.commit()

def login(self):
self.client.post('/auth/login', data={
'username': 'susan',
'password': 'foo',
})

def test_write_post(self):
self.login()
response = self.client.post('/', data={'post': 'Hello, world!'},
follow_redirects=True)
assert response.status_code == 200
html = response.get_data(as_text=True)
assert 'Your post is now live!' in html
assert 'Hello, world!' in html
assert re.search(r'<span class="user_popup">\s*'
r'<a href="/user/susan">\s*'
r'susan\s*</a>\s*</span>\s*said', html) is not None

测试 API 服务器

比较简单,因为 API 接口第一涉及范围小,第二返回基本上都是 JSON,容易解析。

1
2
3
4
5
6
7
8
9
10
11
12
def test_api_register_user(self):
response = self.client.post('/api/users', json={
'username': 'bob',
'email': 'bob@example.com',
'password': 'bar'
})
assert response.status_code == 201

# make sure the user is in the database
user = User.query.filter_by(username='bob').first()
assert user is not None
assert user.email == 'bob@example.com'

参考

pip 最佳使用方法

image.png

在激活的虚拟环境中使用 pip install 或者 python -m pip install 效果是完全相同的,但是有些场景下就有问题了,而 python -m pip 确保了想要安装的包会和当前解释器是一个环境。

可以设置个别名接着用:

1
❯ echo 'alias pip="python -m pip"' >> ~/.aliasrc

Tornado 实现的 WebSocket 简单例子

Server 部分,主要就是继承 WebSocketHandler 实现了个 WebSocket Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import logging
import tornado.web
import tornado.websocket
import tornado.ioloop
import tornado.options

from tornado.options import define, options

define("port", default=3000, help="run on the given port", type=int)


class Application(tornado.web.Application):
def __init__(self):
handlers = [(r"/", MainHandler)]
settings = dict(debug=True)
tornado.web.Application.__init__(self, handlers, **settings)


class MainHandler(tornado.websocket.WebSocketHandler):
def check_origin(self, origin):
return True

def open(self):
logging.info("A client connected.")

def on_close(self):
logging.info("A client disconnected")

def on_message(self, message):
logging.info("message: {}".format(message))


def main():
tornado.options.parse_command_line()
app = Application()
app.listen(options.port)
tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
main()

Client 部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from tornado.ioloop import IOLoop, PeriodicCallback
from tornado import gen
from tornado.websocket import websocket_connect


class Client(object):
def __init__(self, url, timeout):
self.url = url
self.timeout = timeout
self.ioloop = IOLoop.instance()
self.ws = None
self.connect()
PeriodicCallback(self.keep_alive, 20000).start()
self.ioloop.start()

@gen.coroutine
def connect(self):
print("trying to connect")
try:
self.ws = yield websocket_connect(self.url)
except Exception as e:
print("connection error")
else:
print("connected")
self.run()

@gen.coroutine
def run(self):
while True:
msg = yield self.ws.read_message()
if msg is None:
print("connection closed")
self.ws = None
break

def keep_alive(self):
if self.ws is None:
self.connect()
else:
self.ws.write_message("keep alive")


if __name__ == "__main__":
client = Client("ws://localhost:3000", 5)

运行 Server 和 Clinet 之后输出如下:

  • Server 部分
1
2
3
4
5
[I 220805 10:49:12 server:27] A client connected.
[I 220805 10:49:32 server:33] message: keep alive
[I 220805 10:49:52 server:33] message: keep alive
[I 220805 10:50:12 server:33] message: keep alive
[I 220805 10:50:13 server:30] A client disconnected
  • Client 部分
1
2
trying to connect
connected

初识 Casdoor

最近工作需要研究了下Casdoor:

Casdoor is a UI-first Identity Access Management (IAM) / Single-Sign-On (SSO) platform based on OAuth 2.0, OIDC, SAML and CAS.

简介

Casdoor 是一个开源的单点登录系统,单点登录系统的好处就是集中管理用户,使的我们开发的应用只需要关心业务逻辑而不用每个应用都是去实现一套用户系统。

Casdoor 使用了 OAuth2 的方式来完成单点认证,大概逻辑如下图:

image.png

总结一下就是:

  • 第一步 获取code
  • 第二步 用code获取access_token
  • 第三步 用access_token获取所需要的资源

安装

服务安装文档

文档说的很明白,我这里是下载源码使用的,因为我用的PostgreSQL,所以需要改一下app.confadapter.go

然后后端启动:go run main.go

后端启动:

1
2
3
cd web
yarn install
yarn start

非常简单

界面配置

这里我是用最小的改动来完成对接demo的,首先需要添加一个组织,组织就是一堆资源,应用的的集合:

image.png

后面的用户,角色,权限,模型,提供商都可以先不管,需要了解的可以看文档

然后需要添加一个新的应用,主要就是下图这些

image.png

客户端ID和客户端密钥,都是后面对接时候需要的

还有一个就是证书,这个不需要改,只需要点编辑进入将公钥拿到

image.png

上面就是在界面上需要完成的事情和完成一个对接demo所需要的数据了

web 对接

下面是和 Casdoor 对接的 web 接口代码,主要有两部分,第一部分是获取 code,第二部分是通过 code 获取 access_token,而这个 access_token 已经包含了用户信息,用对应的公钥进行解密,即可得到一个用户的 json 数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import logging
from urllib.parse import urljoin

from casdoor import CasdoorSDK


@route(r'/casdoor/sso-login')
class CASDoorSSOLoginHandler(BaseHandler):

def save_user(self, user_data):
user = User.make_user(uid=user_data['id'], ext_uname=user_data['name'], username=user_data['name'], _from='casdoor')
return user

def get(self, *args, **kwargs):
code = self.get_argument('code', None)
target_uri = self.get_argument('target_uri')

subpath = config.get_config("webif.redirect_subpath", '')
trident_base = urljoin(self.origin_host, subpath.lstrip('/'))

endpoint = config.get_config('casdoor_auth.endpoint')
client_id = config.get_config('casdoor_auth.client_id')
client_secret = config.get_config('casdoor_auth.secret')
org_name = config.get_config('casdoor_auth.org_name')
certificate = config.get_config('casdoor_auth.cert')

sdk = CasdoorSDK(
endpoint,
client_id,
client_secret,
certificate,
org_name,
)

if code:
access_token = sdk.get_oauth_token(code)
user_data = sdk.parse_jwt_token(access_token)
user = self.save_user(user_data)
self.session['proxy_user_id'] = str(user.id)
redirect_url = urljoin(trident_base, target_uri)
else:
origin_url = '{}/api/v1/casdoor/sso-login?target_uri={}'.format(trident_base, target_uri)
redirect_url = sdk.get_auth_link(origin_url, state='casdoor')
logging.info('redirect to %s', redirect_url)
return self.redirect(redirect_url)

配置也比较简单:

image.png

这里需要特别注意的是,endpoint 配的是 Casdoor 访问的首页地址,也就是这里是前端地址,而不是后端地址。前端地址是:http://localhost:7001/,后端地址是:http://localhost:8000/,这里很有迷惑性,让我折腾了半天。

写hexo博客的绝佳工具——hexo-client

最近逛Github,发现了一款写hexo博客的绝佳工具——hexo-client,非常好用,在这安利一下。

以前写hexo博客都是用IDE加载整个项目文件夹,在其中编辑,然后利用hexo命令在终端进行操作。有几个地方不是很方便:

  1. 上传图片的时候的操作很繁琐,首先需要把图片挪到对应文件夹下,然后在文章里面用markdown语法插入对应的图片地址。
  2. 没有markdown的编辑器,有时候一些语法还得现查
  3. 没法按照分类和标签来展示文章
  4. IDE编辑完成后,还得在终端去执行发布命令

现在这款hexo-client工具完美解决了上面的问题,它提供了一个很好用的markdown编辑器,如下图:

WX202207301024142x.png

然后上传图片的时候也很方便,点击编辑器里面的图片按钮,上传即可,工具会自动帮你上传到指定的图片文件夹下,并且会重命名,比如上面这张图片的地址是这样的:

![WX202207301024142x.png](/images/2022/07/30/66ef87ce-a338-4597-9de5-2f3b73084553.png)

很方便有木有,另外首页界面的文章也可以按照分类,标签展示,一目了然:

WX202207301026562x.png

最后最牛的一点,工具可以一键发布,太爽了有木有:

WX202207301028362x.png

太好用了,又激起我写博客的热情了,感谢 gaoyoubo

用cookiecutter来创建一个自己的代码模板

最近工作上要新建一个项目,又要搬运一些重复代码,这件事情想想都让人有点sick,于是想到搞一个代码模板多好,以后新建项目直接把相关信息一填,直接生成出新的项目,所以想到了cookiecutter。

cookiecutter以前很早就用过了,比如cookiecutter-tornadocookiecutter-flask

用这些模板生成出来的项目你就会感觉到什么叫做最佳范式,想必大家都读过最佳实践之类的书或者文章吧,这种最佳实践的确会让人感觉很舒服。

Github上其实有很多模板项目,比如上面的tornado,flask,最新的fastapi也有,但是这些都不太满足我的需求,因为我们公司有很多自己定制化的代码,而且我们是前后端分离的,项目也不需要那些前端模板项目。

因此是时候搞一个自己的模板项目了,其实搞一个模板项目很简单,只是个体力劳动。

cookiecutter其实就是利用模板渲染,把需要替换的字符串写成jinja2的形式:{{ project }},然后cookiecutter执行的时候就是把你在cookiecutter.json预定义的变量在代码里替换一下。非常简单是不是?

首先随便在Github上找一个现成的模板项目:full-stack-fastapi-postgresql

WX202207291432422x.png

把这个里面除了{{cookiecutter.project_slug}}目录,都拷贝到你自己的项目里

然后开始修改里面的cookiecutter.json,这里面的变量需要的可以保留,不需要的删掉即可,也可以新加上自己需要的。

然后就是从一个已有的项目把代码复制到{{cookiecutter.project_slug}}目录里,复制完成之后就开始最累人的部分,将在cookiecutter.json定义的变量替换到代码对应地方,比如这样:

1
2
3
4
5
6
7
8
9
import hashlib
import uuid

import peewee
from fastapi_permissions import Allow
from playhouse.postgres_ext import BinaryJSONField

from {{ cookiecutter.project_slug }}.common.enums import Role
from {{ cookiecutter.project_slug }}.models import BaseModel

所有需要变量替换的地方都写成{{ variable }}的形式,替换完成之后,利用模板生成一个项目,看是否有问题,直到修改的没有问题,大功告成。