二八定律

0x00.前言

最近工作上在做Elasticsearch入库速度的调优,总的来说入库有两个阶段

  • 第一阶段:封装成bulk格式的请求体
  • 第二阶段:调用bulk操作

0x01.测试

1.pandas.dataframe.loc

示例代码如下(精简代码非全部

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def generate_all_content(df_content, timestamp, index_name_type):
all_content = []
index = df_content.index

for i in index:
df_json = df_content.loc[i].to_json()
df_dict = json.loads(df_json)
device_ip = df_dict['router_ip']
interface = df_dict['interface']

id_str = f'{host_name}_{timestamp}_{device_ip}_{interface}_{i}'

content = {
'_op_type': 'create',
'_index': index_str,
'_id': id_str,

'timestamp': timestamp,
'router_ip': df_dict['router_ip'],
'interface': df_dict['interface'],
'direction': df_dict['direction'],
'protocol': df_dict['protocol'],
'collector': host_name,
'dst_ip': df_dict['dst_ip'],
'src_ip': df_dict['src_ip'],
'dst_port': df_dict['dst_port'],
'src_port': df_dict['src_port'],
'dscp': df_dict['dscp'],
'bits': df_dict['bits']
}
all_content.append(content)
return all_content

这块代码实际在做的就是遍历pandasdataframe,按行取出并转化成python的对象,然后拼请求体
经测试,与速度关系最大的一行代码是df_content.loc[i].to_json(),当df_content大小是12w条的时候,耗时结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ssh://<rm>:22/home/sdn/collector-venv/bin/python3 -u /home/sdn/cxnetflow-collector/test/ES_bulk.py
<bound method DataFrame.count of router_ip interface direction ... protocol dscp bits
0 175952351 29 0 ... 50 0 8000000000
1 175952351 29 0 ... 50 0 8000000000
2 175952351 29 0 ... 50 0 8000000000
3 175952351 29 0 ... 50 0 8000000000
4 175952351 29 0 ... 50 0 8000000000
... ... ... ... ... ... ... ...
119995 175952351 32 0 ... 50 0 8000000000
119996 175952351 32 0 ... 50 0 8000000000
119997 175952351 32 0 ... 50 0 8000000000
119998 175952351 32 0 ... 50 0 8000000000
119999 175952351 32 0 ... 50 0 8000000000

[120000 rows x 10 columns]>
===== Test Start =====
generate_all_content completed in 69.69848251342773s
split_all_content completed in 69.70153331756592s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 95.28210115432739s
二测
===== Test Start =====
generate_all_content completed in 62.697731256484985s
split_all_content completed in 62.69972372055054s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 84.10340571403503s
点击此处 ← 查看折叠

当然因为df_dict里面的内容实际上全都需要,因此可能会想到如果用下面的写法是否会加速
二者的区别在于前者是取值,后者是添加值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for i in index:
df_json = df_content.loc[i].to_json()
df_dict = json.loads(df_json)

df_dict['timestamp'] = timestamp
df_dict['collector'] = host_name

device_ip = df_dict['router_ip']
interface = df_dict['interface']

df_dict['_id'] = f'{host_name}_{timestamp}_{device_ip}_{interface}_{i}'

df_dict['_op_type'] = 'create'
df_dict['_index'] = index_str

all_content.append(df_dict)
return all_content

但是从实际结果来看基本上没有区别

1
2
3
4
5
6
7
8
9
10
[120000 rows x 10 columns]>
===== Test Start =====
generate_all_content completed in 69.1796202659607s
split_all_content completed in 69.1828773021698s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 93.98494386672974s
二测
===== Test Start =====
generate_all_content completed in 60.939237117767334s
split_all_content completed in 60.94171929359436s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 82.97677779197693s

可以用PyCharm运行菜单中的Profile查看详细的调用时间,虽然里面的名称不一定是认识的

配置文件
配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ssh://<rm>:22/home/sdn/collector-venv/bin/python3 -u /root/.pycharm_helpers/profiler/run_profiler.py 0.0.0.0 54982 /home/sdn/cxnetflow-collector/test/ES_bulk.py
Starting cProfile profiler
……
[120000 rows x 10 columns]>
===== Test Start =====
generate_all_content completed in 93.82941246032715s
split_all_content completed in 93.8325982093811s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 119.37008118629456s
Snapshot saved to /tmp/cxnetflow-collector3.pstat
二测
[120000 rows x 10 columns]>
===== Test Start =====
generate_all_content completed in 83.8989508152008s
split_all_content completed in 83.90175294876099s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 108.72677087783813s
Snapshot saved to /tmp/cxnetflow-collector.pstat
统计信息
统计信息

可以看出耗时最长的是这个__get_item__的方法,来自pandas下的/core/index.py文件,循环有2w+

2.pandas.dataframe.iterrows

参考pandas按行按列遍历Dataframe的几种方式,换成迭代的方法取值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for index, row in df_content.iterrows():
df_json = row.to_json()
df_dict = json.loads(df_json)

df_dict['timestamp'] = timestamp
df_dict['collector'] = host_name

device_ip = df_dict['router_ip']
interface = df_dict['interface']

df_dict['_id'] = f'{host_name}_{timestamp}_{device_ip}_{interface}_{index}'

df_dict['_op_type'] = 'create'
df_dict['_index'] = index_str

all_content.append(df_dict)
return all_content

这时结果有了明显的变化,直接减小了20秒左右(虽然昨天测试的时候是减小了30秒左右

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[120000 rows x 10 columns]>
===== Test Start =====
generate_all_content completed in 48.11659097671509s
split_all_content completed in 48.11987543106079s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 76.74760866165161s
二测
===== Test Start =====
generate_all_content completed in 38.85774374008179s
split_all_content completed in 38.86009979248047s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 60.861244201660156s
三测
===== Test Start =====
generate_all_content completed in 37.29048800468445s
split_all_content completed in 37.29290556907654s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 61.13431143760681s

同样,拿Profile再跑一遍

1
2
3
4
5
6
7
8
一测无
二测
[120000 rows x 10 columns]>
===== Test Start =====
generate_all_content completed in 56.07292437553406s
split_all_content completed in 56.07504391670227s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 77.29125237464905s
Snapshot saved to /tmp/cxnetflow-collector1.pstat
统计信息
统计信息

可以看出此时耗时最长的是iterrows,来自pandas下的core/frame.py

3.pandas.DataFrame.apply()

看到网上有说使用apply()方法,如果是CPython的话会有解释器加速的加成?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def generate_each_line(row, timestamp, host_name, index, index_str):
df_json = row.to_json()
df_dict = json.loads(df_json)
# print(df_dict)
df_dict['timestamp'] = timestamp
df_dict['collector'] = host_name

device_ip = df_dict['router_ip']
interface = df_dict['interface']

df_dict['_id'] = f'{host_name}_{timestamp}_{device_ip}_{interface}_{index}'

df_dict['_op_type'] = 'create'
df_dict['_index'] = index_str

for index, row in df_content.iterrows():
df_dict = generate_each_line(row, timestamp, host_name, index, index_str)
all_content.append(df_dict)
return all_content

然后调用

1
all_content = df_content.apply(lambda index, row: generate_each_line(row, timestamp, host_name, index, index_str))

emmm,报错了不知道该怎么同时传递indexrow

4.pandas.Dataframe.values

仍然是网上的说法,取.values[]应该是所有之中最快的,但是此时已经变成了Numpy的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
for i in index:
df_json = df_content.values[i]

device_ip = df_json[0]
interface = df_json[1]

id_str = f'{host_name}_{timestamp}_{device_ip}_{interface}_{i}'

content = {
'_op_type': 'create',
'_index': index_str,
'_id': id_str,

'timestamp': timestamp,
'router_ip': device_ip,
'interface': interface,
'direction': df_json[2],
'protocol': df_json[7],
'collector': host_name,
'dst_ip': df_json[5],
'src_ip': df_json[6],
'dst_port': df_json[3],
'src_port': df_json[4],
'dscp': df_json[8],
'bits': df_json[9]
}
all_content.append(content)
return all_content

但是结果却不尽人意,实测变得巨慢无比,也没有测全部跑完到底需要多少时间

5.pandas.DataFrame.itertuples()

除了iterrows还有一种方法是itertuples,据说这个的速度还会快一点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
i = 0
for row in df_content.itertuples():
device_ip = getattr(row, 'router_ip')
interface = getattr(row, 'interface')

id_str = f'{host_name}_{timestamp}_{device_ip}_{interface}_{i}'

content = {
'_op_type': 'create',
'_index': index_str,
'_id': id_str,

'timestamp': timestamp,
'router_ip': device_ip,
'interface': interface,
'direction': getattr(row, 'direction'),
'protocol': getattr(row, 'protocol'),
'collector': host_name,
'dst_ip': getattr(row, 'dst_ip'),
'src_ip': getattr(row, 'src_ip'),
'dst_port': getattr(row, 'dst_port'),
'src_port': getattr(row, 'src_port'),
'dscp': getattr(row, 'dscp'),
'bits': getattr(row, 'bits')
}
i = i + 1
all_content.append(content)
return all_content

万万没想到的是,结果原地起飞草(快亿

1
2
3
4
5
[120000 rows x 10 columns]>
===== Test Start =====
generate_all_content completed in 1.2253265380859375s
split_all_content completed in 1.2275893688201904s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 23.280734062194824s

担心没有生成正确的内容还特意去看了下返回值,确认是正确的

content
content

惯例跑一遍Profile看看

1
2
3
4
5
6
[120000 rows x 10 columns]>
===== Test Start =====
generate_all_content completed in 1.442192554473877s
split_all_content completed in 1.4444100856781006s
bulk_load_to_es_by_thread_pool_executor count 12 completed in 24.176409482955933s
Snapshot saved to /tmp/cxnetflow-collector2.pstat
配置文件
配置文件

因为耗时过短,没看出来时间花费在了哪里,于是只执行第一阶段再看下

配置文件
配置文件

测试至此就接近尾声了,当然结论自然是用itertuples

0x02.深入

iterrows
iterrows
itertuples
itertuples

可以看出iterrows只调用了一个核心,而itertuples会调用全部核心

未完待续……