在地震工程研究中,需要经常对地震波进行处理。因此开发了 pyearthquake 模块来实现一些基础的功能。该模块定义了三个类: Spectrum Motion Suite ,分别处理反应谱、地震波、地震波组。通常可将一个地震波组 suite 保存为一个文件,以方便管理和共享。

保存文件的过程,即将内存数据转化成可保存于硬盘的数据格式的过程,称为串行化(Serialization)。所形成的文件大小以及读写的时间是串行化效率的主要表征指标。本文建立一个含30条地震波的示例 suite ,讨论以下四种串行化方法:

  • 借助 json 模块生成 JSON 字符串;
  • 借助 struct 模块进行手动二进制编码;
  • 借助 pickle 模块自动生成二进制文件;
  • 借助 sci.io 模块保存为 .mat 格式的二进制文件

对比分析四种串行化方式的文件大小和读写速度,从而评估四种方式的效率,并选出一种作为 pyearthquake 模块默认的串行化方法。

各模块的主要功能介绍

本文为了简便,仅对各模块的主要功能进行介绍,略去了部分属性和方法。

Spectrum 模块

Spectrum 模块主要储存反应谱,只有两个数组,分别是 periodsvalues ,对应横轴周期和纵轴谱加速度。两个属性都是使用 numpyfloat32 格式储存的,精度足够。不记录阻尼比等其他信息。其方法主要有 get() from_accel() 等。这里仅展示构造函数及后面会用到的 get() 方法。 get() 方法的参数可以是一个标量,也可以是一个向量,分别对应输出一个标量或一个向量。识别标量和向量通过 hasattr 函数识别 __iter__ 属性和实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import numpy as np

class Spectrum():
def __init__(self, periods, values):
assert periods[0] == 0.0, "Periods must start from zero."
self.periods = np.array(periods, dtype=np.float32)
self.values = np.array(values, dtype=np.float32)

def get(self, period):
if not hasattr(period, "__iter__"):
i = self.periods.searchsorted(period)
if self.periods[i] == period:
return self.values[i]
elif i >= len(self.periods):
print("Warning: given period exceeds the sampling range.")
return self.values[-1]
else:
return self.values[i] - (self.values[i] - self.values[i - 1]) / (
self.periods[i] - self.periods[i - 1]
) * (self.periods[i] - period)
else:
return np.array([self.get(p) for p in period])

def __call__(self, period):
return self.get(period)

Motion 模块

Motion 模块主要处理地震波,主要有四个基础属性, name 是地震波的名字,dt 是地震波的采样时间间隔, accel 是地震波的加速度值, info 是地震波的其他信息。另外还有一些计算出的属性,比如 pga duration arias_intensity 等。方法有很多,主要进行地震波的处理,如 correct_baseline() trim() 等。这里只展示构造函数

1
2
3
4
5
6
class Motion(object):
def __init__(self, name, dt, accel, info=""):
self.name = name
self.info = info
self.dt = dt
self.accel = np.array(accel, dtype=np.float32)

Suite 模块

Suite 模块存储一组地震波,通常这一组地震波有一个目标反应谱,将这些地震波通过调幅来与目标反应谱拟合,并使一组的地震波的平均反应谱与目标反应谱接近。其构造函数如下。主要储存了一组 Motion 的实例,并对应一组调幅系数 factors 以及一组反应谱 spectrums 。另外还存储一个目标反应谱 target_spectrum 和拟合范围 period_bound 。剩下的变量用于构造一个线性方程组,来优化调幅系数,本文不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Suite(object):
_period_sample_number = 200

def __init__(self, target_spectrum, period_bound=None):
self.motions = []
self.factors = np.array([], dtype=np.float32)
self.spectrums = []
self.target_spectrum = target_spectrum
# period bound defaults to the domain
if period_bound is None:
period_bound = [0.0, target_spectrum.periods[-1]]
self.period_bound = period_bound
# create sample points in the period matching boundary
self._period_samples = np.linspace(period_bound[0], period_bound[1], self._period_sample_number)
# create an SOE to optimize the factors
self._spectrum_matrix = None
self._target_vector = target_spectrum.get(self._period_samples)

def set_motions(self, motions, factors=None, spectrums=None, damping=0.05):
self.motions = motions
if factors is None:
self.factors = np.ones([len(motions)]) * 0.01 # trick: make a obviously small initial factor
else:
assert len(factors) == len(motions), "Length of factors and motions are not identical."
self.factors = np.array(factors, dtype=np.float32)
if spectrums is None:
self.spectrums = [Spectrum.from_accel(mo.accel, mo.dt, damping) for mo in motions]
else:
assert len(spectrums) == len(motions), "Length of spectrums and motions are not identical."
self.spectrums = spectrums
self._reset_spectrum_matrix()

def _reset_spectrum_matrix(self):
mat = np.zeros((len(self._period_samples), len(self.motions)))
for i in range(len(self.motions)):
spectrum = self.spectrums[i].get(self._period_samples)
mat[:, i] = spectrum / len(self.motions)
self._spectrum_matrix = mat
self._target_vector = self.target_spectrum(self._period_samples)

读写装饰器

在串行化的过程中,一般会涉及到读写的问题,即将串行化得到的字符串或二进制流写到文件中,或是从文件中读取。但有的时候不一定要写到文件中,而是直接在内存中传递,比如说通过串行化实现深度复制。因此,串行化操作需要一方面生成数据,另一方面读写文件。这里使用两个装饰器 saveable loadable 来分别实现向文件中读写的功能。后面在写串行化函数时,只需要处理数据即可,而不需要再关注对文件的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
## I/O decorator

def saveable(mode="wb"):
def decorator(fn):
def decorated(self, filename=None):
res = fn(self)
if filename is not None:
with open(filename, mode) as f:
f.write(res)
return res
return decorated
return decorator

def loadable(mode="rb"):
def decorator(fn):
def decorated(data=None, filename=None):
if filename is not None:
with open(filename, mode) as f:
data = f.read()
if data is None:
raise ValueError("data is None or cannot be loaded.")
return fn(data)
return decorated
return decorator

四种串行化方式

借助 json 模块保存为 JSON 格式

json 模块为 python 自带的模块,可以将 python 对象转化为 JSON 字符串格式。然后可以使用文本文件来储存。这里对 Motion Spectrum Suite 分别引入 to_json()from_json() 两个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import json

@saveable("w")
def spectrum_to_json(self, filename=None):
return json.dumps({
"periods": self.periods.tolist(),
"values": self.values.tolist(),
})

@loadable("r")
def spectrum_from_json(data=None, filename=None):
it = json.loads(data)
return Spectrum(periods=it["periods"], values=it["values"])

@saveable("w")
def motion_to_json(self, filename=None):
return json.dumps({
"name": self.name,
"info": self.info,
"dt": self.dt,
"accel": self.accel.tolist(),
})

@loadable("r")
def motion_from_json(data=None, filename=None):
it = json.loads(data)
return Motion(name=it["name"], dt=it["dt"], accel=it["accel"], info=it["info"])

@saveable("w")
def suite_to_json(self, filename=None):
return json.dumps({
"motion_strs": [mo.to_json() for mo in self.motions],
"factors": self.factors.tolist(),
"spectrum_strs": [spe.to_json() for spe in self.spectrums],
"period_bound": self.period_bound,
"target_spectrum": self.target_spectrum.to_json(),
})

@loadable("r")
def suite_from_json(data=None, filename=None):
it = json.loads(data)
target_spectrum = Spectrum.from_json(it["target_spectrum"])
su = Suite(target_spectrum = target_spectrum, period_bound=it["period_bound"])
motions = [Motion.from_json(mo_json) for mo_json in it['motion_strs']]
factors = it["factors"]
spectrums = [Spectrum.from_json(spe_json) for spe_json in it["spectrum_strs"]]
su.set_motions(motions, factors, spectrums)
return su

# Bound these methods outside class definition, for this notebook only.
Spectrum.to_json = spectrum_to_json
Spectrum.from_json = staticmethod(spectrum_from_json)
Motion.to_json = motion_to_json
Motion.from_json = staticmethod(motion_from_json)
Suite.to_json = suite_to_json
Suite.from_json = staticmethod(suite_from_json)

注:以上定义为了方便在 jupyter notebook 中运行,使用了在类的定义之外进行绑定的方式。实际中应该在类定义的内部完成。

借助 struct 模块手动生成二进制字符串

struct 模块提供了用于在字节字符串和 python 原生数据类型之间转换函数。这需要对二进制的编码有所了解。对于数字,根据数据类型确定二进制的位数。对于字符串,可以使用 utf-8 来编码。对于数组,先储存数组的形状,再逐个储存数据。

由于 numpyndarray 提供了 tobytes() 方法,调用起来比较方便,所以出现 ndarray 的时候也可使用这一方法来转化成二进制。其结果与使用 structpack 是一样的,效率不得而知,但应该比使用 for 循环的效率高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import struct

@saveable()
def spectrum_to_bytes(self, filename=None):
return np.hstack([self.periods, self.values]).tobytes()

@loadable()
def spectrum_from_bytes(data=None, filename=None):
dt = np.frombuffer(data, dtype=np.float32)
dlen = len(dt)
return Spectrum(periods=dt[:dlen//2], values=dt[dlen//2:])

@saveable()
def motion_to_bytes(self, filename=None):
data = b""
nlen = len(self.name)
ilen = len(self.info)
data += struct.pack("i", nlen)
data += struct.pack("i", ilen)
data += struct.pack(f"{nlen}s", self.name.encode("utf-8"))
data += struct.pack(f"{ilen}s", self.info.encode("utf-8"))
data += struct.pack("d", self.dt)
data += self.accel.tobytes()
return data

@loadable()
def motion_from_bytes(data=None, filename=None):
nlen, ilen = struct.unpack("ii", data[:8])
data = data[8:]
(namebt,) = struct.unpack(f"{nlen}s", data[:nlen])
data = data[nlen:]
(infobt,) = struct.unpack(f"{ilen}s", data[:ilen])
data = data[ilen:]
(dt,) = struct.unpack("d", data[:8])
data = data[8:]
accel = np.frombuffer(data, dtype=np.float32)
name = namebt.decode("utf-8")
info = infobt.decode("utf-8")
return Motion(name=name, dt=dt, accel=accel, info=info)

@saveable()
def suite_to_bytes(self, filename=None):
mlen = len(self.motions)
motions = [mo.to_bytes() for mo in self.motions]
spectrums = [spe.to_bytes() for spe in self.spectrums]
motion_bt_len = [len(bt) for bt in motions]
spectrum_bt_len = [len(bt) for bt in spectrums]
target_spectrum_bt = self.target_spectrum.to_bytes()
array_len = np.array(motion_bt_len + spectrum_bt_len + [len(target_spectrum_bt)])

data = b""
data += struct.pack("dd", self.period_bound[0], self.period_bound[1])
data += struct.pack("i", mlen)
data += array_len.tobytes()
for mo_bt in motions:
data += mo_bt
for spe_bt in spectrums:
data += spe_bt
data += target_spectrum_bt
data += self.factors.tobytes()
return data

@loadable()
def suite_from_bytes(data=None, filename=None):
period_bound = struct.unpack("dd", data[:16])
data = data[16:]
mlen, = struct.unpack("i", data[:4])
data = data[4:]
lenslen = mlen * 2 + 1
lens = struct.unpack(f"{lenslen}i", data[:lenslen*4])
data = data[lenslen*4:]
motions = []
spectrums = []
for i in range(mlen):
motions.append(Motion.from_bytes(data[:lens[i]]))
data = data[lens[i]:]
lens = lens[mlen:]
for i in range(mlen):
spectrums.append(Spectrum.from_bytes(data[:lens[i]]))
data = data[lens[i]:]
lens = lens[mlen:]
target_spectrum = Spectrum.from_bytes(data[:lens[0]])
data = data[lens[0]:]
factors = np.frombuffer(data, dtype=np.float32)
assert len(factors) == mlen, "Factors length does not match."
suite = Suite(target_spectrum=target_spectrum, period_bound=list(period_bound))
suite.motions = motions
suite.factors = np.array(factors, dtype=np.float32)
suite.spectrums = spectrums
return suite

Spectrum.to_bytes = spectrum_to_bytes
Spectrum.from_bytes = staticmethod(spectrum_from_bytes)
Motion.to_bytes = motion_to_bytes
Motion.from_bytes = staticmethod(motion_from_bytes)
Suite.to_bytes = suite_to_bytes
Suite.from_bytes = staticmethod(suite_from_bytes)

借助 pickle 模块自动生成二进制字符串

pickle 模块是 python 自带的用于串行化的模块,可以将 python 对象直接保存在文件中。

1
2
3
4
5
6
7
8
9
10
11
12
import pickle

@saveable()
def suite_to_pickle(self, filename=None):
return pickle.dumps(self)

@loadable()
def suite_from_pickle(data=None, filename=None):
return pickle.loads(data)

Suite.to_pickle = suite_to_pickle
Suite.from_pickle = staticmethod(suite_from_pickle)

借助 scipy.io 保存为 mat 格式

.mat 格式是 Matlab 的数据格式,保存为这一格式有助于与 Matlab 的快速兼容。 Python 的 scipy.io 中的 loadmat()savemat() 提供了这一格式的读写方法。

值得注意的是, .mat 格式只支持字符串到数组的键值对,而我们的 python 字典相对比较复杂,savemat() 会自动帮我们使用特殊的方法处理这个键值对,但是在读取的时候需要较多地进行处理,非常复杂,这里就不尝试实现了。

1
2
3
4
5
6
from scipy.io import savemat, loadmat

def suite_to_mat(self, filename):
savemat(filename, self.__dict__)

Suite.to_mat = suite_to_mat

示例地震波组

下面以一个示例地震波组为例,来对比不同方法的性能。这个示例地震波组是从 PEER NGA West2 数据库中下载的,并在一定的周期范围内与中国抗震规范中的某反应谱进行匹配调幅所得到。共 30 条地震波。使用 Suite 模块中的 plot_interactive() 方法可以查看这些波的时程和反应谱。如下图所示。这一方法的定义这里不赘述了。

目前这一地震波组是使用手动的二进制编码保存的,可以从文件中读取。这里使用现有版的 pyearthquakeSuite.load() 方法读取。

1
2
3
4
5
6
7
8
9
10
11
12
import pyearthquake as pe

ss = pe.Suite.load("sample.suite")
ss.plot_interactive()

# copy ss to a new Suite instance
target_spectrum = Spectrum(periods=ss.target_spectrum.periods, values=ss.target_spectrum.values)
s = Suite(target_spectrum=target_spectrum, period_bound=ss.period_bound)
motions = [Motion(name=m.name, dt=m.dt, accel=m.accel, info=m.info) for m in ss.motions]
factors = ss.factors
spectrums = [Spectrum(periods=s.periods, values=s.values) for s in ss.spectrums]
s.set_motions(motions=motions, factors=factors, spectrums=spectrums)

一簇地震波的结果

四种方法对比

将内存中的这个 Suite 以上文定义的四种方法保存和读取,并进行对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Save time
print("--- Save time for json")
%timeit s.to_json(filename="s1.json")
print("--- Save time for bytes")
%timeit s.to_bytes(filename="s2.bytes")
print("--- Save time for pickle")
%timeit s.to_pickle(filename="s3.pkl")
print("--- Save time for mat")
%timeit s.to_mat(filename="s4.mat")

# Load time
print("--- Load time for json")
%timeit s1 = Suite.from_json(filename="s1.json")
print("--- Load time for bytes")
%timeit s2 = Suite.from_bytes(filename="s2.bytes")
print("--- Load time for pickle")
%timeit s3 = Suite.from_pickle(filename="s3.pkl")

from os.path import getsize
# File size
print("--- File size for json")
print(getsize("s1.json"), "bytes")
print("--- File size for bytes")
print(getsize("s2.bytes"), "bytes")
print("--- File size for pickle")
print(getsize("s3.pkl"), "bytes")
print("--- File size for mat")
print(getsize("s4.mat"), "bytes")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
--- Save time for json
79.1 ms ± 2.7 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
--- Save time for bytes
1.45 ms ± 26 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
--- Save time for pickle
999 µs ± 20.7 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
--- Save time for mat
16 ms ± 281 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
--- Load time for json
110 ms ± 2.67 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
--- Load time for bytes
2.91 ms ± 196 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
--- Load time for pickle
281 µs ± 20.1 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
--- File size for json
2978618 bytes
--- File size for bytes
533810 bytes
--- File size for pickle
589685 bytes
--- File size for mat
549144 bytes

可以看出,使用手动二进制和自动二进制的速度都很快,而 .mat 文件的速度次之, json 文件的速度最慢。保存的文件大小对比中, json 文件最大,是二进制文件的 5 倍。其他二进制文件的大小相差不大。对实验结果进行总结,见下表

文件类型 写入时间 读取时间 文件大小
json 71 ms 110 ms 2979 kB
bytes 1 ms 3 ms 534 kB
pickle 1 ms 0 ms 590 kB
mat 16 ms - 549 kB

对比不同格式的优点和缺点,见下表

文件类型 优点 缺点
json 以字符串保存可以直接阅读;方便在浏览器中查看 文件大,速度慢
bytes 文件小,速度快;二进制规则清晰,可以用其他语言解析 实现较麻烦
pickle 文件小,速度快;实现方便 串行规则不明确,仅 python 友好
mat 可与 Matlab 兼容 数据结构有较大变化,读取困难

最终选择 bytes 格式进行保存。