从 PEER 的数据库中可以下载地震波。PEER 的每一条地震波都对应一个 RSA 。因此想到,是否可以在 Python 中通过 RSA 来直接从 PEER 的数据库中下载地震波呢?最容易想到的方法就是模拟手动在浏览器中下载的全过程来实现。这时 Selenium 就可以派上用场了。

Selenium 是一种开源工具,用于在 Web 浏览器上执行自动化测试(使用任何 Web 浏览器进行 Web 应用程序测试)。简单地说,就是可以在浏览器中自动模拟用户的行为。

新版的 Selenium 提供了很好用的 IDE ,再也不用不停地去查文档了。让我们来一起体验一下吧。

使用 Selenium IDE 录制测试用例

Selenium 的 IDE 非常轻量级,是以浏览器插件的形式提供的。打开 Selenium 的官网,可以看到它提供了三个产品。第一是 Driver ,用于通过编程来实现自动测试。第二是 IDE ,用于录制和回放自动测试。第三是 Grid ,用来进行分布式测试。这里我们只用到前面两个。

先点击 Selenium IDE 下面的 READ MORE 进入介绍页面,然后就可以找到下载界面。一般我们使用 Edge 或 Chrome 浏览器,可以点击第一个按钮下载安装,其本质是一个浏览器插件。安装完成后运行,界面如图所示。

在欢迎页面中,选择第一个 Record a new test in a new project 。在弹出的对话框中要先为 project 起一个名字,这里我们叫 peer_tester 。然后需要有一个入口 url ,这里我们就输入在 PEER 的 NGA West 2 数据库中下载地震波的页面 url:

1
https://ngawest2.berkeley.edu/spectras/new?sourceDb_flag=1

然后点击 Start recording 按钮,就在新的浏览器中打开了这一网址,即可开始录制。

下面按下载地震波的过程操作。首先输入用户名和密码登录,然后不提供反应谱,直接点 Submit 按钮,进入新的搜索页面。这里我们在 RSA 一栏里面随便输入一个编号,如 123 。然后点击 Search records 。得到地震波后,再点击 Download Time Series Records 按钮,再点击两次确定,即可完成下载。

在页面的右下角,可以看到一个一直在闪烁的提示: Selenium IDE is recording 。说明现在正在录制中。我们把弹出的浏览器关掉,回到 Selenium IDE 界面,可以看到刚才的操作都被录制下来了,如下图所示。

录制的内容主要有 3 列,第一列是命令,包括点击 click ,输入 type ,执行脚本 run script 等。第二列是目标,即执行命令对应的浏览器 DOM 元素。它支持使用不同的 selector 来选择。比如使用 id 来选择,使用 CSS 来选择等。第三列是值,对于一些命令,比如 type ,需要使用到一些值。

在右上角可以看到一个红色的按钮正在闪烁,说明尽管浏览器已经被关掉,录制仍然在进行。这里我们点击这个 stop recording 按钮,录制结束。这时弹出对话框,要求输入测试用例名,这里输入 download_timehistory 。这个测试用例即被保存。

我们把一些录制下来但无用的命令删除,比如窗口滚动命令等。这样这个测试就录制好了。点击上方工具栏的 Run test 按钮,可以复现这次测试。但是这里由于我们已经登录过,登录的逻辑发生了变化,所以测试无法正常复现。这时需要我们进行调试。

使用 Selenium IDE 调试测试用例

首先看第一条命令 open ,打开了一个新的 url ,根据其语意可知是进入登录页面。然而我们知道,这个页面并不是我们主动进去的,而是打开最初的 url 之后,浏览器判断我们没有登录之后自动跳转的。因此这一条操作的目标需要更改为初始的页面 url ,即上文以 flag=1 结尾的 url

再次运行,发现如果我们已经登录,就不会跳转到登录页面,那样的话测试就卡住了。但是由于后面在使用 driver 的时候,每次都需要启动一个新的浏览器,不会保存登录的信息,所以不用过多处理,只要退出登录就好了。

测试中也可以使用工具栏中的 step over 按钮进行单步测试。测试中发现,点击了下载按钮后没有弹出 alert ,而是直接下载。测试用例在 assert alert 部分卡住,我们不需要这个 assert 环节,直接删掉即可。

以上就完成了我们的测试用例。

将 IDE 的用例导出为 Python 代码

右击左边栏中显示的本条测试 download_timehistory ,点击 export ,可以看到支持多种语言的测试到处。这里我们选择 Python pytest 。Pytest 是一个非常常用的 Python 单元测试框架。点击 EXPORT ,则下载了一个测试文件。

打开文件,可以看到里面的代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Generated by Selenium IDE
import pytest
import time
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities

class TestDownloadtimehistory():
def setup_method(self, method):
self.driver = webdriver.Chrome()
self.vars = {}

def teardown_method(self, method):
self.driver.quit()

def test_downloadtimehistory(self):
self.driver.get("https://ngawest2.berkeley.edu/spectras/new?sourceDb_flag=1")
self.driver.set_window_size(1550, 929)
self.driver.find_element(By.ID, "user_email").send_keys("xxxx@xxx.com")
self.driver.find_element(By.ID, "user_password").send_keys("xxxxx")
self.driver.find_element(By.ID, "user_submit").click()
self.driver.find_element(By.CSS_SELECTOR, "#buttons > button").click()
self.driver.find_element(By.ID, "search_search_nga_number").click()
self.driver.find_element(By.ID, "search_search_nga_number").send_keys("123")
self.driver.find_element(By.ID, "input_box").click()
self.driver.find_element(By.CSS_SELECTOR, ".peer_nga_spectrum > button").click()
self.driver.find_element(By.CSS_SELECTOR, ".peer_nga_spectrum:nth-child(6) > button:nth-child(4)").click()
self.driver.close()

这里定义的类以 Test 开头,内部有一个方法以 test 开头,就可以被 pytest 识别为测试用例。setup_methodteardown_method 是两个 fixture ,分别在开始测试和结束测试时使用。我们可以在命令行里输入

1
pytest .

来运行全部测试用例。当然也可以不使用 pytest ,直接在这个文件中最下面加入以下代码

1
2
3
4
5
if __name__ == "__main__":
t = TestDownloadtimehistory()
t.setup_method(None)
t.test_downloadtimehistory()
t.teardown_method(None)

再运行即可。

运行可以看到,有一个测试用的 Chrome 浏览器自动启动,并开始执行测试用代码。但是没能执行完毕,从报错看,大体意思是没有找到想要的元素。

这里就涉及到了 web 测试中非常常见的问题,就是网页还没有加载完毕,就试图去获取页面中的元素,导致元素无法正常读取,从而报错。这里我们使用 Selenium 提供的一个方法,自动隐式地等待网页加载了所需要的元素,再运行。在 setup_method 中加入一行代码

1
self.driver.implicitly_wait(20)

表示最多等待 20 秒。再次运行,又发现在弹出 alert 的时候报错。这是因为我们点击 alert 的行为无法被录制,所以没有直接点击 alert 中的确定按钮。这里我们加上两次确定,再指定等待 20 秒种,让浏览器下载完毕。

1
2
3
self.driver.switch_to.alert.accept()
self.driver.switch_to.alert.accept()
time.sleep(20)

再次运行,可以看到成功地点击了下载按钮,并将所需文件下载到了默认下载路径。

获取下载的文件

现在的测试代码中,文件是自动下载到默认文件夹的。而且使用了确定的时长等待,非常 tricky 。如果下载文件的时间过长,就会在还没有下载完毕时就关闭了浏览器。因此,我们需要用一个更好的方法来获取下载的文件。

在 Selenium 的文档中搜索 Download ,发现一篇文件,大意是讲 Selenium 不推荐进行下载文件的测试,而且也不提供 API 对已下载的文件进行测试。这里我们使用一个 trick 来处理。首先建立一个临时文件夹用于保存下载的文件,然后再轮询这个文件夹,当出现下载好的文件之后,再读取这一文件,并在读好后将临时文件夹删除。

首先要通过 drivercapability 来定义默认的下载文件夹。要想知道这个 capability 如何定义,就需要找到其文档。它的文档可以在这里找到。根据教程,在 setupMethod 中加入以下代码

1
2
3
4
5
6
7
8
self.folder = tempfile.mkdtemp()
print("Temp dir:", self.folder)
prefs = {
"download.default_directory": self.folder
}
options = webdriver.ChromeOptions()
options.add_experimental_option("prefs", prefs)
self.driver = webdriver.Chrome(options=options)

这里使用 tempfile 模块创建一个临时文件夹,然后将下载的默认文件夹设为这一文件夹。

然后在点击了下载按钮之后,使用以下代码来判断是否加载完成:

1
2
3
4
5
self.zip_path = self.folder + "\\PEERNGARecords_Unscaled.zip"
for i in range(120):
time.sleep(1)
if os.path.exists(self.zip_path):
break

再定义一个 read_zip 方法,来读取下载的 zip 文件中的地震波:

1
2
3
4
5
6
7
8
9
def read_zip(self, target_folder=None):
with zipfile.ZipFile(self.zip_path, "f") as f:
nl = f.namelist()
accel_files = []
for n in nl:
if n.startswith("RSN234") and n.endswith(".AT2"):
accel_files.append(n)
for af in accel_files:
f.extract(af, target_folder if target_folder is not None else self.folder)

读取好的地震波可以使用前期编写的 pyearthquake 模块进行处理,这里就不再赘述了。

整理代码

前面的代码是由 Selenium IDE 自动生成的,并且没有加入变量。下面对代码进行整理,不烦使用类来实现了,就直接封装为一个函数。这里不赘述了。整理后的代码见下,需要的用户可以直接下载使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import time
import os
import tempfile
import zipfile

from selenium import webdriver
from selenium.webdriver.common.by import By


def get_peer_accel(user_email, user_pwd, rsas, target_folder):
"""Get accel file from PEER NGA West2 database

Args:
user_email (str): user email to login to PEER
user_pwd (str): user password to login to PEER
rsas (List<int>): list of e/q RSAs
target_folder (path): The folder path to save .AT2 files
"""
# setup temp folder
folder = tempfile.mkdtemp()
print("Temp dir to download peer file:", folder)
# setup options for default download dir
prefs = {
"download.default_directory": folder
}
options = webdriver.ChromeOptions()
options.add_experimental_option("prefs", prefs)
# create driver
driver = webdriver.Chrome(options=options)
driver.implicitly_wait(20)
# Do the job
driver.get("https://ngawest2.berkeley.edu/spectras/new?sourceDb_flag=1")
driver.set_window_size(1550, 929)
driver.find_element(By.ID, "user_email").send_keys(user_email)
driver.find_element(By.ID, "user_password").send_keys(user_pwd)
driver.find_element(By.ID, "user_submit").click()
driver.find_element(By.CSS_SELECTOR, "#buttons > button").click()
driver.find_element(By.ID, "search_search_nga_number").click()
driver.find_element(By.ID, "search_search_nga_number").send_keys(",".join([str(r) for r in rsas]))
driver.find_element(By.ID, "input_box").click()
driver.find_element(By.CSS_SELECTOR, ".peer_nga_spectrum > button").click()
driver.find_element(By.CSS_SELECTOR, ".peer_nga_spectrum:nth-child(6) > button:nth-child(4)").click()
driver.switch_to.alert.accept()
driver.switch_to.alert.accept()
# set the path for the zip file
zip_path = folder + "\\PEERNGARecords_Unscaled.zip"
for _ in range(120):
time.sleep(1)
if os.path.exists(zip_path):
break
# read and extract zip file
with zipfile.ZipFile(zip_path, "r") as f:
nl = f.namelist()
accel_files = []
for rsa in rsas:
for n in nl:
if n.startswith("RSN") and n.endswith(".AT2"):
accel_files.append(n)
for af in accel_files:
f.extract(af, target_folder)
print("Got file " + af)
driver.close()
driver.quit()


if __name__ == "__main__":
USER_EMAIL = "xxxx@xxx.com"
USER_PWD = "xxxxx"
rsas = [11, 12, 13]
target_folder = r"C:\peer_temp"
get_peer_accel(USER_EMAIL, USER_PWD, rsas, target_folder)