Using urllib
urlopen
The urlopen signature
urlopen(
url,
data=None,
[timeout,]*,
cafile=None,
capath=None,
cadefault=False,
context=None
)
Simulating a POST form submission with urlopen
import urllib.parse
import urllib.request
url="http://httpbin.org/post"
data=bytes(urllib.parse.urlencode({'word':'hello'}),encoding='utf-8')
response=urllib.request.urlopen(url,data=data,timeout=1)
print(response.read().decode('utf-8'))
Request
The urllib.request.Request constructor
urllib.request.Request(
url,
data=None,
headers={},
origin_req_host=None,
unverifiable=False,
method=None
)
Basic usage (changing the User-Agent)
import urllib.request
url="http://httpbin.org/post"
headers={
'User-Agent':'***'
}
request=urllib.request.Request(url=url,headers=headers,method='POST')
response=urllib.request.urlopen(request)
print(response.read().decode('utf-8'))
Using a proxy with Request
from urllib import request,error
if __name__ == '__main__':
url = "http://httpbin.org"
proxy = {'http': '218.75.102.198:8000'}
proxy_handler = request.ProxyHandler(proxy)
opener = request.build_opener(proxy_handler)
request.install_opener(opener)
try:
rsp = request.urlopen(url)
print(rsp.status)
except error.URLError as e:
print(e)
except Exception as e:
print(e)
Errors here are most likely caused by the proxy itself
Basic authentication login
- 1. HTTPPasswordMgrWithDefaultRealm() creates a username/password manager object
- 2. add_password() registers the username and password
- 3. HTTPBasicAuthHandler() builds the handler
- opener.open() sends the request...
import urllib.request
url="http://tieba.baidu.com"
user='test_user'
password='test_password'
pwdmgr=urllib.request.HTTPPasswordMgrWithDefaultRealm()
pwdmgr.add_password(None,url,user,password)
auth_handler=urllib.request.HTTPBasicAuthHandler(pwdmgr)
opener=urllib.request.build_opener(auth_handler)
response=opener.open(url)
print(response.read().decode('utf-8'))
Doesn't really seem to work...
Setting cookies
- Instantiate a cookie container with CookieJar
- Build a Handler with HTTPCookieProcessor
- Then open with the opener
import http.cookiejar
import urllib.request
url="http://tieba.baidu.com"
fileName='cookie.txt'
cookie=http.cookiejar.CookieJar()
handler=urllib.request.HTTPCookieProcessor(cookie)
opener=urllib.request.build_opener(handler)
response=opener.open(url)
f=open(fileName,'a')
for item in cookie:
f.write(item.name+"="+item.value+'\n')
f.close()
Request the cookies and save them to a file
HTTPResponse
Attributes: msg, version, status, reason, debuglevel, closed
Methods: read(), readinto(), getheader(name), getheaders(), fileno()
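A quick sketch of reading a few of these off the object urlopen returns (reusing the httpbin endpoint from earlier):
import urllib.request

# inspect the HTTPResponse returned by urlopen
response = urllib.request.urlopen("http://httpbin.org/get", timeout=5)
print(response.status)                      # status code, e.g. 200
print(response.reason)                      # reason phrase, e.g. OK
print(response.getheader("Content-Type"))   # one header by name
print(response.getheaders()[:3])            # first few (name, value) pairs
print(response.read(100))                   # first 100 bytes of the body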
Error handling
URLError
import urllib.request
import urllib.error
url="http://www.google.com"
try:
response=urllib.request.urlopen(url)
except urllib.error.URLError as e:
print('reason:', e.reason)
print(response.read().decode('utf-8'))
Couldn't get reason to print here at first: e.reason is not always a str (on a connection failure it can be a socket error object), so 'reason: ' + e.reason raises a TypeError; printing it with a comma works. Also note that when the request fails, response is never assigned, so the final print raises a NameError.
import urllib.request
import urllib.error
url="http://www.google.com"
try:
response=urllib.request.urlopen(url)
except urllib.error.URLError as e:
# print('code: ' + e.code + '\n')
print(e.reason)
# print('headers: ' + e.headers + '\n')
This one worked when the network was disconnected...
HTTPError
import urllib.request
import urllib.error
url="http://www.google.com"
try:
response=urllib.request.urlopen(url)
except urllib.error.HTTPError as e:
print('code:', e.code)
print('reason:', e.reason)
print('headers:', e.headers)
print(response.read().decode('utf-8'))
No output here either: e.code is an int and e.headers is an HTTPMessage, so concatenating them with strings raises a TypeError (fixed above by printing with commas). Also, when the site is simply unreachable the failure raises URLError rather than HTTPError, so this handler never runs and response is never assigned.
The requests library
requests.get()
res=requests.get(url, params=None, **kwargs)
import requests
r=requests.get("http://www.baidu.com")
print(r.status_code)
print(r.encoding)
print(r.apparent_encoding)
print(r.text)
Exception handling
import requests
try:
r=requests.get("http://www.baidu.com",timeout=30)
r.raise_for_status()
r.encoding=r.apparent_encoding
print(r.text)
except:
print("Wrong!")
requests.head()
import requests
r=requests.head("http://www.baidu.com",timeout=30)
print(r.headers)
requests.post()
import requests
payload={"key1":"value1","key2":"value2"}
header={"User-Agent":"***"}
r=requests.post("http://httpbin.org/post",data=payload,headers=header)
print(r.text)
r=requests.post("http://httpbin.org/post",data='helloworld')
print(r.text)
Other operations
Setting a proxy with requests
import requests
proxies = {
"http": "http://218.75.102.198:8000",
"https": "https://218.75.102.198:8000",
}
response = requests.get("http://www.baidu.com", proxies=proxies,verify=False)
print(response.status_code)
Regular expressions (re)
re.match: matches only at the beginning of the string
re.search: returns the first successful match anywhere in the string
Arguments: (pattern, string, flags=0); the r prefix on the pattern makes it a raw string, so backslashes are not escaped
group() returns the value captured by each parenthesized group
re.sub: string substitution
re.compile: compiles a regular expression; its findall() method returns all matches
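A minimal sketch of these functions (the pattern and the sample string are made up for illustration):
import re

text = "Date: 2022-04-08, Backup: 2022-04-09"

# re.match only matches at the start of the string
print(re.match(r"Date", text))      # a Match object
print(re.match(r"Backup", text))    # None: "Backup" is not at the start

# re.search returns the first successful match anywhere in the string
m = re.search(r"(\d{4})-(\d{2})-(\d{2})", text)
print(m.group())                            # 2022-04-08
print(m.group(1), m.group(2), m.group(3))   # values of the parenthesized groups

# re.sub replaces every match
print(re.sub(r"\d{4}-\d{2}-\d{2}", "<date>", text))

# re.compile builds a reusable pattern object; findall returns all matches
pattern = re.compile(r"\d{4}-\d{2}-\d{2}")
print(pattern.findall(text))        # ['2022-04-08', '2022-04-09']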
XPATH
Try 1: fetching the Sunshine Movies (ygdy8) index HTML
import requests
headers={
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:98.0) Gecko/20100101 Firefox/98.0'
}
url='http://www.ygdy8.com/index.html'
print(headers)
req=requests.get(url,headers=headers)
req.encoding='gbk'
print(req.text)
Try 2: submitting a search keyword to Baidu
import requests
try:
get={'wd':'python'}
r=requests.get('http://www.baidu.com/s',params=get)
r.raise_for_status()
r.encoding=r.apparent_encoding
print(r.text)
except:
print('Failed')
AJAX
Overview
Ajax loads data asynchronously without refreshing the whole page, updating only part of it.
How it works
1. Sending the request
Create an XMLHttpRequest object
var xmlhttp;
if (window.XMLHttpRequest)
{
// code path for IE7+, Firefox, Chrome, Opera, Safari
xmlhttp=new XMLHttpRequest();
}
else
{
// code path for IE6 and IE5
xmlhttp=new ActiveXObject("Microsoft.XMLHTTP");
}
GET generally has a limit on how much data can be sent,
while POST is slower.
2. Handling the response
Render based on the xmlhttp response
3. Rendering the page
e.g. via document.getElementById().innerHTML
Analyzing Ajax requests
Ajax requests show up with type xhr
Simulating the request in Python
Take https://data.variflight.com/analytics/CodeQuery as an example
Under the Network tab, find airportCode; the Headers pane shows the Request URL and that the method is POST


The Preview pane shows the returned result

import requests
import json
def get_data(key,page):
url='https://data.variflight.com/analytics/Codeapi/airportCode'
data={
"key":key,
"page":page
}
res=requests.request("post",url,data=data)
return res.text
def get_parse(data):  # parse the JSON string and return the result
return json.loads(data)
data=get_data('杭州',0)
apt_info=get_parse(data)
print(apt_info["data"])
Tried crawling Weibo
import requests
import json
headers = {
'Host': 'm.weibo.cn',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36',
'X-Requested-With': 'XMLHttpRequest',
}
def get_data():
data={
'containerid': '102803',
'openApp': '0',
'sinceid': '1'
}
url='https://m.weibo.cn/api/container/getIndex'
res=requests.request(method='GET',url=url,params=data,headers=headers)
return res.text
def get_parse(data):  # parse the JSON string and return the result
return json.loads(data)
data=get_data()
apt_info=get_parse(data)
for i in range(1,10):
print(apt_info['data']['cards'][i]['mblog']['text'])
Scraping dynamically rendered pages
selenium
Selenium can grab the page source exactly as the browser currently renders it: whatever is visible can be scraped
Selenium locator strategies
Elements can be located by id, name, class name, tag name, link text, partial link text, xpath, or css selector
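A short sketch of a few of these strategies via the By class (the Baidu element ids match the ones used later in these notes):
from selenium import webdriver
from selenium.webdriver.common.by import By

browser = webdriver.Chrome()
browser.get("http://www.baidu.com")
# the same search box located three different ways
kw1 = browser.find_element(By.ID, "kw")
kw2 = browser.find_element(By.NAME, "wd")
kw3 = browser.find_element(By.CSS_SELECTOR, "input#kw")
news = browser.find_element(By.LINK_TEXT, "新闻")
print(kw1 == kw2 == kw3)   # True: all three locate the same element
browser.quit()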
Controlling the browser with Selenium
Setting the window size
from selenium import webdriver
browser = webdriver.Chrome()
browser.get('http://www.baidu.com')
print("480*800")
browser.set_window_size(480,800)
Navigating back, forward, and refreshing
from selenium import webdriver
import time
browser = webdriver.Chrome()
browser.set_page_load_timeout(2)  # page-load timeout
browser.set_script_timeout(2)  # timeout for async JS execution
first_url="http://www.baidu.com"
print("now access %s"%(first_url))
browser.get(first_url)
time.sleep(2)
try:
second_url="http://news.baidu.com"
print("now access %s"%(second_url))
browser.get(second_url)
except:
print("Too Slow")
time.sleep(2)
print("back to %s"%(first_url))
browser.back()
browser.refresh()  # refresh the page
time.sleep(2)
try:
print("forward to %s"%(second_url))
browser.forward()
except:
print("Too Slow")
Common element methods
clear() clears the text
send_keys(value) simulates keyboard input
click() simulates a single click
submit() submits a form (less widely applicable than click)
size returns the element's size
text returns the element's text
get_attribute(name): gets the value of an attribute
is_displayed(): returns whether the element is visible to the user
from selenium import webdriver
import time
browser = webdriver.Chrome()
browser.set_page_load_timeout(2)
browser.set_script_timeout(2)
first_url="http://www.baidu.com"
print("now access %s"%(first_url))
browser.get(first_url)
time.sleep(2)
search=browser.find_element_by_id("kw")
search.send_keys("Try")
time.sleep(2)
search.clear()
search.send_keys("Go")
time.sleep(2)
#search.submit()
Click=browser.find_element_by_id("su")
print(Click.get_attribute("value"))
Click.click()
Mouse and keyboard events
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import time
driver = webdriver.Chrome()
driver.get("http://www.baidu.com")
time.sleep(2)
above=driver.find_element(by="id", value="s-usersetting-top")
ActionChains(driver).move_to_element(above).perform()
# hover the mouse over the element
It seems the target is wrapped in a span, so find_element_by_link_text cannot be used directly
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import time
driver = webdriver.Chrome()
driver.get("http://www.baidu.com")
time.sleep(2)
news=driver.find_element(by="link text",value="新闻")
action = ActionChains(driver)
action.move_to_element(news)
action.context_click()
action.perform()
send_keys(Keys.BACK_SPACE)  Backspace
send_keys(Keys.SPACE)  Space
send_keys(Keys.TAB)  Tab
send_keys(Keys.ESCAPE)  Escape (Esc)
send_keys(Keys.ENTER)  Enter
send_keys(Keys.CONTROL,'a')  Select all (Ctrl+A)
send_keys(Keys.CONTROL,'c')  Copy (Ctrl+C)
send_keys(Keys.CONTROL,'x')  Cut (Ctrl+X)
send_keys(Keys.CONTROL,'v')  Paste (Ctrl+V)
send_keys(Keys.F1)  F1
send_keys(Keys.F12)  F12
send_keys(Keys.LEFT)  Move the cursor left
send_keys(Keys.SHIFT, Keys.RIGHT)  Extend the selection one character to the right
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
import time
driver = webdriver.Chrome()
driver.get("http://www.baidu.com")
time.sleep(2)
kw=driver.find_element(by="id",value="kw")
kw.send_keys("seleniumm")
kw.send_keys(Keys.BACK_SPACE)
kw.send_keys(Keys.LEFT*3)
time.sleep(2)
kw.send_keys(Keys.SHIFT,Keys.RIGHT*3)
Getting assertion information
# coding=utf-8
from selenium import webdriver
import time
driver = webdriver.Chrome()
driver.get("http://bing.com")
print("--------搜索以前--------")
title=driver.title
print(title)
now_url=driver.current_url
print(now_url)
kw=driver.find_element(by="class name",value="sb_form_q")
kw.send_keys("nba")
kw.submit()
time.sleep(1)
print("--------搜索以后--------")
title=driver.title
print(title)
now_url=driver.current_url
print(now_url)
num=driver.find_element(by="id",value="b_tween")
print(num.text)
Still not very comfortable with find_element_by_class_name...
Element waits
Explicit waits
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
driver=webdriver.Chrome()
driver.get("https://www.baidu.com")
element=WebDriverWait(driver,5,0.5).until(
EC.presence_of_element_located((By.ID,"kw"))
)
element.send_keys('selenium')
element.submit()
WebDriverWait(driver,timeout,poll_frequency=0.5,ignored_exceptions=None)
Usually used together with until() or until_not() (it keeps polling until the condition returns True or False, respectively)
Implicit waits
An implicit wait is not tied to a specific element on the page
When the script reaches an element lookup, it proceeds if the element can be located; otherwise it keeps retrying until the timeout expires
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from time import ctime
driver=webdriver.Chrome()
driver.implicitly_wait(10)
driver.get("https://www.baidu.com")
try:
print(ctime())
driver.find_element(by=By.ID,value="kw")
except NoSuchElementException as e:
print(e)
finally:
print(ctime())
driver.quit()
Switching between frames (iframes)
Leaving this as a TODO... haven't gotten it working yet... (a rough sketch below)
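Still, a minimal sketch of how frame switching generally works with switch_to.frame; the page URL and the frame id login_frame here are hypothetical placeholders:
from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()
driver.get("http://example.com/page-with-iframe")   # hypothetical page

driver.switch_to.frame("login_frame")   # enter the iframe by id/name (hypothetical id)
driver.find_element(By.ID, "u")         # elements inside the frame are now reachable
driver.switch_to.parent_frame()         # back to the parent frame
driver.switch_to.default_content()      # back to the top-level document
driver.quit()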
Selecting from a dropdown
from selenium import webdriver
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.by import By
from time import sleep
driver=webdriver.Chrome()
driver.get("https://cn.bing.com/account/general?ru=https%3a%2f%2fcn.bing.com%2f&FORM=O2HV46&sh=0")
sleep(1)
sel=driver.find_element(by=By.XPATH,value="//select[@id='rpp']")
Select(sel).select_by_value("15")
Still not very good with XPath...
Executing JavaScript
from selenium import webdriver
from time import sleep
driver=webdriver.Chrome()
driver.get("http://www.baidu.com")
driver.set_window_size(500,500)
sleep(3)
js="window.scrollTo(100,450);"
driver.execute_script(js)
sleep(3)
Taking screenshots
driver.get_screenshot_as_file(filename)
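For example (a sketch; the file name is arbitrary):
from selenium import webdriver

driver = webdriver.Chrome()
driver.get("http://www.baidu.com")
# save a screenshot of the current window as a PNG file
driver.get_screenshot_as_file("baidu_home.png")
driver.quit()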
Headless mode
Add the following:
from selenium.webdriver.chrome.options import Options
chrome_options=Options()
chrome_options.add_argument('--headless')
driver = webdriver.Chrome(options=chrome_options)
Not knowing XPath feels pretty bad... will study it tomorrow
Splash
Not sure what it's useful for yet...
A sample script
function main(splash, args)
splash:go("https://www.taobao.com")
splash:wait(0.5)
local title=splash:evaljs("document.title")
return {title=title}
end
Tried simulating a CSDN login
Failed...
Configuring and using proxies
Setting a proxy with urllib
import urllib.request
import urllib.error
url="http://httpbin.org/get"
proxies={
'http':'http://106.54.128.253:999'
}
proxy_support=urllib.request.ProxyHandler(proxies)
opener=urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
try:
response=urllib.request.urlopen(url,timeout=1)
print(response.read().decode('utf-8'))
except urllib.error.URLError as e:
print(e)
Setting a proxy with requests
import requests
url="http://httpbin.org/get"
proxies={
'http':'http://106.54.128.253:999'
}
s = requests.session()
s.keep_alive = False
response=s.get(url=url,proxies=proxies,timeout=1)
print(response.text)
Earlier experiments hit a "Max retries exceeded with url" error
Workarounds:
1. Increase the number of connection retries
requests.adapters.DEFAULT_RETRIES = 5
2. Close spare connections
requests is built on urllib3, whose HTTP connections are keep-alive by default; set keep_alive to False on the session to turn this off.
How:
s = requests.session()
s.keep_alive = False
Setting a proxy with Selenium
from selenium import webdriver
chromeOptions=webdriver.ChromeOptions()
chromeOptions.add_argument("--proxy-server=http://106.54.128.253:999")
chromeOptions.add_argument("--headless")
browser=webdriver.Chrome(options=chromeOptions)
browser.get("http://httpbin.org/get")
print(browser.page_source)
browser.quit()
Multiple IPs?
Randomly switching between multiple IPs:
import random
iplist=['xxx','xxxx']
proxies={'http':random.choice(iplist)}
Proxy pool
Building the proxy pool
import requests
from bs4 import BeautifulSoup
def get_ips():
url="https://www.kuaidaili.com/free/"
header={
'User-Agent':"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36",
}
res=requests.get(url,headers=header)
bs=BeautifulSoup(res.text,'html.parser')
res_list=bs.find_all('tr');
ip_list=[]
for link in res_list:
tds=link.find_all('td')
if tds:
ip_list.append({"ip:":tds[0].text,"port:":tds[1].text})
return ip_list
ip_list=get_ips()
for item in ip_list:
print(item)
Validating the proxy pool
import socket
import requests
from bs4 import BeautifulSoup
def get_ips():
url="https://www.kuaidaili.com/free/"
header={
'User-Agent':"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36",
}
res=requests.get(url,headers=header)
bs=BeautifulSoup(res.text,'html.parser')
res_list=bs.find_all('tr');
ip_list=[]
for link in res_list:
tds=link.find_all('td')
if tds:
ip_list.append({"ip":tds[0].text,"port":tds[1].text})
return ip_list
def ip_pool():
socket.setdefaulttimeout(2)
ip_list=get_ips()
for x in ip_list:
print(x)
print("="*50)
ip_pool_list=[]
id=0;
print(len(ip_list))
for x in ip_list:
id=id+1;
print(id)
proxy=x["ip"]+":"+x["port"]
proxies={'http':proxy}
try:
res=requests.get("http://www.baidu.com",proxies=proxies,timeout=2)
ip_pool_list.append(proxy)
except Exception as ex:
continue
return ip_pool_list
ip=ip_pool()
for x in ip:
print(x)
Using the proxy pool
import socket
import time
import random
import requests
from bs4 import BeautifulSoup
def get_ips():
url="https://www.kuaidaili.com/free/"
header={
'User-Agent':"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36",
}
res=requests.get(url,headers=header)
bs=BeautifulSoup(res.text,'html.parser')
res_list=bs.find_all('tr');
ip_list=[]
for link in res_list:
tds=link.find_all('td')
if tds:
ip_list.append({"ip":tds[0].text,"port":tds[1].text})
return ip_list
def ip_pool():
socket.setdefaulttimeout(2)
ip_list=get_ips()
for x in ip_list:
print(x)
print("="*50)
ip_pool_list=[]
id=0;
print(len(ip_list))
for x in ip_list:
id=id+1;
print(id)
proxy=x["ip"]+":"+x["port"]
proxies={'http':proxy}
try:
res=requests.get("http://www.baidu.com",proxies=proxies,timeout=2)
ip_pool_list.append(proxy)
except Exception as ex:
continue
return ip_pool_list
iplist=ip_pool()
for T in range(1,11):
proxies={'http':random.choice(iplist)}
response=requests.get("http://httpbin.org/get",proxies=proxies)
print(response.text)
time.sleep(1)
Dynamic dial-up IPs
Leaving this as a TODO...
Captcha recognition
Simple OCR recognition
import pytesseract
from PIL import Image
print(pytesseract.image_to_string(Image.open("code.jpg"),lang="eng",config="--psm 7"))
Some preprocessing
Binarization and color remapping
def test(path):
img=Image.open(path)
w,h=img.size
for x in range(w):
for y in range(h):
r,g,b=img.getpixel((x,y))
if 0<=r<=100 and 0<=g<=100 and 0<=b<=100:
img.putpixel((x,y),(255,255,255))
if 150<=r<=255 and 0<=g<=50 and 0<=b<=50:
img.putpixel((x,y),(0,0,0))
img=img.convert('L').point([0]*150+[1]*(256-150),'1')
return img
Noise removal
t2val={}
# G: binarization threshold
def twoValue(image,G):
for y in range(0,image.size[1]):
for x in range(0,image.size[0]):
g=image.getpixel((x,y))
if g>G :
t2val[(x,y)]=1
else:
t2val[(x,y)]=0
# N: noise threshold, 0 < N < 8
# Z: number of denoising passes
def clearNoise(image,N,Z):
for i in range(0,Z):
t2val[(0,0)]=1
t2val[(image.size[0]-1,image.size[1]-1)]=1
for x in range(1,image.size[0]-1):
for y in range(1,image.size[1]-1):
nearDots=0
L=t2val[(x,y)]
if L==t2val[(x-1,y-1)]:
nearDots+=1
if L==t2val[(x-1,y)]:
nearDots+=1
if L==t2val[(x-1,y+1)]:
nearDots+=1
if L==t2val[(x,y-1)]:
nearDots+=1
if L==t2val[(x,y+1)]:
nearDots+=1
if L==t2val[(x+1,y-1)]:
nearDots+=1
if L==t2val[(x+1,y)]:
nearDots+=1
if L==t2val[(x+1,y+1)]:
nearDots+=1
if nearDots<N:
t2val[(x,y)]=1
Saving the image and recognizing it
def saveImage(filename,size):
image=Image.new("1",size)
draw=ImageDraw.Draw(image)
for x in range(0,size[0]):
for y in range(0,size[1]):
draw.point((x,y),t2val[(x,y)])
image.save(filename)
def recognize_captcha(img_path):
im=Image.open(img_path)
num=pytesseract.image_to_string(im)
return num
Full code
import pytesseract
from PIL import Image,ImageDraw
def test(path):
img=Image.open(path)
w,h=img.size
for x in range(w):
for y in range(h):
r,g,b=img.getpixel((x,y))
if 0<=r<=100 and 0<=g<=100 and 0<=b<=100:
img.putpixel((x,y),(255,255,255))
if 150<=r<=255 and 0<=g<=50 and 0<=b<=50:
img.putpixel((x,y),(0,0,0))
img=img.convert('L').point([0]*150+[1]*(256-150),'1')
return img
t2val={}
# G: binarization threshold
def twoValue(image,G):
for y in range(0,image.size[1]):
for x in range(0,image.size[0]):
g=image.getpixel((x,y))
if g>G :
t2val[(x,y)]=1
else:
t2val[(x,y)]=0
# N: noise threshold, 0 < N < 8
# Z: number of denoising passes
def clearNoise(image,N,Z):
for i in range(0,Z):
t2val[(0,0)]=1
t2val[(image.size[0]-1,image.size[1]-1)]=1
for x in range(1,image.size[0]-1):
for y in range(1,image.size[1]-1):
nearDots=0
L=t2val[(x,y)]
if L==t2val[(x-1,y-1)]:
nearDots+=1
if L==t2val[(x-1,y)]:
nearDots+=1
if L==t2val[(x-1,y+1)]:
nearDots+=1
if L==t2val[(x,y-1)]:
nearDots+=1
if L==t2val[(x,y+1)]:
nearDots+=1
if L==t2val[(x+1,y-1)]:
nearDots+=1
if L==t2val[(x+1,y)]:
nearDots+=1
if L==t2val[(x+1,y+1)]:
nearDots+=1
if nearDots<N:
t2val[(x,y)]=1
def saveImage(filename,size):
image=Image.new("1",size)
draw=ImageDraw.Draw(image)
for x in range(0,size[0]):
for y in range(0,size[1]):
draw.point((x,y),t2val[(x,y)])
image.save(filename)
def recognize_captcha(img_path):
im=Image.open(img_path)
num=pytesseract.image_to_string(im)
return num
if __name__=='__main__':
path="code.jpg"
im=test(path)
path=path.replace('jpg','png')
im.save(path)
image=Image.open(path).convert("L")
twoValue(image,100)
clearNoise(image,2,5)
img_path="code.jpeg"
saveImage(img_path,image.size)
print (recognize_captcha(img_path))
print(pytesseract.image_to_string(Image.open("code.jpeg"),lang="eng",config="--psm 7"))


I think the preprocessing looks decent... but it still can't be recognized
GG
Captcha recognition with a CNN
Spent ages setting up the TensorFlow environment...
At least I learned a bit about Conda
Slider captchas
import random
from selenium import webdriver
import time
from selenium.webdriver.common.action_chains import ActionChains
driver=webdriver.Chrome()
url="http://127.0.0.1:8890/test.html"
driver.get(url)
action=ActionChains(driver)
time.sleep(1)
element=driver.find_element(by="id",value="nc_1_n1z")
action.click_and_hold(element).perform()
distance=260
while distance > 0:
if distance>10:
span=random.randint(5,8)
else:
span=random.randint(2,3)
action.move_by_offset(span,0)
distance-=span
time.sleep(random.randint(10,50)/1000)
action.move_by_offset(distance,1).perform()
action.release().perform()
But Aliyun's anti-bot detection still caught it...
Jigsaw (puzzle) captchas
Approach
A jigsaw captcha is essentially three images stacked on top of each other:
1. Save the three images
2. Combine/compare the two incomplete images to compute the gap distance
3. Simulate dragging the slider to that position
Not really sure how to do it yet... (a rough sketch below)
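A rough sketch of steps 2 and 3, assuming the full background and the gapped background have already been saved as bg_full.png and bg_gap.png, and that the page URL and slider element id are hypothetical placeholders; it scans for the first column where the two images clearly differ and drags the slider that far:
from PIL import Image
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains

def find_gap_offset(full_path, gap_path, threshold=60):
    # compare the two backgrounds pixel by pixel; the first column whose
    # colour clearly differs is where the missing piece sits
    full = Image.open(full_path).convert("RGB")
    gap = Image.open(gap_path).convert("RGB")
    for x in range(full.size[0]):
        for y in range(full.size[1]):
            r1, g1, b1 = full.getpixel((x, y))
            r2, g2, b2 = gap.getpixel((x, y))
            if abs(r1 - r2) + abs(g1 - g2) + abs(b1 - b2) > threshold:
                return x
    return 0

offset = find_gap_offset("bg_full.png", "bg_gap.png")
driver = webdriver.Chrome()
driver.get("http://127.0.0.1:8890/test.html")           # placeholder page
slider = driver.find_element(by="id", value="slider")   # hypothetical slider id
ActionChains(driver).click_and_hold(slider).move_by_offset(offset, 0).release().perform()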
Scraping app data
Fiddler Everywhere
Can't do phone packet capture for now... the campus network only allows one device per account...
Data storage
Storing to text files
file = open('filename', 'a', encoding='utf-8')  # 'a' means append
file.write('')
file.close()
with open('filename','a',encoding='utf-8')as file:
file.write('')
Storing to JSON files
JSON is a lightweight, language-independent data interchange format
In Python, JSON arrays and objects correspond to list and dict.
The json module provides four functions:
1. dumps: converts a Python object to a JSON string
2. dump: converts a Python object to a JSON string and writes it to a file
3. loads: converts a JSON string back to a Python object
4. load: reads a file and converts its JSON content back to a Python object
dict -> string -> dict
import json
test_dict={'bigberg':[7600,{1:[['iPhone',6300],['Bike',800],['shirt',300]]}]}
print(test_dict)
print(type(test_dict))
print("="*50)
json_str=json.dumps(test_dict)
print(json_str)
print(type(json_str))
print("="*50)
new_dict=json.loads(json_str)
print(new_dict)
print(type(new_dict))
Writing data to a JSON file
import json
new_dict={"name":"zk","age":20,"gender":"m"}
with open("record.json","w",encoding='utf-8')as f:
json.dump(new_dict,f)
print("Finished!")
Storing to CSV files
CSV stores tabular data, with values in each record separated by commas
Writing CSV
import csv
c=open("test.csv","w")
writer=csv.writer(c)
writer.writerow(['name','address','city','state'])
c.close()
Reading CSV
import csv
c =open("test.csv","r")
reader =csv.reader(c)
for line in reader:
print(line[0],line[1],line[2],line[3])
c.close()
Storing to Excel files
Writing a file
import xlwt
def set_style(name,height,bold=False):
style =xlwt.XFStyle()
font =xlwt.Font()
font.name=name
font.bold=bold
font.colour_index=4
font.height=height
style.font=font
return style
def write_excel(path):
workbook=xlwt.Workbook(encoding="utf-8") # create the Workbook
data_sheet=workbook.add_sheet("demo") # add a sheet
row0=[u'字段名称',u'大致时段','CRNTI','CELL-ID']
row1=[u'测试','15:50:33-15:52:14',22706,4190202]
for i in range(len(row0)):
data_sheet.write(0, i, row0[i], set_style('Times New Roman', 220, True))
data_sheet.write(1, i, row1[i], set_style('Times New Roman', 220, True))
# data_sheet.write(row, column, value)
workbook.save(path) # save the file
if __name__=='__main__':
path='demo.xls'
write_excel(path)
print("Success!")
Reading a file
import xlrd
Workbook =xlrd.open_workbook('demo.xls')
sheet_names =Workbook.sheet_names()
sheet1 =Workbook.sheet_by_name('demo')
#sheet1 =Workbook.sheet_by_index(0)
rows=sheet1.row_values(1)
cols10=sheet1.col_values(1)
print('rows',rows)
print('cols10',cols10)
MySQL storage
Creating a database connection
import mysql.connector
mydb=mysql.connector.connect(
host="localhost",
user="root",
passwd="root"
)
print(mydb)
Inserting data
import mysql.connector
mydb=mysql.connector.connect(
host="localhost",
user="root",
passwd="root"
)
mycursor=mydb.cursor()
sql="USE pylogin"
mycursor.execute(sql)
sql="create "
sql="INSERT INTO test_01 (id,name,age) VALUES (%s,%s,%s)"
val=[
(2,"张三",12),
(3,"李四",13),
(4,"王五",23),
(5,"麻子",35)
]
for x in val:
mycursor.execute(sql,x)
mydb.commit()
Querying data
import mysql.connector
mydb=mysql.connector.connect(
host="localhost",
user="root",
passwd="root"
)
mycursor=mydb.cursor()
sql="USE pylogin"
mycursor.execute(sql)
sql="SELECT * FROM test_01;"
mycursor.execute(sql)
myresult=mycursor.fetchall()
for x in myresult:
print(x)
Storing data in MongoDB
Said to be good for server performance
Inserting a single document
import pymongo
myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]
mydict={"name":"张三","age":"23","gender":"男"}
x=mycol.insert_one(mydict)
print(x.inserted_id)
Inserting multiple documents
import pymongo
myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]
mydict=[
{"name":"张三","age":"23","gender":"男"},
{"name":"李四","age":"24","gender":"男"},
{"name":"王五","age":"25","gender":"男"},
{"name":"麻子","age":"26","gender":"男"}
]
x=mycol.insert_many(mydict)
print(x.inserted_ids)
Querying a single document
import pymongo
myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]
x=mycol.find_one()
print(x)
Querying multiple documents
import pymongo
myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]
for x in mycol.find():
print(x)
Updating data
import pymongo
myclient =pymongo.MongoClient("mongodb://root:root@localhost:27017")
mydb =myclient["test_db"]
mycol =mydb["sites"]
myquery={"name":"张三"}
newvalues={"$set":{"age":"20"}}
mycol.update_many(myquery,newvalues)
for x in mycol.find():
print(x)
Redis storage
Redis is a data-structure server and is extremely fast
Redis lists
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.lpush("test_list",1)
r.lpush("test_list",2)
r.lpush("test_list",3)
print(r.lpop("test_list"))
print(r.lpop("test_list"))
Redis sets
Backed by a hash table, so members are unique and operations are O(1)
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.sadd("test_list",1)
r.sadd("test_list",2)
r.sadd("test_list",3)
print(r.scard("test_list"))
PostgreSQL
psycopg2 implements the Python DB-API 2.0 spec and is thread-safe
Connecting to the database
import psycopg2
conn=psycopg2.connect(
database="postgres",
user="postgres",
password="root",
host="localhost",
port=5432
)
cur=conn.cursor()
Inserting data
import psycopg2
conn=psycopg2.connect(
database="postgres",
user="postgres",
password="root",
host="localhost",
port=5432
)
cur=conn.cursor()
cur.execute("CREATE TABLE student(id integer,name varchar,sex varchar);")
cur.execute("INSERT INTO student(id,name,sex)VALUES(%s,%s,%s)",(1,'Aspirin','M'))
cur.execute("INSERT INTO student(id,name,sex)VALUES(%s,%s,%s)",(2,'Taxol','F'))
cur.execute("INSERT INTO student(id,name,sex)VALUES(%s,%s,%s)",(3,'Dixheral','M'))
conn.commit()
cur.close()
conn.close()
Querying data
import psycopg2
conn=psycopg2.connect(
database="postgres",
user="postgres",
password="root",
host="localhost",
port=5432
)
cur=conn.cursor()
cur.execute("SELECT * FROM student")
result=cur.fetchall()
print(result)
cur.close()
conn.close()
Updating and deleting
Compared to inserting, only the SQL statements differ; a short sketch follows
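A minimal sketch, reusing the student table from the insert example above:
import psycopg2
conn = psycopg2.connect(
    database="postgres",
    user="postgres",
    password="root",
    host="localhost",
    port=5432
)
cur = conn.cursor()
# update one row, then delete another
cur.execute("UPDATE student SET name=%s WHERE id=%s", ('NewName', 1))
cur.execute("DELETE FROM student WHERE id=%s", (3,))
conn.commit()
cur.close()
conn.close()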
Python: scraping article titles from Jianshu
from selenium import webdriver
import time
import pymysql
from lxml import etree
driver=webdriver.Chrome()
driver.get('https://www.jianshu.com')
def load_more(num):
for x in range(5):
js="var q=document.documentElement.scrollTop=100000"
driver.execute_script(js)
time.sleep(2)
if num==0:
time.sleep(4)
load_more=driver.find_element_by_class_name("load-more")
load_more.click()
def get_html():
note_list=driver.find_element_by_class_name("note-list")
html=note_list.get_attribute("innerHTML")
return html
def extract_data(content_html):
html=etree.HTML(content_html)
title_list=html.xpath('//li//a[@class="title"]/text()')
abstract_list=html.xpath('//li//a[@class="abstract"]/text()')
nickname_list=html.xpath('//li//a[@class="nickname"]/text()')
data_list=[]
for index,x in enumerate(title_list):
item={}
item["title"]=title_list[index]
# item["abstract"]=abstract_list[index]
# item["nickname"]=nickname_list[index]
print(item)
data_list.append(item)
return data_list
def insert_data(sql):
db=pymysql.connect(
host="127.0.0.1",
user="root",password="root",
database="pylogin",
charset="utf8")
try:
cursor=db.cursor()
return cursor.execute(sql)
except Exception as ex:
print(ex)
finally:
db.commit()
db.close()
for x in range(2):
print(x)
load_more(x)
time.sleep(1)
results=extract_data(get_html())
for item in results:
print(item)
sql="insert into jianshu(title) values('%s')"\
""%(item["title"])
insert_data(sql)
Still don't fully understand a lot of this... mainly the XPath part
Python: automating the campus health check-in (微服务打卡)
import re
import requests
from bs4 import BeautifulSoup
from requests_toolbelt.utils import dump
import api
import Student
import json
sess = requests.session()
def download_img(imgurl):
rsp = sess.get(imgurl)
if rsp.status_code == 200:
content = rsp.content
with open("./captcha.png", "wb+") as f:
f.write(content)
return str(content)
def GetExecution(be):
return be.find(name="input", attrs={"name": "execution"})["value"]
def GETCaptcha(be):
pattern = re.compile(r"id: '\d+'$",re.MULTILINE)
ID0 = be.find_all("script",text=pattern)
ID1 = pattern.findall(str(ID0[0]))[0]
ID2 = re.findall(r"\d+",ID1)[0]
CaptchaURL = "http://ua.scu.edu.cn/captcha?captchaId="+ID2
download_img(CaptchaURL)
Captcha=api.Get("./captcha.png")
return Captcha
sess_header = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
}
def Visit():
LoginUrl = "http://ua.scu.edu.cn/login?service=https%3A%2F%2Fwfw.scu.edu.cn%2Fa_scu%2Fapi%2Fsso%2Fcas-index%3Fredirect%3Dhttps%253A%252F%252Fwfw.scu.edu.cn%252Fncov%252Fwap%252Fdefault%252Fsave"
res = sess.get(LoginUrl, headers=sess_header)
return BeautifulSoup(res.text, 'html.parser')
def SetPost(Execution,Captcha):
POSTDATA = {
'username': Student.username,
'password': Student.password,
'captcha': Captcha,
'submit': '%E7%99%BB%E5%BD%95',
'type': 'username_password',
'execution': Execution,
'_eventId': 'submit'
}
return POSTDATA
def Rush(data):
URL = "https://ua.scu.edu.cn/login"
res = sess.post(URL,headers=sess_header,data=data)
return res.status_code
# ddd = dump.dump_all(res)
# print(ddd.decode("utf-8"))
def Daka(data):
# print(data)
url = "https://wfw.scu.edu.cn/ncov/wap/default/save"
res=sess.post(url,data=data)
mess=json.loads(res.text)
print(mess["m"])
# print(sess.cookies)
# ddd = dump.dump_all(res)
# print(ddd.decode("utf-8"))
def HealthData():
data = {
"zgfxdq": "0",
"mjry": "0",
"csmjry": "0",
"szxqmc": "江安校区",
"sfjzxgym": "0",
"jzxgymrq": "",
"sfjzdezxgym": "0",
"jzdezxgymrq": "",
"sfjzdszxgym": "0",
"jzdszxgymrq": "",
"uid": "",
"date": "",
"tw": "2",
"sfcxtz": "0",
"sfyyjc": "0",
"jcjgqr": "0",
"jcjg": "",
"sfjcbh": "0",
"sfcxzysx": "0",
"qksm": "",
"remark": "",
"address": "四川省成都市双流区西航港街道四川大学江安校区",
"area": "四川省+成都市+双流区",
"province": "四川省",
"city": "成都市",
"geo_api_info": {
"type": "complete",
"position": {"Q": 30.55300374349, "R": 103.99391167534799, "lng": 103.993912, "lat": 30.553004},
"location_type": "html5",
"message": "Get+ipLocation+failed.Get+geolocation+success.Convert+Success.Get+address+success.",
"accuracy": 35,
"isConverted": "true",
"status": 1,
"addressComponent": {
"citycode": "028",
"adcode": "510116",
"businessAreas": [{
"name": "白家",
"id": "510116",
"location": {
"Q": 30.562482,
"R": 104.006821,
"lng": 104.006821,
"lat": 30.562482
}
}],
"neighborhoodType": "",
"neighborhood": "",
"building": "",
"buildingType": "",
"street": "川大路二段",
"streetNumber": "1号",
"country": "中国",
"province": "四川省",
"city": "成都市",
"district": "双流区",
"towncode": "510116002000",
"township": "西航港街道"
},
"formattedAddress": "四川省成都市双流区西航港街道四川大学江安校区学生西园8舍围合",
"roads": [],
"crosses": [],
"pois": [],
"info": "SUCCESS"
},
"created": "1649433631",
"sfzx": "1",
"sfjcwhry": "0",
"sfcyglq": "0",
"gllx": "",
"glksrq": "",
"jcbhlx": "",
"jcbhrq": "",
"sftjwh": "0",
"sftjhb": "0",
"fxyy": "",
"bztcyy": "1",
"fjsj": "0",
"sfjchbry": "0",
"sfjcqz": "",
"jcqzrq": "",
"jcwhryfs": "",
"jchbryfs": "",
"xjzd": "",
"szgj": "",
"sfsfbh": "0",
"szsqsfybl": "0",
"sfsqhzjkk": "0",
"sqhzjkkys": "",
"sfygtjzzfj": "0",
"gtjzzfjsj": "",
"szcs": "",
"sfjxhsjc": "0",
"hsjcrq": "",
"hsjcdd": "",
"hsjcjg": "0",
"bzxyy": "",
"id": "",
"gwszdd": "",
"sfyqjzgc": "",
"jrsfqzys": "",
"jrsfqzfy": "",
"szgjcs": "",
"ismoved": "0"
}
return data
def Login():
count=0
while count<=5:
be=Visit()
Execution=GetExecution(be)
Captcha=GETCaptcha(be)
data=SetPost(Execution,Captcha)
status=Rush(data)
if status==200:
return "Success!";
count+=1
return "Failed!"
def run():
print("登录微服务中............")
if Login() == "Failed!":
print("Username Or Password Wrong!")
exit()
print("登陆成功!正在配置打卡数据................")
data = HealthData()
print("配置成功!正在打卡................")
Daka(data)
if __name__=="__main__":
run()
Common crawling frameworks
PySpider
Architecture
Components: Scheduler, Fetcher, and Processor
The whole crawler is monitored by a Monitor, and crawl results are handled by the Result Worker
Doesn't seem to run well on an M1 Mac...
Scrapy
Creating a project
scrapy startproject FirstLab

scrapy.cfg: the project configuration file
FirstLab/: the project's Python module
FirstLab/items.py: the project's item definitions
FirstLab/pipelines.py: the project's pipelines
FirstLab/settings.py: the project settings
FirstLab/spiders/: the directory for spider code
Defining Items
An Item is the container that holds the scraped data
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html
import scrapy
class FirstlabItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
title = scrapy.Field()
link = scrapy.Field()
desc = scrapy.Field()
pass
Writing the spider
A spider class must inherit from scrapy.Spider and define the following three attributes:
1. name: identifies the Spider; the name must be unique
2. start_urls: the list of URLs the Spider starts crawling from
3. parse(): a method of the Spider that parses the downloaded response and generates Request objects for further URLs to process
import scrapy
class DmozSpider(scrapy.Spider):
name="dmoz"
allowed_domains=["runoob.com"]
start_urls=[
"http://www.runoob.com/xpath/xpath-examples.html",
"http://www.runoob.com/bootstrap/bootstrap-tutorial.html"
]
def parse(self,response):
filename=response.url.split("/")[-2]
with open(filename,'wb') as f:
f.write(response.body)
Extracting Items
Use Scrapy Selectors
Trying selectors in the Scrapy shell
scrapy shell "http://www.runoob.com/xpath/xpath-examples.html"
Once the shell loads, a local response variable containing the response data is available, for example:
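# run these inside the scrapy shell after it has loaded the page
response.url
response.xpath('//title/text()').extract_first()
response.xpath('//ul/li/a/@href').extract()[:5]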
Extracting the data
import scrapy
class DmozSpider(scrapy.Spider):
name="dmoz"
allowed_domains=["runoob.com"]
start_urls=[
"http://www.runoob.com/xpath/xpath-examples.html",
"http://www.runoob.com/bootstrap/bootstrap-tutorial.html"
]
def parse(self,response):
for x in response.selector.xpath('//ul/li'):
print(x.xpath('a/text()').extract())
print(x.xpath('a/@href').extract())