基本信息
源码名称:Nodejs实现的一个磁力链接爬虫源码下载
源码大小:0.06M
文件格式:.zip
开发语言:JavaScript(Node.js)
更新时间:2016-03-20
友情提示:(无需注册或充值,赞助后即可获取资源下载链接)
嘿,亲!知识可是无价之宝呢,但咱这精心整理的资料也耗费了不少心血呀。小小地破费一下,绝对物超所值哦!如有下载和支付问题,请联系我们QQ(微信同号):813200300
本次赞助数额为:2 元
微信扫码支付:2 元
×
请留下您的邮箱,我们将在2小时内将文件发到您的邮箱
源码介绍
'use strict'; var dgram = require('dgram'), crypto = require('crypto'), util = require('util'), bencode = require('bencode'), remoteNodes = require('./redis/remoteNodes'), bucket = require('./redis/bucket'), sysInfo = require('./redis/sysInfo'), infohash = require('./redis/infohash'), config = require('../config'), logger = require('./common/logger'), utils = require('./common/utils'), Resource = require('./proxy/resource'); function Worker(port) { var self = this; self.id = new Buffer(crypto.createHash('sha1').update((config.address || '') port.toString()).digest('hex'), 'hex'); self.port = port; self.socket = dgram.createSocket('udp4'); // 取得ips,不响应本机的请求,不给本机发请求 self.ips = utils.getLocalIps(); // 最后一次请求bootstrap节点的时间 self.lastBootstrapTime = 0; // 捕获错误 self.socket.on('error', function (err) { logger.error("socket error:\n" err); }); // 有消息发来时触发 self.socket.on('message', self.onmessage.bind(self)); // listen后触发 if (config.worker[self.port].sended) { self.socket.once('listening', self.run.bind(self)); } if (config.address) { self.socket.bind(port, config.address); } else { self.socket.bind(port); } } // 启动 Worker.prototype.run = function run() { var self = this, target; // 从remoteNodes里取一个,取不到就用默认的(bootstrap的节点,最多5s一次) remoteNodes.pop(function (error, reply) { if (error) { logger.error('get remoteNodes error'); logger.error(error); return; } target = reply ? 
{ address: reply.split(':')[0], port: reply.split(':')[1] } : { address: config.bootstrapAddress, port: config.bootstrapPort }; /** * 1、reply存在,立刻发送 * 2、reply不存在,lastSendTime也不存在,说明第一次请求bootstrap的节点,立刻发送 * 3、reply不存在,lashSendTime存在,且间隔5s以上,立刻发送 */ if (reply || !self.lastBootstrapTime || ( new Date() - self.lastBootstrapTime >= 5000)) { if (!reply) { self.lastBootstrapTime = new Date(); } self.sendFindNode(target); } // 不停的发送 setTimeout(run.bind(self), config.worker[self.port].cycle); }); }; /** * socket的消息处理 */ Worker.prototype.onmessage = function onmessage(packet, rinfo) { var self = this; // 不处理本机的请求&响应 if (self.ips.indexOf(rinfo.address) !== -1) { return; } var msg, id; // 尝试解码 try { msg = bencode.decode(packet); } catch (e) { logger.error(util.format('[bencode decode error] %s:%d', rinfo.address, rinfo.port)); return false; } // 取得id if (msg.a && msg.a.id) { id = msg.a.id; } else if (msg.r && msg.r.id) { id = msg.r.id; } // 忽略异常数据 if (!id || !Buffer.isBuffer(id) || id.length !== 20) { return this.error(rinfo, msg, 203, 'Id is required'); } if (!msg.y || msg.y.length !== 1) { return this.error(rinfo, msg, 203, 'Y is required'); } if (!msg.t) { return this.error(rinfo, msg, 203, 'T is required'); } // 接收到的是异常的话 if (msg.y[0] === 0x65 /* e */) { logger.error(msg.e); return; } // 发送来的是查询,包括ping,find_node,get_peers,announce_peer if (msg.y[0] === 0x71 /* q */) { sysInfo.incrReceiveRequest(); if (!msg.a) { return this.error(rinfo, msg, 203, 'A is required'); } if (!msg.q) { return this.error(rinfo, msg, 203, 'Q is required'); } // 执行响应 this.processRequest(msg.q.toString(), msg, rinfo); } // 发送来的是响应,只可能是响应find_node,因为我们只发find_node的请求 if (msg.y[0] === 0x72 /* r */) { sysInfo.incrReceiveReponse(); if (msg && Buffer.isBuffer(msg.r.nodes)) { var nodes; try { nodes = utils.decodeNodes(msg.r.nodes); } catch (error) { logger.error(util.format('%s:%d respond find_node error:'), rinfo.address, rinfo.port); logger.error(error); return; } if (nodes.length > 0) { // 添加到节点列表 
remoteNodes.push(nodes); } } } // 为了保证桶的“活性”,只把给我发请求和响应我的添加到桶里 bucket.push(self.id, [ { id: id, address: rinfo.address, port: rinfo.port } ]); }; /** * 响应 */ // 统一的响应请求方法 Worker.prototype.processRequest = function processRequest(type, msg, rinfo) { // 端口不合法 if (rinfo.port <= 0 || rinfo.port >= 65536) { return; } if (type === 'ping') { this.processPing(msg, rinfo); } else if (type === 'find_node') { this.processFindNode(msg, rinfo); } else if (type === 'get_peers') { this.processGetPeers(msg, rinfo); } else if (type === 'announce_peer') { this.processAnnouncePeer(msg, rinfo); } }; // 响应ping请求 Worker.prototype.processPing = function processPing(msg, rinfo) { this._respond(rinfo, msg, { id: this.id }); }; // 响应find_node请求 Worker.prototype.processFindNode = function processFindNode(msg, rinfo) { var self = this, token = crypto.randomBytes(4); bucket.getKClosest(self.id, msg.a.id, function (nodes) { self._respond(rinfo, msg, { id: self.id, token: token, nodes: utils.encodeNodes(nodes) }); }); }; // 响应get_peers请求 Worker.prototype.processGetPeers = function processGetPeers(msg, rinfo) { var self = this; if (!msg.a.info_hash || !Buffer.isBuffer(msg.a.info_hash) || msg.a.info_hash.length !== 20) { return self.error(rinfo, msg, 203, 'get_peers without info_hash'); } // 保存infohash self._saveInfohash(msg.a.info_hash.toString('hex')); var token = crypto.randomBytes(4); bucket.getKClosest(self.id, msg.a.id, function (nodes) { self._respond(rinfo, msg, { id: self.id, token: token, nodes: utils.encodeNodes(nodes) }); }); }; // 响应announce_peer请求 Worker.prototype.processAnnouncePeer = function processAnnouncePeer(msg, rinfo) { var self = this; if (!msg.a.token || !Buffer.isBuffer(msg.a.token)) { return self.error(rinfo, msg, 203, 'token is invalid'); } if (!msg.a.info_hash || !Buffer.isBuffer(msg.a.info_hash) || msg.a.info_hash.length !== 20) { return self.error(rinfo, msg, 203, 'announce_peer without info_hash'); } // 保存infohash self._saveInfohash(msg.a.info_hash.toString('hex')); 
self._respond(rinfo, msg, { id: self.id }); }; // 执行响应 Worker.prototype._respond = function _respond(target, msg, args) { args.id = this.id; var response = { t: msg.t, // 带上别人发来的事务ID y: 'r', r: args }, packet = bencode.encode(response); this.socket.send(packet, 0, packet.length, target.port, target.address); sysInfo.incrSendReponse(); }; // 保存infohash Worker.prototype._saveInfohash = function _saveInfohash(infohashStr) { Resource.getResourceByInfohash(infohashStr, function (error, value) { if (error) { logger.error(error); } if (!value) { infohash.sadd(infohashStr); } else { // 更新hotpeers Resource.incrResource(infohashStr); } }); }; /** * 请求 */ // 发送find_node请求(随机查找) Worker.prototype.sendFindNode = function sendFindNode(target) { this._request(target, 'find_node', { id: this.id, target: new Buffer(crypto.createHash('sha1').update(crypto.randomBytes(20)).digest('hex'), 'hex') }); }; // 发送查询 Worker.prototype._request = function _request(target, type, args) { // 不给本机发送请求 if (this.ips.indexOf(target.address) !== -1) { return; } // 端口号不合法 if (target.port <= 0 || target.port >= 65536) { return; } // 随机事务ID var transactionId = new Buffer([~~(Math.random() * 256), ~~(Math.random() * 256)]), msg = { t: transactionId, y: 'q', q: type, a: args }, packet = bencode.encode(msg); this.socket.send(packet, 0, packet.length, target.port, target.address); sysInfo.incrSendRequest(); }; /** * 响应错误 */ Worker.prototype.error = function error(target, msg, code, text) { // 不给本机发送请求 if (this.ips.indexOf(target.address) !== -1) { return; } // 端口号不合法 if (target.port <= 0 || target.port >= 65536) { return; } var response = { t: msg.t, y: 'e', e: [code, util.format('[http://findit.so] %s', text || 'error')] }, packet = bencode.encode(response); this.socket.send(packet, 0, packet.length, target.port, target.address); sysInfo.incrReceiveError(); // 记录日志,以供分析,稳定后可删除 // logger.error(util.format('[%s:%d] %s', target.address, target.port, text || 'error')); // logger.error(msg); }; // 创建一个worker 
exports.create = function create(port) { return new Worker(port); };