网络代购货品分析与决策系统的设计与实现
- 网络代购货品分析与决策系统的设计与实现
- 项目地址
- 参考
- 开发日志
- day1 12.25
- day2 12.26
- day3 12.28
- day4 12.29
- day5 12.30
- day6 12.31
- day7 1.2
- day8 1.3
- day9 1.5
- day10 1.6
- day11 1.7
- day12 1.9
- day13 1.11
- day14 2.21
- day14 2.28
- day15 3.20
- day16 3.21
- day17 3.22
- day18 3.23
- day19 3.28
- day20 4.12
- day21 4.24
- day21 5.3
- day22 5.4
- day23 5.5
- day24 5.9
- day24 5.11
- day26 5.14
- day27 5.16
- day28 5.20
考研结束,结果不论,可以开始做毕设了。老规矩边做边写点啥记录。这次想用日记一样的形式记录每日的进度。选题简单分为两部分,爬虫获得数据和时间预测模型处理数据。这个决策系统可能还要引入新的内容,先不管了,一个一个来吧。
项目地址
参考
- 书籍
- 文档
- 博客
开发日志
day1 12.25
先稍微看了下崔庆才的Python3网络爬虫开发实战 第2版,之前学爬虫就看的这本。只不过当时只学了点基础,pyquery获取和解析网页。这次起码要把模拟登录和反爬虫之类的解决。检查发现我的电子书有缺页,难怪到后面书签就乱了。我还以为是书签的问题,zlibrary上的都有缺页,找了本第一版的对照看。
day2 12.26
爬虫有个scrapy挺有名,但我感觉没必要整这么复杂,增加额外的学习负担。先用书上的内容做做看吧,应该够用。
以前直接VSCode做的,因为项目都很简单。这次想用pycharm试试,一是熟悉专业的python开发工具,二是pycharm好像有虚拟环境,我本地的环境已经被我pip得乱七八糟了。就是好久没用pycharm好卡啊(汗)。
day3 12.28
看了看venv。虽然pycharm会帮你自动创建好,但之后可能会需要迁移什么的,有点担心。
关于反爬虫,之前一直没注意。这次看了看,书上主要提到三方面:验证码识别,代理和模拟登录。目前先做模拟登录,其他用到的时候再展开。
看了看好像京东关于模拟登录的教程都过期了,以前好歹有个表单项现在只有一串长字符没法解析,不会要寄了吧。
day4 12.29
试试能不能用自动化测试工具Selenium模拟登录来获取cookie。但这样可能不方便用代理池反爬。不知道爬多了会不会直接封账号。。。而且这样需要用户自己登录,会多出一些步骤。也许可以找些空账号来。
day5 12.30
感觉还是在反爬虫这方面力有未逮。之前用到爬虫时完全没考虑过反爬虫,因为爬的是个很简单的网页,访问频率也不高完全没必要反爬。中心都放在网页数据的获取和解析上,感觉还是得好好学习了解下反爬才行。选定了这本韦世东的Python3反爬虫原理与绕过实战当教材。
day6 12.31
pycharm太卡了,还是放弃了,换回VSCode。
配置venv环境
记录一下venv的基础用法:教程
- 创建虚拟环境:
python -m venv /path/to/new/virtual/environment
- 激活虚拟环境:
venv\Scripts\activate
- 退出虚拟环境:
deactivate
主要用途是在虚拟环境内pip安装软件包,和全局环境分开。
装了Selenium和ChromeDriver。selenium直接pip就行。ChromeDriver则是需要自己下载。[这里]提供旧版下载,而Chrome for Testing availability才能下到最新版。下载后将chromedriver.exe放到venv的Scripts目录下就行。chrome浏览器必须版本对应比较麻烦,如果是Firefox的话可以向下兼容直接下最新版的geckodriver就行。可以通过chromedriver --version
测试是否可用。
使用selenium输入账号密码
京东登录页面的链接为https://passport.jd.com/new/login.aspx
。其中,账号输入框,密码输入框,登录按钮的id分别为loginname
,nloginpwd
,loginsubmit
。
使用selenium输入账号密码:
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
browser = webdriver.Chrome()
browser.get('https://passport.jd.com/new/login.aspx')
username='username'
password='password'
input_username=browser.find_element(By.ID,'loginname')
input_username.send_keys(username)
input_password=browser.find_element(By.ID,'nloginpwd')
input_password.send_keys(password)
button_login=browser.find_element(By.ID,'loginsubmit')
button_login.click()
time.sleep(10)
browser.close()
day7 1.2
在京东输入账号密码并点击登录后,会弹出滑动验证码验证。书上 (Python3网络爬虫开发实战第2版) 第八章介绍了验证码识别的相关内容。其中对于滑块验证码有OpenCV(P298)和深度学习(P304)两种方法。其中OpenCV识别正确率不高,而深度学习训练和维护模型相对复杂。此外还有一个选择就是打码平台,相当于付费把这个问题转接给更专业的商业平台,但我们这个毕设就暂时不考虑了。第八章最后还提到手机验证码的自动化处理,感觉暂时没必要了解。这个相当于在手机上做一个收发转发,但让用户收一下验证码填一下就行没必要再增加开发内容。
OpenCV和深度学习我暂时不知道两者的优劣,就先都做下试试。
day8 1.3
看了看书上 (Python3反爬虫原理与绕过实战) 第二章。主要是2.2浏览器和2.3网络协议,复习了一下HTML DOM,http协议,cookie,WebSocket等内容。我也不知道看这些有什么用,但看看也没事。
记一下cookie相关
Cookie可以理解为在HTTP协议下,服务器或其他脚本语言维护客户端信息的一种方式,是保存在客户端(比如浏览器)的文本文件,Cookie中往往包含客户端或者用户的相关信息。
简单理解,cookie就是一种保存在本地的身份信息。在用户登录后,服务器生成对应cookie保存在响应头的set-cookie
字段中返回给客户端。之后客户端和服务器的通信都会在请求头的cookie
字段中带上这个信息让服务器验证身份,就不用每次登录了。

day9 1.5
韦的书上 (Python3反爬虫原理与绕过实战) 介绍的滑块验证码,一种是缺口位置写在css里,直接从代码中拿到移动距离。一种是先不显示缺口,滑动时再用canvas绘制缺口。这种的解决方法是前后截图再比较像素找出缺口。京东的方法和上面都不一样,是直接放一个带缺口的图片作背景,只能想办法从图片中识别出缺口。崔的书上 (Python3网络爬虫开发实战第2版) 讲的就是这种情况。
做一下Opencv识别滑块验证码。
在用selenium拿图时发现id拿不了,又不想学XPath,就用CSS选择器好了。顺便记下CSS选择器的语法,表来自韦的书P92。
CSS选择器语法规则
获取滑块验证码图片
滑块和背景图为JDJRV-smallimg
和JDJRV-bigimg
类下的<img>
元素,使用css选择器获取对象,urllib.request.urlretrieve
下载图片:
# 获取滑块验证码的滑块和背景图
bigimg_src = browser.find_element(
By.CSS_SELECTOR, ".JDJRV-bigimg img").get_attribute("src") # 背景图
smallimg_src = browser.find_element(
By.CSS_SELECTOR, ".JDJRV-smallimg img").get_attribute("src") # 滑块图
# 命名图片
bigimg='bigimg.png'
smallimg='smallimg.png'
# 下载图片
request.urlretrieve(url=bigimg_src,filename=bigimg)
request.urlretrieve(url=smallimg_src,filename=smallimg)
缺口检测的大致流程如下:
- 高斯滤波 高斯滤波处理减少噪声干扰。
- 边缘检测 使用John F. Canny开发的的多级边缘检测算法canny。OpenCV实现了这个算法。
- 轮廓提取 使用
findContours
方法提取出边缘的轮廓。 - 外接矩形 使用
boundingRect
方法计算轮廓外接矩形,根据面积和周长等判断是不是目标缺口。- 轮廓面积 可以使用
contourAera
方法计算面积。 - 轮廓周长 可以使用
arcLength
方法计算周长。
- 轮廓面积 可以使用
day10 1.6
opencv缺口识别算法
定义实现高斯滤波、边缘检测和轮廓提取的三个方法:
import cv2
GAUSSIAN_BLUR_KERNEL_SIZE = (5, 5) # 高斯内核大小
GAUSSIAN_BLUR_SIGMA_X = 0 # 高斯内核X方向上标准偏差
CANNY_THRESHOLD1 = 200 # 最小判定临界点
CANNY_THRESHOLD2 = 450 # 最大判定临界点
# 高斯滤波
def get_gaussian_blur_image(image):
return cv2.GaussianBlur(src=image,
ksize=GAUSSIAN_BLUR_KERNEL_SIZE,
sigmaX=GAUSSIAN_BLUR_SIGMA_X)
# 边缘检测
def get_canny_image(image):
return cv2.Canny(image=image,
threshold1=CANNY_THRESHOLD1,
threshold2=CANNY_THRESHOLD2)
# 轮廓提取
def get_contours(image):
contours, _ = cv2.findContours(
image=image,
mode=cv2.RETR_CCOMP, # 轮廓检索方式-检测所有的轮廓,只建立两个等级关系,顶层为连通域的外围边界,次层为洞的内层边界
method=cv2.CHAIN_APPROX_SIMPLE # 轮廓近似方式-仅保存轮廓的拐点信息
)
return contours
image_raw = cv2.imread('bigimg.png') # 读取原始图片
image_gaussian_blur = get_gaussian_blur_image(image_raw) # 高斯滤波处理
image_canny = get_canny_image(image_gaussian_blur) # 边缘检测处理
contours = get_contours(image_canny) # 轮廓提取处理
计算轮廓和滑块的面积和周长并比较,若误差不超过允许的误差ALLOWABLE_ERROR
则认定该轮廓为目标缺口。因为求了外接矩形,面积周长我就直接用长宽求了。
ALLOWABLE_ERROR = 0.1 # 允许误差范围为10%
# 计算矩形大小
def get_area(height, width):
area = height*width
return area
# 计算矩形周长
def get_length(height, width):
length = (height+width)*2
return length
image_slide = cv2.imread('smallimg.png') # 读取滑块图片
slide_height, slide_width, _ = image_slide.shape # 获取宽高信息
target_area = get_area(slide_height, slide_width) # 目标面积
target_length = get_length(slide_height, slide_width) # 目标周长
offset = None
for contour in contours: # 遍历轮廓
x, y, w, h = cv2.boundingRect(contour)
contour_area = get_area(h, w) # 轮廓面积
contour_length = get_length(h, w) # 轮廓周长
area_error = abs((contour_area-target_area)/target_area)
length_error = abs((contour_length-target_length)/target_length)
if area_error <= ALLOWABLE_ERROR and length_error <= ALLOWABLE_ERROR:
cv2.rectangle(image_raw, (x, y), (x + w, y + h), (0, 0, 255), 2)
offset = x
return offset
兴致勃勃做完才发现成功率堪忧。在轮廓识别这一步要么就是缺口和背景色太接近识别不出来,要么就是缺口轮廓和背景图轮廓混在一起导致下一步求外接矩形时出错。前者没啥办法,后者可能可以通过调整findContours
的mode
和method
参数解决,我也不确定。可能要换一种检测方法,或者干脆放弃OpenCV转深度学习,但感觉后者可能更容易出错。。。
放一个检测成功的示例:
day11 1.7
准度不够次数来凑,加上了失败重试,不停尝试直到成功。至于判断是否成功的方法,不会写js触发器先用别的顶着,通过“拼接成功”的提示信息判断是否通过验证。
captcha_success = False
while (captcha_success == False):
offset = None
while (offset == None):
refresh_captcha(browser) # 刷新验证码
time.sleep(2)
offset = get_offset(browser) # 获取滑动距离
time.sleep(2)
slide_captcha(browser, offset)
captcha_text = browser.find_element(
By.CLASS_NAME, 'JDJRV-slide-bar-center').text
if (captcha_text == '拼接成功'):
captcha_success = True
发现一个问题,原图大小和渲染大小不一致,根据原图计算的移动距离需要等比换算。此外,滑动时总差一丝,怀疑是小数自动向下取整,所以加一个向上取整。
# 原图大小和渲染大小不一致,需要等比换算。
_, image_width, _ = image_raw.shape # 原图宽度
render_width = 242 # 渲染宽度
offset = math.ceil(offset*(render_width/image_width))
鼠标轨迹模拟
如上图,明明缺口对齐了,却还是没有通过验证。而在尝试手动完成滑块验证码时成功通过了验证。也就是说并不是selenium的原因。这时,我想到了韦的书 (Python3反爬虫原理与绕过) 上提到的一点:鼠标轨迹检测(9.6 P330),于是尝试手动添加随机抖动来模拟人手移动轨迹。
import random
# 滑动验证码
def slide_captcha(browser, offset):
button_slide = browser.find_element(
By.CLASS_NAME, 'JDJRV-slide-btn')
action = webdriver.ActionChains(browser)
action.click_and_hold(button_slide).perform() # 点击并按住
# action.move_by_offset(offset, 0) # 滑动距离
# 添加随机抖动
sum_x = 0
sum_y = 0
while (sum_x < offset):
rand_y = random.randint(-5, 5) # y轴随机抖动
while (sum_y+rand_y >= 30): # 控制y轴移动不超过滑块范围
rand_y = random.randint(-5, 5)
rand_x = random.randint(1, 5) # x轴随机移动
if (offset-sum_x < 5):
rand_x = offset-sum_x
action.move_by_offset(rand_x, rand_y)
sum_x = sum_x+rand_x
sum_y = sum_y+rand_y
# print(rand_x, rand_y, sum_x, sum_y)
action.release().perform() # 松开
虽然成功率不是百分百,但也能骗过轨迹检测成功登录了。
返回Session和cookie
cookies = browser.get_cookies()
print(cookies)
browser.close()
session = requests.session()
for cookie in cookies:
session.cookies.set(cookie['name'], cookie['value'])
return session, cookies
纠错:通过“拼接成功”的提示信息判断是否通过验证不可行,因为通过验证后很快就会自动跳转,读不到这个信息。暂时用try应付一下,因为跳转后读不到图片会报错,在except里break。感觉好草台班子啊这种做法。。。
小改动:加了一个判断缺口正方形的条件。
day12 1.9
因为淘宝京东这些电商平台要看商品详情都需要先登录,这段时间就一直在做模拟登录。做完了才发现详情页里没有历史价格,感觉这段时间白做了。其实也不算完全白做,京东的话不登录连搜索都用不了,淘宝只是看不了详情页,其他电商平台还没试过。
查了下怎么看历史价格,要么是第三方比价网站,要么是商用的查价api,没找到在京东官方页面看历史价格的方法。去爬第三方,先不说数据能不能用,反爬有是另一套手段了。登录是微信扫码,验证方法也不一样。而且爬多了感觉很容易封号,电商平台你多看看很正常,查价网站一次查一大堆就很奇怪了,账号绑定的话代理池也没用。而商用api,虽然查一次就几分几毛钱,但要做机器学习数据量几百条总得有吧,相当于一次就用掉七八块也耗不起。
迷茫了,不知道往哪个方向做,又怕做了后又发现没用全盘推倒。
day13 1.11
才发现selenium自带延时等待功能,这下不需要time.sleep()这种东西了。
对代码进行了修改优化。
day14 2.21
本来只想休息一下,结果一歇就是一个月,拖延症真可怕。
继续吧,直接爬第三方网站好了,从头做就从头做,破罐子破摔。
因为京东的搜索要登录,所以之前做的也还有点用。京东的搜索页是:https://search.jd.com/
,格式为:https://search.jd.com/Search?keyword=【关键词1%20关键词2】
,后面可能还有enc,wq和pvid等参数,实测删去不影响。
淘宝的搜索页为:https://s.taobao.com/
。之前是的时候记得不用登录的,这次有需要了,还要再做淘宝的模拟登录。搜索格式为:https://s.taobao.com/search?q=【关键词1%20关键词2】
,还有别的一堆奇奇怪怪的参数,实测删去不影响。
day14 2.28
救命,完全没状态。
这几天看了看《时间序列分析实战 基于机器学习和统计学》,还是云里雾里。找了一大堆书,但是没法有效消化内容,讲解方向也与做的方向不完全契合,难以直接套用。大概是我有些眼高手低了。
先做点别的找找手感吧,照着之前的做个淘宝的模拟登录。
淘宝登录页:https://login.taobao.com/
淘宝的登录验证需要手机app确认或者验证码,这个就不是selenium能完成的工作了(,到现在也没想好到时候成品登录要不要用用户的账号,要的话就不用弄这么麻烦,不要的话各种验证也难搞,像这个二次验证就不是这边搞得好的。又试了试,只有无痕要二次登录,浏览器是滑块,但是自然语言理解,不是简单的对齐缺口。然后试试selenium,又是个没信息的滑块,但是手动滑都过不了验证。淘宝在这块做的太复杂了,有点想撂担子。
day15 3.20
摸鱼一时爽,一直摸鱼一直爽。要不是马上中期答辩我说不定还会继续摆下去。。。
回到正题,模拟登录因为层出不穷的反爬验证处理起来实在麻烦,为了先搭个架子出来倒是有个取巧的办法:直接让用户自己登录,我们等着拿cookie就行了。
import requests
from selenium import webdriver
import time
def selenium(target_url):
browser = webdriver.Chrome()
browser.get(url=target_url)
time.sleep(30)
cookies = browser.get_cookies()
# print(cookies, type(cookies))
# print(cookies[0], type(cookies[0]))
browser.close()
session = requests.session()
for cookie in cookies:
session.cookies.set(cookie['name'], cookie['value'])
return session, cookies
顺路记一下,selenium得到的cookies的结构就是一个字典的列表,list[dict]
,每一个列表项就是一个cookie。然而requests库的cookie格式只有name和value两项,所以需要转换一下:
# 将 cookie 转换为 requests 库可用的格式
cookie_dict = {cookie['name']: cookie['value'] for cookie in cookies}
还顺便写了个cookie的文件格式存取以备不时之需,虽然程序内直接传参就行。
错误 invalid cookie domain
在测试selenium测试cookie是否可用时出现此错误,因为是先add_cookie
再访问,就报了这个错。查了一下正确的做法是先访问需要登录的地址再设置cookie。
browser2 = webdriver.Chrome()
browser2.get('https://www.jd.com')
for cookie in cookies:
browser2.add_cookie(cookie)
browser2.get('https://search.jd.com/Search?keyword=北通%20手柄')
time.sleep(10)
browser2.close()
day16 3.21
本来想selenium拿到cookie后用requests库进行请求的,但不知道为什么加了cookie还是未登录状态。正好之前selenium使用cookie正常,这些测试转正了。用selenium完成搜索,再把内容交由pyquery解析...好像也没必要,selenium自己又不是不能解析。算了先这样吧。
selenium登录&搜索功能整合为类
整合了登录和搜索功能,这下直接把selenium整成一个类得了。上文提到了搜索的格式,直接把关键字放到url里用空格分割就行,带上cookies信息就能正常搜索了。
from selenium import webdriver
import time
class Selenium:
login_url = ''
index_url = ''
search_url = ''
cookies = list[dict]
def __init__(self, login_url, index_url, search_url) -> None:
self.login_url = login_url
self.index_url = index_url
self.search_url = search_url
self.cookies = list[dict]
def selenium_login(self):
login_url = self.login_url
browser = webdriver.Chrome()
browser.get(url=login_url)
time.sleep(30)
cookies = browser.get_cookies()
browser.close()
# return cookies
self.cookies = cookies
def selenium_search(self, keywords):
index_url = self.index_url
search_url = self.search_url
for keyword in keywords:
search_url = search_url+keyword+'%20' # 空格分割关键字
browser = webdriver.Chrome()
browser.get(index_url)
for cookie in self.cookies:
browser.add_cookie(cookie)
browser.get(search_url)
time.sleep(5)
with open("search_result.html", mode="w", encoding='utf-8', newline='') as f:
f.write(browser.page_source)
browser.close()
if __name__ == '__main__':
selenium_jd = Selenium(login_url='https://passport.jd.com/new/login.aspx',
index_url='https://www.jd.com',
search_url='https://search.jd.com/Search?keyword=')
selenium_jd.selenium_login()
selenium_jd.selenium_search(['北通', '手柄'])
解析还是先别放类里了,不同电商平台页面结构不同解析方法也各异。
再记一些TODO List:
- 登录定时更改,手动登录完成后主动结束等待,可能需要多进程/线程
- 异常处理
- 搜索时启用无界面模式
搜索结果页获取商品id(京东)
从结果页中解析出商品id待用。标识度很高的是J_goodsList
这个类,检索这个类下的li
元素就行,商品id是data-sku
这个参数。
import requests
from pyquery import PyQuery as pq
from selenium_class import Selenium
# index_url_jd = 'https://www.jd.com'
# session, _ = selenium_login.selenium()
# response = session.get(index_url_jd)
# print(response.status_code)
# print(response.url)
# with open("index_jd.html", mode="w", encoding='utf-8', newline='') as f:
# f.write(response.text)
def get_goods_id_jd(keywords):
login_url_jd = 'https://passport.jd.com/new/login.aspx'
index_url_jd = 'https://www.jd.com'
search_url_jd = 'https://search.jd.com/Search?keyword='
selenium_jd = Selenium(login_url_jd, index_url_jd, search_url_jd)
selenium_jd.selenium_login()
selenium_jd.selenium_search(keywords)
# 解析网页数据
with open('search_result.html', 'r', encoding='utf-8') as f:
html = f.read()
doc = pq(html)
goodslist = doc('#J_goodsList li')
id_list = list()
for goods in goodslist.items():
id_list.append(goods.attr('data-sku'))
for id in id_list:
print(id)
day17 3.22
昨天上午和导师聊了聊,找到了不少改进方向,记一下。首先是登录部分,我现在是取巧,弹个框让用户自己登录。就算之前的自动化登录,也是需要一个账号密码来登录的。这就牵扯到一个数据安全的问题,用户怎么会愿意把自己的账号密码输入到一个非官方的软件里呢?我也想过内置一个账号,但在陌生环境登录一般都会要求手机号收验证码,而做这个功能既麻烦又需要服务器,会有额外开销。我初始的设想是做纯客户端不做服务器的,但聊了后服务器这个还是要有,验证码这个功能也得做。现在图方便最后答辩也过不了的,所以还是得做。第二就是后面的时间序列预测,我之前想当然的觉得,把不同时间的价格数据丢到模型里跑,得到一个未来的价格就行。但导师要求更严格,还要加入销量等影响因素,用于支持题目中的“决策”二字,这就有点触及我的知识盲区了,看来得先多看看书。目前看的那本,讲了半天还是没到重点,在犹豫要不要换一本。此外,时间序列销量这个数据也不好拿,电商平台上都只有当前的销量价格,价格上第三方网站上还能找到,销量这数据就是真没人关心了。
电商网站上都只有当前价格,历史价格这个数据藏起来的(不要影响鼠鼠赚钱)。找了下可以在第三方的网站上找到。也有一些商用api。api好用是好用,就是要钱,测试用用没关系,放毕设里正式用肯定被毙。查价网站只需要商品页链接就行,但有点担心大量爬触发反爬,又要增加工作量,先走一步看一步吧。
先列出一些可用的第三方查价网站:
https://tool.manmanbuy.com/HistoryLowest.aspx
这个是一个叫慢慢买的app的查价页,该app也是致力于比价省钱的,还要专门的比价api合作业务,比较专业。https://www.lsjgcx.com/
这个就是一个常见的查价网站,但传播较广,也做了自己的app,看了看挺草率的,没上一个专业。http://www.hisprice.cn/
从个犄角格拉翻出来的,过客网的过客比价。也做了个公众号只不过是跳转网页罢了。
day18 3.23
接上。
先用http://www.hisprice.cn/
这个链接做下试试。虽然可以输链接再submit,但看了看直接加在url后面当参就行了,即:http://www.hisprice.cn/his.php?hisurl=【商品链接】
。
寄。
本来以为数据现成的很好爬,f12看看发现是canvas现画的图,价格放在id=containertooltip
的div里,随着鼠标位置实时更新,落在拐点上才显示价格。现在有两条路,一条图像识别折线图+selenium模拟鼠标落点再爬html里的数据,一条直接想办法从js代码里把数据拿出来。后者折腾半天还是毫无头绪,实在不懂前端,大概只能选前者了。
day19 3.28
又找了找,换了一家叫购物党(gwdang.com)的查价网站,这家就很实诚了。虽然还是画图+鼠标位置检测+动态变更内容,但那也只是方便用户,具体数据在侧边栏写的明明白白,就爬这家好了。
从查价网站获取历史价格(gwdang.com)
这家的搜索页是https://www.gwdang.com/v2/trend?from=search
,可以往input里填数据再post,也可以直接通过链接访问,格式为https://www.gwdang.com/v2/trend/【商品id】-【平台代码】.html
,例如京东的平台代码为3,上面有一件商品id为4979408,对应链接就是https://www.gwdang.com/v2/trend/4979408-3.html。
访问了一下试试,失败了,返回状态码403,尝试添加用户代理(user-agent)。关于这部分内容,详见韦的《Python3反爬虫原理与绕过实战》第4章 信息校验型反爬虫 4.1 User-Agent反爬虫。
User-Agent反爬虫
def get_history_price(goods_id, platform_code):
target_url = 'https://www.gwdang.com/v2/trend/'+goods_id+'-'+platform_code+'.html'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
headers = {'User-Agent': user_agent}
response = requests.get(target_url, headers=headers)
print(response.status_code)
成功访问,但会直接通过链接访问会弹出验证消息,懒得做验证的话通过搜索页输入商品链接就不会弹验证了。还是回到了selenium上(。
搜索框是<input id="url">
,按钮是<button id="search-button">
用selenium填写后点击按钮。
browser = webdriver.Chrome()
browser.get('https://www.gwdang.com/v2/trend?from=search')
input = browser.find_element(By.CSS_SELECTOR, '#url')
input.send_keys('https://item.jd.com/4979408.html')
button = browser.find_element(By.CSS_SELECTOR, '#search-button')
button.click()
# 等待页面加载完成
WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, 'promotion-list')))
print(browser.page_source)
还是弹出了验证,猜测是检测到了使用自动化工具,尝试绕过webdriver检测。关于这部分内容,详见韦的《Python3反爬虫原理与绕过实战》第7章 特征识别反爬虫 7.1 webdriver识别。
webdriver识别
Object.defineProperty(navigator, "webdriver", { get: () => false, });
browser = webdriver.Chrome()
browser.get('https://www.gwdang.com/v2/trend?from=search')
# 绕过webdriver验证
script = 'Object.defineProperty(navigator, "webdriver", { get: () => false, });'
browser.execute_script(script=script)
input = browser.find_element(By.CSS_SELECTOR, '#url')
input.send_keys('https://item.jd.com/4979408.html')
button = browser.find_element(By.CSS_SELECTOR, '#search-button')
button.click()
# 等待页面加载完成
WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, 'promotion-list')))
print(browser.page_source)
PyAutoGui
还是不行。尝试引入pyautogui,做了一部分突然想起来最后还是要做无界面的(headless)到时候pyautogui就没用了。所以可能还是要做验证。留一个残骸,这个也还有个问题,就是它的相对坐标是从页面起点算的,上边的各种栏位都没算进去,但也没必要改了。
import pyautogui
browser = webdriver.Chrome()
browser.get('https://www.gwdang.com/v2/trend?from=search')
input = browser.find_element(By.CSS_SELECTOR, '#url')
# input.send_keys('https://item.jd.com/4979408.html')
button = browser.find_element(By.CSS_SELECTOR, '#search-button')
# button.click()
# 获取相对坐标
location_input = input.location
location_button = button.location
print(location_button)
# 转换为绝对坐标
window_position = browser.get_window_position()
print(window_position)
abs_location_input = {'x': window_position['x']+location_input['x'],
'y': window_position['y']+location_input['y']}
abs_location_button = {'x': window_position['x']+location_button['x'],
'y': window_position['y']+location_button['y']}
print(abs_location_button)
# pyautogui操作
pyautogui.moveTo(abs_location_input['x'],
abs_location_input['y'], duration=0.5)
pyautogui.click()
pyautogui.typewrite('https://item.jd.com/4979408.html')
pyautogui.moveTo(abs_location_button['x'],
abs_location_button['y'], duration=0.5)
pyautogui.click()
如果手动点的话是不会弹验证的,selenium点就会,键盘上enter一下也不行。大概是鼠标轨迹检测吧,之前的技术积累用上了。
# 设定动作链
action = webdriver.ActionChains(browser)
x_offset = 40
action.move_to_element_with_offset(input, -x_offset, 0)
sumx = 0
while (sumx < x_offset):
if (sumx < x_offset):
action.move_by_offset(2, 0)
sumx += 2
action.perform()
action.click()
action.perform()
没点到。。。看不到鼠标也不知道哪里出问题,换个方法。
手动测试发现虽然直接tab enter不行,但tab绕一圈回来再enter就可以。这样就绕过了麻烦的爬虫检测了。
input.send_keys(goods_url)
# 设定动作链
action = webdriver.ActionChains(browser)
# 跳过第一次
action.send_keys(Keys.TAB).perform()
time.sleep(0.2)
action.send_keys(Keys.TAB).perform()
time.sleep(0.2)
# 循环一次
while (button != browser.switch_to.active_element):
action.send_keys(Keys.TAB).perform()
time.sleep(0.2)
action.send_keys(Keys.ENTER).perform()
开始爬价格数据。直接定位<ul class="promotion-list">
,查找子元素<li class="promotion-item">
,每一项记为一天的数据,日期为<div class="head"><span class="date">
,价格为<p class="ymj"><span>
。
import requests
from pyquery import PyQuery as pq
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import selenium.common.exceptions as SE
target_url = 'https://www.gwdang.com/v2/trend/4979408-3.html'
search_url = 'https://www.gwdang.com/v2/trend?from=search'
index_url = 'https://www.gwdang.com/'
# user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
# headers = {'User-Agent': user_agent}
# response = requests.get(search_url, headers=headers)
# print(response.status_code)
browser = webdriver.Chrome()
browser.get('https://www.gwdang.com/v2/trend?from=search')
# 绕过webdriver验证
script = 'Object.defineProperty(navigator, "webdriver", { get: () => false, });'
browser.execute_script(script=script)
# 等待页面加载完成
WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.ID, 'url')))
input = browser.find_element(By.CSS_SELECTOR, '#url')
input.send_keys('https://item.jd.com/4979408.html')
button = browser.find_element(By.CSS_SELECTOR, '#search-button')
# button.click()
# 等待页面加载完成
WebDriverWait(browser, 15).until(
EC.presence_of_element_located((By.CLASS_NAME, 'promotion-item')))
# with open("price_result.html", mode="w", encoding='utf-8', newline='') as f:
# f.write(browser.page_source)
# with open('price_result.html', 'r', encoding='utf-8') as f:
# html = f.read()
# doc = pq(html)
doc = pq(browser.page_source)
datalist = doc('.promotion-list .promotion-item')
datalist = datalist.items()
id_list = list(dict())
d = dict()
for data in datalist:
d['date'] = data.find('.date').text() # 日期
d['price'] = data.find('.ymj span').text()[1:] # 价格
id_list.append(d)
print(d['date']+'\t'+d['price'])
day20 4.12
好想把日期删掉(捂脸),摸鱼也得有个限度吧。
做下数据库,本来想做无服务端,就选了SQLite这个内置的轻量数据库。但好像又得做服务端了,懒得改先就这样吧。
稍微康了康,发现自己sql忘的差不多了,边做边复习吧。
老样子先建个类,各种方法都封装进去,主程序只需要传参就行。
day21 4.24
deadline来临,大踏步进入时序预测模型。从头学应该来不及,先读点代码吧。
装装环境库,pandas,sklearn,tensorflow都整上。需要注意的是,sklearn的包名现在是scikit-learn
,pip sklearn会报错。因为这几个包都挺大的,建议使用镜像站:-i https://pypi.tuna.tsinghua.edu.cn/simple
。
了解到informer做长序列比较好,可以考虑加入进来。还有就是之前一直考虑的价格单标签,应该至少加上销量做成多标签。
day21 5.3
感觉数据太将就了,用pandas搞下数据清洗吧。
pandas数据清洗
首先原始数据的日期没有年份,手动给它加上去:
from datetime import datetime
year = datetime.now().year # 获取当前年份
pre = 13
for i in range(df.shape[0]):
mouth, _ = df.loc[i, 'Date'].split('-')
if (int(mouth) > pre):
year -= 1
pre = int(mouth)
df.loc[i, 'Date'] = pd.to_datetime(
df.loc[i, 'Date']+'-'+str(year), format='%m-%d-%Y')
df['Date'] = pd.to_datetime(df['Date'])
df['Date'] = df['Date'].dt.date
这样就从03-27变成了2024-03-27,避免排序时搞乱了。
再是去重,很简单一句话:
df.drop_duplicates(inplace=True)
然后就是填补缺失值,这一步有点难理解,最好看看文档里的例子来理解各函数的作用。首先创建一个连续的日期索引date_range
,然后set_index('Date')
把日期当索引,reindex(date_range)
重设索引,这样没有的日期也有一行了,数据是NA,.fillna(method='ffill')
前向填充数据,空的日期就用前一天的价格填上了,最后.reset_index()
把索引改回去。这里Date列名变成index了,用rename改回去。
# 用前一天的数据填补空缺日期
df.sort_values('Date', inplace=True) # 排序
df.reset_index(drop=True, inplace=True) # 重置索引
# 创建一个连续日期的索引
date_range = pd.date_range(start=df['Date'].min(), end=df['Date'].max())
# 用date列当索引 | 用date_range重设索引 | 前向填充空数据 | 重设索引
df = df.set_index('Date').reindex(
date_range).fillna(method='ffill').reset_index()
df = df.rename(columns={'index': 'Date'}) # 将新增的列名'index'改回'Date'
print(df.head)
最后存下来,训练模型时再读取。
path = './data/processed_data/'
df.to_csv(path+filename, index=False)
day22 5.4
验证码转发什么的,还是搞下试试吧。
SmsForwarder转发验证码
在手机上用SmsForwarder的webhook转发验证码信息,就不造轮子了。
给出一个简单的配置示例:
【此处应有图】
服务器用flask简单搭一下。因为是测试,直接用的内网ip,双方连同一个局域网。也可以用 ngork
这样的工具内网穿透,这里就不用了。
from flask import Flask, request, jsonify
app = Flask(__name__)
# 测试用,实际使用需要更改主机与端口号
host = '172.17.148.62'
port = '8000'
@app.route('/')
def index():
return 'Hello, World!'
if __name__ == '__main__':
app.run(port=port, host=host, debug=True)
加一个接收验证码的路由,从里面解析出接收验证码的手机号和验证码,存到账号池数据库中。爬虫从数据库里拿收到的验证码,感觉有点多此一举,如果用RabbitMQ、Kafka等消息队列直接通信或许更方便,可惜我不会,而且账号池数据库也算有需要吧。
# 获取SmsForwarder转发的验证码
@app.route('/receive_sms', methods=['POST'])
def receive_sms():
# 获取并解析请求中的数据
data = request.get_json()
print(data)
message = data.get('msg')
if message:
# 处理短信内容,通过正则匹配6位数字验证码
pattern = r"【京东】(\d{6})"
match = re.search(pattern, message)
verification_code = match.group(1)
# 正则匹配接收信息的号码
pattern = r"\+86(\d{11})"
match = re.search(pattern, message)
mobile_number = match.group(1)
print(mobile_number, verification_code)
db = sql_class.SQLiteTool('accounts.db')
update_sql = "UPDATE accounts SET verification_code = ? WHERE mobile_number = ?"
db.update_data(
update_sql, (verification_code, mobile_number))
return jsonify({"status": "success", "message": "SMS received"}), 200
else:
return jsonify({"status": "error", "message": "No message found in the request"}), 400
SQLite数据库方法封装
数据库使用SQLite,写个轮子先,封装好各个交互方法:
import sqlite3
class SQLiteTool:
def __init__(self, db_name):
"""
初始化数据库连接
:param db_name: 数据库文件名
"""
self.conn = sqlite3.connect(db_name)
self.cursor = self.conn.cursor()
def create_table(self, table_sql):
"""
创建表
:param table_sql: 创建表的SQL语句
:return: None
"""
try:
self.cursor.execute(table_sql)
self.conn.commit()
print("Table created successfully.")
except Exception as e:
print(f"Error creating table: {e}")
def insert_data(self, insert_sql, data_tuple):
"""
插入数据
:param insert_sql: 插入数据的SQL语句模板
:param data_tuple: 插入数据的元组
:return: None
"""
try:
self.cursor.execute(insert_sql, data_tuple)
self.conn.commit()
print("Data inserted successfully.")
except Exception as e:
print(f"Error inserting data: {e}")
def query_data(self, query_sql):
"""
查询数据
:param query_sql: 查询数据的SQL语句
:return: 查询结果
"""
try:
self.cursor.execute(query_sql)
rows = self.cursor.fetchall()
return rows
except Exception as e:
print(f"Error querying data: {e}")
return None
def update_data(self, update_sql, data_tuple):
"""
更新数据
:param update_sql: 更新数据的SQL语句模板
:param data_tuple: 更新数据的元组
:return: None
"""
try:
self.cursor.execute(update_sql, data_tuple)
self.conn.commit()
print("Data updated successfully.")
except Exception as e:
print(f"Error updating data: {e}")
def delete_data(self, delete_sql, data_tuple=None):
"""
删除数据
:param delete_sql: 删除数据的SQL语句模板
:param data_tuple: 可选的元组,用于填充SQL中的占位符
:return: None
"""
try:
if data_tuple:
self.cursor.execute(delete_sql, data_tuple)
else:
self.cursor.execute(delete_sql)
self.conn.commit()
print("Data deleted successfully.")
except Exception as e:
print(f"Error deleting data: {e}")
def close_connection(self):
"""
关闭数据库连接
:return: None
"""
if self.conn:
self.conn.close()
print("Connection closed.")
账号池数据库
建一个账号池数据库,包含手机号(mobile_number,主键),验证码(verification_code)和占用状态(occupied)三列。爬虫和接收转发的验证码的服务器就通过数据库交互。爬虫首先从账号池中筛出空闲的账号,用这个账号请求验证码并标记占用,服务器接收到转发的验证码后存入验证码。爬虫检测到验证码变化后取出验证码并解除占用。
# 账号池数据库初始化
def accounts_db_init():
# 账号池
account_list = [
'',
''
]
# 创建数据库
db = SQLiteTool("accounts.db")
create_table_sql = '''
CREATE TABLE IF NOT EXISTS accounts (
mobile_number TEXT PRIMARY KEY,
verification_code TEXT NOT NULL,
occupied BOOLEAN NOT NULL
);
'''
db.cursor.execute("DROP TABLE IF EXISTS accounts")
db.create_table(create_table_sql)
# 存入账号
insert_sql = "INSERT INTO accounts VALUES (?, ?, ?)"
for account in account_list:
db.insert_data(insert_sql, (account, '000000', False))
# 关闭连接
db.close_connection()
然后就是爬虫中,和短信验证码相关的部分。用这部分替换掉原来的账号密码登录。
# SQL:从账号池中找出一个可用账号
db = SQLiteTool('accounts.db')
query_sql = "SELECT * FROM accounts WHERE occupied = False"
accounts = db.query_data(query_sql)
if (len(accounts)):
print('忙碌中,请稍候再试')
return False
update_sql = "UPDATE accounts SET occupied = ? WHERE mobile_number = ?"
db.update_data(update_sql, (True, accounts[0][0])) # 标记该账号使用中
div_sms_login = browser.find_element(By.ID, 'sms-login')
div_sms_login.click() # 切换到短信验证码登录
input_mobile_number = browser.find_element(By.ID, 'mobile-number')
input_mobile_number.send_keys(accounts[0][0])
button_send_code = browser.find_element(By.ID, 'send-sms-code-btn')
button_send_code.click() # 发送验证码
# 每0.5秒检查一次是否收到验证码,等待5秒
cnt = 10
verification_code = ''
while (cnt):
time.sleep(0.5)
cnt -= 1
query_sql = "SELECT * FROM accounts WHERE mobile_number = accounts[0][0]"
verification_code = db.query_data(query_sql)[1]
if (verification_code != accounts[0][1]):
break
update_sql = "UPDATE accounts SET occupied = ? WHERE mobile_number = ?"
db.update_data(update_sql, (False, accounts[0][0])) # 标记该账号空闲
db.close_connection()
if (verification_code == accounts[0][1]):
print('请求验证码超时')
return False
input_sms_code = browser.find_element(By.ID, 'sms-code')
input_sms_code.send_keys(verification_code) # 输入验证码
button_login = browser.find_element(By.ID, "sms-login-submit")
button_login.click() # 等待登录
day23 5.5
建议:selenium版本回退
发现一个bug,selenium启动chromedriver时,总是会先访问一个网址,等到超时报错再正常启动浏览器。而这个网址是github.io域名没挂梯一般都访问不了,所以每次启动都要等半天等它超时了才能正常启动浏览器,而且这个访问对正常使用完全没影响,大概是检查更新之类的吧。想禁用这个访问有不知道怎么做,查了查回退版本到4.5.0就不会有这个问题了。
完整报错信息如下:
Exception managing chrome: error sending request for url (https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json)
day24 5.9
没空写日志,忙的头掉。写文档好麻烦,都没空敲代码。
day24 5.11
简单记一下这段时间完成的工作吧,没空详细写了。其实也没做啥,写文档去了。现在的主要任务也是改文档。其实我代码都没写完。。。
果然还是写代码好玩,抽个空写写,明天又要交文档。
前端改了改,用jinja2的模板继承功能简化了一下。
为了实现登录引入flask-login,方便操作数据库又引入Flask-SQLAlchemy,层层加码,变得越来越臃肿了。
Flask-SQLAlchemy不知道为什么就是不能正确连接到数据库,弃用了。正常写sql命令就行。
flask-login实现账号管理
写一下用户数据库初始化,在初始化时就插入一个管理员账号:
def users_db_init():
# 创建数据库
db = SQLiteTool("users.db")
create_table_sql = '''
CREATE TABLE IF NOT EXISTS users (
uid INTEGER PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
password TEXT NOT NULL,
email NOT NULL UNIQUE
);
'''
db.create_table(create_table_sql)
insert_sql = "INSERT INTO users VALUES (?, ?, ?,?)"
db.insert_data(insert_sql, (0, 'admin', 'admin', 'admin'))
# 关闭连接
db.close_connection()
配置一下flask-login,用这个库去管理登录状态。
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = '请先登录以访问此页面。'
login_manager.login_message_category = 'Access denied.'
app.config['SECRET_KEY'] = os.urandom(16).hex()
定义用户类:
# 用户模型
class User(UserMixin):
db_path='users.db'
def __init__(self, uid, username, password, email):
self.uid = uid
self.username = username
self.password = password
self.email = email
@classmethod
def get_by_id(cls, uid):
db = sql_class.SQLiteTool(cls.db_path)
query_sql = "SELECT * FROM users WHERE uid = " + str(uid)
user_data = db.query_data(query_sql)
db.close_connection()
if user_data:
return cls(*user_data[0])
return None
@classmethod
def get_by_username(cls, username):
db = sql_class.SQLiteTool(cls.db_path)
query_sql = "SELECT * FROM users WHERE username = " + \
f'\'{str(username)}\''
user_data = db.query_data(query_sql)
db.close_connection()
if user_data:
return cls(*user_data[0])
return None
@classmethod
def get_by_email(cls, email):
db = sql_class.SQLiteTool(cls.db_path)
query_sql = "SELECT * FROM users WHERE email = " + \
f'\'{str(email)}\''
user_data = db.query_data(query_sql)
db.close_connection()
if user_data:
return cls(*user_data[0])
return None
@classmethod
def create_user(cls, username, password, email):
db = sql_class.SQLiteTool(cls.db_path)
query_sql = "SELECT MAX(uid) FROM users"
max_uid = db.query_data(query_sql)[0][0]
uid = max_uid+1
insert_sql = "INSERT INTO users VALUES (?, ?, ?, ?)"
db.insert_data(insert_sql, (uid, username, password, email))
db.close_connection()
return cls(uid, username, password, email)
def get_id(self):
return str(self.uid)
# 用户加载回调
@login_manager.user_loader
def load_user(uid):
return User.get_by_id(uid)
登录功能实现:
# 登录
@app.route('/login', methods=['POST'])
def login():
username = request.form['username']
password = request.form['password']
user = User.get_by_username(username=username)
if user and user.password == password:
login_user(user)
return redirect(url_for('index')) # 登录成功后重定向
else:
session['error_msg'] = '用户名或密码错误'
return redirect(url_for('error'))
注册功能实现:
# 注册
@app.route('/register', methods=['POST'])
def register():
username = request.form['username']
email = request.form['email']
password = request.form['password']
confirm_password = request.form['confirm_password']
if password != confirm_password:
session['error_msg'] = '两次密码不匹配'
return redirect(url_for('error'))
user1 = User.get_by_username(username=username)
user2 = User.get_by_email(email=email)
if user1 or user2:
session['error_msg'] = '用户名或邮箱已被使用'
return redirect(url_for('error'))
# 创建新用户
new_user = User.create_user(username, password, email)
# 注册完成直接登录
login_user(new_user)
return redirect(url_for('index'))
day26 5.14
今天代码截止,但还没写完,寄。主要把之前的后端和最近做的前端匹配起来。
selenium页面滚动
京东搜索页有些商品信息需要滚动到下面才会正常加载,所以加个页面滚动。
# 模仿用户缓慢滚动到页面底部
last_height = 0
while True:
# 滚动200px
browser.execute_script("window.scrollBy(0,300);")
# 等待页面加载新内容
time.sleep(0.5)
# 计算新的文档高度,如果不再变化,则跳出循环
height = browser.execute_script(
"return document.documentElement.scrollTop || window.pageYOffset || document.body.scrollTop;")
if height == last_height:
break
last_height = height
print(height)
time.sleep(1)
selenium设置无头模式
from selenium.webdriver.chrome.options import Options
opt.add_argument("--headless") # 无头
self.driver = webdriver.Chrome(options=opt) #使用无头模式打开
self.driver.get(url)
今天测试时突然发现我爬数据的购物党要登录了,离交代码只剩几小时,紧急改,最后做了一个很糙的补丁交上去了,也不知道后面还能不能改。。。还有好多功能没实现。。。
day27 5.16
在保存预测结果到SQLite数据库是发现predict_price被存为了BLOB二进制格式而不是REAL浮点数格式,而actual_price一切正常。在一番排查后终于确定是因为predict_price的格式为numpy.float32而actual_price的格式为numpy.float64,SQLite只能正确处理64位的浮点数,所以要把32位的predict_price转为64位。
predict_price = predict_price.astype(np.float64)
day28 5.20
应该算是结束了。随后改了点啥,没空记也记不清了,总之就是些边边角角的修改。总结一下,只有前期慢悠悠的做爬虫的部分算是真正学了东西,后面的LSTM,Flask以及前端等都是赶鸭子上架,能跑就行。做出来的东西,因为缺少规划,也是东一榔头西一棒槌,没有一个清晰明了的结构。最开始本来是想做好封装和模块化的,写着写着就驾驭不住了。还是缺少项目开发和管理的经验,下次加油吧。
(完)
- 网络代购货品分析与决策系统的设计与实现
- 项目地址
- 参考
- 开发日志
- day1 12.25
- day2 12.26
- day3 12.28
- day4 12.29
- day5 12.30
- day6 12.31
- day7 1.2
- day8 1.3
- day9 1.5
- day10 1.6
- day11 1.7
- day12 1.9
- day13 1.11
- day14 2.21
- day14 2.28
- day15 3.20
- day16 3.21
- day17 3.22
- day18 3.23
- day19 3.28
- day20 4.12
- day21 4.24
- day21 5.3
- day22 5.4
- day23 5.5
- day24 5.9
- day24 5.11
- day26 5.14
- day27 5.16
- day28 5.20