写在前面
学习数据采集,先转载下来,之后在学习
这次的爬虫是关于房价信息的抓取,目的在于练习10万以上的数据处理及整站式抓取。
数据量的提升最直观的感觉便是对函数逻辑要求的提高,针对Python的特性,谨慎的选择数据结构。以往小数据量的抓取,即使函数逻辑部分重复,I/O请求频率密集,循环套嵌过深,也不过是1~2s的差别,而随着数据规模的提高,这1~2s的差别就有可能扩展成为1~2h。
因此对于要抓取数据量较多的网站,可以从两方面着手降低抓取信息的时间成本。
1)优化函数逻辑,选择适当的数据结构,符合Pythonic的编程习惯。例如,字符串的合并,使用join()要比“+”节省内存空间。
2)依据I/O密集与CPU密集,选择多线程、多进程并行的执行方式,提高执行效率。
一、获取索引
包装请求request,设置超时timeout
1 # 获取列表页面
2 def get_page(url):
3 headers = {
4 'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
5 r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
6 'Referer': r'http://bj.fangjia.com/ershoufang/',
7 'Host': r'bj.fangjia.com',
8 'Connection': 'keep-alive'
9 }
10 timeout = 60
11 socket.setdefaulttimeout(timeout) # 设置超时
12 req = request.Request(url, headers=headers)
13 response = request.urlopen(req).read()
14 page = response.decode('utf-8')
15 return page
一级位置:区域信息
二级位置:板块信息(根据区域位置得到板块信息,以key_value对的形式存储在dict中)
以dict方式存储,可以快速的查询到所要查找的目标。-> {'朝阳':{'工体','安贞','健翔桥'......}}
三级位置:地铁信息(搜索地铁周边房源信息)
将所属位置地铁信息,添加至dict中。 -> {'朝阳':{'工体':{'5号线','10号线' , '13号线'},'安贞','健翔桥'......}}
对应的url:http://bj.fangjia.com/ershoufang/--r-%E6%9C%9D%E9%98%B3%7Cw-5%E5%8F%B7%E7%BA%BF%7Cb-%E6%83%A0%E6%96%B0%E8%A5%BF%E8%A1%97
解码后的url:http://bj.fangjia.com/ershoufang/--r-朝阳|w-5号线|b-惠新西街
根据url的参数模式,可以有两种方式获取目的url:
1)根据索引路径获得目的url
1 # 获取房源信息列表(嵌套字典遍历)
2 def get_info_list(search_dict, layer, tmp_list, search_list):
3 layer += 1 # 设置字典层级
4 for i in range(len(search_dict)):
5 tmp_key = list(search_dict.keys())[i] # 提取当前字典层级key
6 tmp_list.append(tmp_key) # 将当前key值作为索引添加至tmp_list
7 tmp_value = search_dict[tmp_key]
8 if isinstance(tmp_value, str): # 当键值为url时
9 tmp_list.append(tmp_value) # 将url添加至tmp_list
10 search_list.append(copy.deepcopy(tmp_list)) # 将tmp_list索引url添加至search_list
11 tmp_list = tmp_list[:layer] # 根据层级保留索引
12 elif tmp_value == '': # 键值为空时跳过
13 layer -= 2 # 跳出键值层级
14 tmp_list = tmp_list[:layer] # 根据层级保留索引
15 else:
16 get_info_list(tmp_value, layer, tmp_list, search_list) # 当键值为列表时,迭代遍历
17 tmp_list = tmp_list[:layer]
18 return search_list
2)根据dict信息包装url
{'朝阳':{'工体':{'5号线'}}}
参数:
—— r-朝阳
—— b-工体
—— w-5号线
组装参数:http://bj.fangjia.com/ershoufang/--r-朝阳|w-5号线|b-工体
1 # 根据参数创建组合url
2 def get_compose_url(compose_tmp_url, tag_args, key_args):
3 compose_tmp_url_list = [compose_tmp_url, '|' if tag_args != 'r-' else '', tag_args, parse.quote(key_args), ]
4 compose_url = ''.join(compose_tmp_url_list)
5 return compose_url
二、获取索引页最大页数
1 # 获取当前索引页面页数的url列表
2 def get_info_pn_list(search_list):
3 fin_search_list = []
4 for i in range(len(search_list)):
5 print('>>>正在抓取%s' % search_list[i][:3])
6 search_url = search_list[i][3]
7 try:
8 page = get_page(search_url)
9 except:
10 print('获取页面超时')
11 continue
12 soup = BS(page, 'lxml')
13 # 获取最大页数
14 pn_num = soup.select('span[class="mr5"]')[0].get_text()
15 rule = re.compile(r'\d+')
16 max_pn = int(rule.findall(pn_num)[1])
17 # 组装url
18 for pn in range(1, max_pn+1):
19 print('************************正在抓取%s页************************' % pn)
20 pn_rule = re.compile('[|]')
21 fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
22 tmp_url_list = copy.deepcopy(search_list[i][:3])
23 tmp_url_list.append(fin_url)
24 fin_search_list.append(tmp_url_list)
25 return fin_search_list
三、抓取房源信息Tag
这是我们要抓取的Tag:
['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']
1 # 获取tag信息
2 def get_info(fin_search_list, process_i):
3 print('进程%s开始' % process_i)
4 fin_info_list = []
5 for i in range(len(fin_search_list)):
6 url = fin_search_list[i][3]
7 try:
8 page = get_page(url)
9 except:
10 print('获取tag超时')
11 continue
12 soup = BS(page, 'lxml')
13 title_list = soup.select('a[class="h_name"]')
14 address_list = soup.select('span[class="address]')
15 attr_list = soup.select('span[class="attribute"]')
16 price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"}) # select对于某些属性值(属性值中间包含空格)无法识别,可以用find_all(attrs={})代替
17 for num in range(20):
18 tag_tmp_list = []
19 try:
20 title = title_list[num].attrs["title"]
21 print(r'************************正在获取%s************************' % title)
22 address = re.sub('\n', '', address_list[num].get_text())
23 area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
24 layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
25 floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
26 price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
27 unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
28 tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
29 for tag in [title, address, area, layout, floor, price, unit_price]:
30 tag_tmp_list.append(tag)
31 fin_info_list.append(tag_tmp_list)
32 except:
33 print('【抓取失败】')
34 continue
35 print('进程%s结束' % process_i)
36 return fin_info_list
四、分配任务,并行抓取
对任务列表进行分片,设置进程池,并行抓取。
1 # 分配任务
2 def assignment_search_list(fin_search_list, project_num): # project_num每个进程包含的任务数,数值越小,进程数越多
3 assignment_list = []
4 fin_search_list_len = len(fin_search_list)
5 for i in range(0, fin_search_list_len, project_num):
6 start = i
7 end = i+project_num
8 assignment_list.append(fin_search_list[start: end]) # 获取列表碎片
9 return assignment_list
1 p = Pool(4) # 设置进程池
2 assignment_list = assignment_search_list(fin_info_pn_list, 3) # 分配任务,用于多进程
3 result = [] # 多进程结果列表
4 for i in range(len(assignment_list)):
5 result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
6 p.close()
7 p.join()
8 for result_i in range(len(result)):
9 fin_info_result_list = result[result_i].get()
10 fin_save_list.extend(fin_info_result_list) # 将各个进程获得的列表合并
通过设置进程池并行抓取,时间缩短为单进程抓取时间的3/1,总计时间3h。
电脑为4核,经过测试,任务数为3时,在当前电脑运行效率最高。
五、将抓取结果存储到excel中,等待可视化数据化处理
1 # 存储抓取结果
2 def save_excel(fin_info_list, file_name):
3 tag_name = ['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']
4 book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls' % file_name) # 默认存储在桌面上
5 tmp = book.add_worksheet()
6 row_num = len(fin_info_list)
7 for i in range(1, row_num):
8 if i == 1:
9 tag_pos = 'A%s' % i
10 tmp.write_row(tag_pos, tag_name)
11 else:
12 con_pos = 'A%s' % i
13 content = fin_info_list[i-1] # -1是因为被表格的表头所占
14 tmp.write_row(con_pos, content)
15 book.close()
附上源码
1 #! -*-coding:utf-8-*-
2 # Function: 房价调查
3 # Author:蘭兹
4
5 from urllib import parse, request
6 from bs4 import BeautifulSoup as BS
7 from multiprocessing import Pool
8 import re
9 import lxml
10 import datetime
11 import cProfile
12 import socket
13 import copy
14 import xlsxwriter
15
16
17 starttime = datetime.datetime.now()
18
19 base_url = r'http://bj.fangjia.com/ershoufang/'
20
21
22 test_search_dict = {'昌平': {'霍营': {'13号线': 'http://bj.fangjia.com/ershoufang/--r-%E6%98%8C%E5%B9%B3|w-13%E5%8F%B7%E7%BA%BF|b-%E9%9C%8D%E8%90%A5'}}}
23
24 search_list = [] # 房源信息url列表
25 tmp_list = [] # 房源信息url缓存列表
26 layer = -1
27
28
29 # 获取列表页面
30 def get_page(url):
31 headers = {
32 'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
33 r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
34 'Referer': r'http://bj.fangjia.com/ershoufang/',
35 'Host': r'bj.fangjia.com',
36 'Connection': 'keep-alive'
37 }
38 timeout = 60
39 socket.setdefaulttimeout(timeout) # 设置超时
40 req = request.Request(url, headers=headers)
41 response = request.urlopen(req).read()
42 page = response.decode('utf-8')
43 return page
44
45
46 # 获取查询关键词dict
47 def get_search(page, key):
48 soup = BS(page, 'lxml')
49 search_list = soup.find_all(href=re.compile(key), target='')
50 search_dict = {}
51 for i in range(len(search_list)):
52 soup = BS(str(search_list[i]), 'lxml')
53 key = soup.select('a')[0].get_text()
54 value = soup.a.attrs['href']
55 search_dict[key] = value
56 return search_dict
57
58
59 # 获取房源信息列表(嵌套字典遍历)
60 def get_info_list(search_dict, layer, tmp_list, search_list):
61 layer += 1 # 设置字典层级
62 for i in range(len(search_dict)):
63 tmp_key = list(search_dict.keys())[i] # 提取当前字典层级key
64 tmp_list.append(tmp_key) # 将当前key值作为索引添加至tmp_list
65 tmp_value = search_dict[tmp_key]
66 if isinstance(tmp_value, str): # 当键值为url时
67 tmp_list.append(tmp_value) # 将url添加至tmp_list
68 search_list.append(copy.deepcopy(tmp_list)) # 将tmp_list索引url添加至search_list
69 tmp_list = tmp_list[:layer] # 根据层级保留索引
70 elif tmp_value == '': # 键值为空时跳过
71 layer -= 2 # 跳出键值层级
72 tmp_list = tmp_list[:layer] # 根据层级保留索引
73 else:
74 get_info_list(tmp_value, layer, tmp_list, search_list) # 当键值为列表时,迭代遍历
75 tmp_list = tmp_list[:layer]
76 return search_list
77
78
79 # 获取房源信息详情
80 def get_info_pn_list(search_list):
81 fin_search_list = []
82 for i in range(len(search_list)):
83 print('>>>正在抓取%s' % search_list[i][:3])
84 search_url = search_list[i][3]
85 try:
86 page = get_page(search_url)
87 except:
88 print('获取页面超时')
89 continue
90 soup = BS(page, 'lxml')
91 # 获取最大页数
92 pn_num = soup.select('span[class="mr5"]')[0].get_text()
93 rule = re.compile(r'\d+')
94 max_pn = int(rule.findall(pn_num)[1])
95 # 组装url
96 for pn in range(1, max_pn+1):
97 print('************************正在抓取%s页************************' % pn)
98 pn_rule = re.compile('[|]')
99 fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
100 tmp_url_list = copy.deepcopy(search_list[i][:3])
101 tmp_url_list.append(fin_url)
102 fin_search_list.append(tmp_url_list)
103 return fin_search_list
104
105
106 # 获取tag信息
107 def get_info(fin_search_list, process_i):
108 print('进程%s开始' % process_i)
109 fin_info_list = []
110 for i in range(len(fin_search_list)):
111 url = fin_search_list[i][3]
112 try:
113 page = get_page(url)
114 except:
115 print('获取tag超时')
116 continue
117 soup = BS(page, 'lxml')
118 title_list = soup.select('a[class="h_name"]')
119 address_list = soup.select('span[class="address]')
120 attr_list = soup.select('span[class="attribute"]')
121 price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"}) # select对于某些属性值(属性值中间包含空格)无法识别,可以用find_all(attrs={})代替
122 for num in range(20):
123 tag_tmp_list = []
124 try:
125 title = title_list[num].attrs["title"]
126 print(r'************************正在获取%s************************' % title)
127 address = re.sub('\n', '', address_list[num].get_text())
128 area = re.search('\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
129 layout = re.search('\d[^0-9]\d.', attr_list[num].get_text()).group(0)
130 floor = re.search('\d/\d', attr_list[num].get_text()).group(0)
131 price = re.search('\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
132 unit_price = re.search('\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
133 tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
134 for tag in [title, address, area, layout, floor, price, unit_price]:
135 tag_tmp_list.append(tag)
136 fin_info_list.append(tag_tmp_list)
137 except:
138 print('【抓取失败】')
139 continue
140 print('进程%s结束' % process_i)
141 return fin_info_list
142
143
144 # 分配任务
145 def assignment_search_list(fin_search_list, project_num): # project_num每个进程包含的任务数,数值越小,进程数越多
146 assignment_list = []
147 fin_search_list_len = len(fin_search_list)
148 for i in range(0, fin_search_list_len, project_num):
149 start = i
150 end = i+project_num
151 assignment_list.append(fin_search_list[start: end]) # 获取列表碎片
152 return assignment_list
153
154
155 # 存储抓取结果
156 def save_excel(fin_info_list, file_name):
157 tag_name = ['区域', '板块', '地铁', '标题', '位置', '平米', '户型', '楼层', '总价', '单位平米价格']
158 book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xls' % file_name) # 默认存储在桌面上
159 tmp = book.add_worksheet()
160 row_num = len(fin_info_list)
161 for i in range(1, row_num):
162 if i == 1:
163 tag_pos = 'A%s' % i
164 tmp.write_row(tag_pos, tag_name)
165 else:
166 con_pos = 'A%s' % i
167 content = fin_info_list[i-1] # -1是因为被表格的表头所占
168 tmp.write_row(con_pos, content)
169 book.close()
170
171
172 if __name__ == '__main__':
173 file_name = input(r'抓取完成,输入文件名保存:')
174 fin_save_list = [] # 抓取信息存储列表
175 # 一级筛选
176 page = get_page(base_url)
177 search_dict = get_search(page, 'r-')
178 # 二级筛选
179 for k in search_dict:
180 print(r'************************一级抓取:正在抓取【%s】************************' % k)
181 url = search_dict[k]
182 second_page = get_page(url)
183 second_search_dict = get_search(second_page, 'b-')
184 search_dict[k] = second_search_dict
185 # 三级筛选
186 for k in search_dict:
187 second_dict = search_dict[k]
188 for s_k in second_dict:
189 print(r'************************二级抓取:正在抓取【%s】************************' % s_k)
190 url = second_dict[s_k]
191 third_page = get_page(url)
192 third_search_dict = get_search(third_page, 'w-')
193 print('%s>%s' % (k, s_k))
194 second_dict[s_k] = third_search_dict
195 fin_info_list = get_info_list(search_dict, layer, tmp_list, search_list)
196 fin_info_pn_list = get_info_pn_list(fin_info_list)
197 p = Pool(4) # 设置进程池
198 assignment_list = assignment_search_list(fin_info_pn_list, 2) # 分配任务,用于多进程
199 result = [] # 多进程结果列表
200 for i in range(len(assignment_list)):
201 result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
202 p.close()
203 p.join()
204 for result_i in range(len(result)):
205 fin_info_result_list = result[result_i].get()
206 fin_save_list.extend(fin_info_result_list) # 将各个进程获得的列表合并
207 save_excel(fin_save_list, file_name)
208 endtime = datetime.datetime.now()
209 time = (endtime - starttime).seconds
210 print('总共用时:%s s' % time)
总结:
当抓取数据规模越大,对程序逻辑要求就愈严谨,对python语法要求就越熟练。如何写出更加pythonic的语法,也需要不断学习掌握的。
推荐阅读《编写高质量代码 改善Python程序的91个建议》
大家可以尝试抓取一下,分析一下房价的走势也是蛮有意思的ლ(^o^ლ)
欢迎交流,转载请注明出处~ (^ _ ^)/~~
更多python爬虫实例,请访问:http://www.landsblog.com/blog/category/pachong
转载链接:http://www.cnblogs.com/Lands-ljk/p/5467236.html