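Deploying the collector and loading the database
Because the spiker runs as multiple processes, we can no longer chain the database load directly onto the pipeline like this: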
    python spiker.py | python xxx | python yyy && python redisToMysql.py
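Instead, we wait for the whole main process to finish and use its exit status to decide whether to load the data. (In bash, $? after a pipeline is the exit status of the last command, here writeToRedis.py, unless set -o pipefail is enabled.)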
    python spiker.py | python format.py | python writeToRedis.py
    if [ "$?" -eq 0 ]; then
        python redisToMysql.py
    fi
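If the load should depend on every stage of the pipeline rather than only the last one, the same flow can be driven from Python. The following is a minimal sketch, not the author's deployment script: the function names run_pipeline and collect_then_load are hypothetical, and it assumes the four scripts sit in the current directory and run under the same interpreter.

```python
import subprocess
import sys


def run_pipeline(commands):
    """Run commands as a pipeline (cmd1 | cmd2 | ...) and return every exit code."""
    procs = []
    prev_stdout = None
    for i, cmd in enumerate(commands):
        is_last = i == len(commands) - 1
        proc = subprocess.Popen(
            cmd,
            stdin=prev_stdout,
            # The last stage inherits our stdout; earlier stages feed the next one.
            stdout=None if is_last else subprocess.PIPE,
        )
        if prev_stdout is not None:
            # Close our copy of the read end so the downstream stage owns it alone.
            prev_stdout.close()
        prev_stdout = proc.stdout
        procs.append(proc)
    return [p.wait() for p in procs]


def collect_then_load():
    """Hypothetical wrapper: load into MySQL only if all three stages succeeded."""
    codes = run_pipeline([
        [sys.executable, "spiker.py"],
        [sys.executable, "format.py"],
        [sys.executable, "writeToRedis.py"],
    ])
    if all(code == 0 for code in codes):
        return subprocess.call([sys.executable, "redisToMysql.py"])
    return 1
```

Calling collect_then_load() from a scheduled job would take the place of the shell snippet; because it inspects the exit code of each stage, a failure in spiker.py is not masked by a successful writeToRedis.py.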
The stored format is kept as simple as possible:
    mysql> select * from respage02;
    +-----+------------+------------+-------------+--------------+-------+------+
    | id  | bikeid     | day        | lat         | lng          | time  | type |
    +-----+------------+------------+-------------+--------------+-------+------+
    |   1 | 8620750656 | 2018-11-07 | 29.04729065 | 119.64855231 | 14:56 | mb   |
    |   2 | 8620762418 | 2018-11-07 | 29.04842459 | 119.64730693 | 14:56 | mb   |
    |   3 | 8620663052 | 2018-11-07 | 29.08465714 | 119.66206422 | 16:53 | mb   |
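This flat layout makes lookups by day and time straightforward. As a rough illustration, the sketch below loads the three rows shown above into an in-memory SQLite database, which stands in for MySQL here. The column types (TEXT and REAL) and the helper names time_points and bikes_at are assumptions made for the example.

```python
import sqlite3

# In-memory SQLite stands in for MySQL; column names mirror the respage02 table.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE respage02 (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        bikeid TEXT, day TEXT, lat REAL, lng REAL, time TEXT, type TEXT
    )
    """
)
rows = [
    ("8620750656", "2018-11-07", 29.04729065, 119.64855231, "14:56", "mb"),
    ("8620762418", "2018-11-07", 29.04842459, 119.64730693, "14:56", "mb"),
    ("8620663052", "2018-11-07", 29.08465714, 119.66206422, "16:53", "mb"),
]
conn.executemany(
    "INSERT INTO respage02 (bikeid, day, lat, lng, time, type) "
    "VALUES (?, ?, ?, ?, ?, ?)",
    rows,
)


def time_points(day):
    """Distinct collection times recorded on one day, earliest first."""
    cur = conn.execute(
        "SELECT DISTINCT time FROM respage02 WHERE day = ? ORDER BY time",
        (day,),
    )
    return [r[0] for r in cur]


def bikes_at(day, time):
    """(bikeid, lat, lng) for every bike seen at one time point."""
    cur = conn.execute(
        "SELECT bikeid, lat, lng FROM respage02 "
        "WHERE day = ? AND time = ? ORDER BY id",
        (day, time),
    )
    return cur.fetchall()


print(time_points("2018-11-07"))        # ['14:56', '16:53']
print(len(bikes_at("2018-11-07", "14:56")))  # 2
```

Because the times are zero-padded HH:MM strings, sorting them as text also sorts them chronologically, so the latest time point of a day is simply the last entry.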
Exposing the data through an API
As before, we simply add a new endpoint to the Django project from respage01.
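Add the URL
    # django.conf.urls.url was removed in Django 4.0; use django.urls.re_path there
    from django.conf.urls import url

    from . import views  # assumes views.py lives in the same app

    urlpatterns = [
        url(r'v1/respage01/$', views.Respage01.as_view(), name='Respage01'),
        url(r'v1/respage02/$', views.Respage02.as_view(), name='Respage02'),
    ]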
Add the model
    from django.db import models


    class Respage02Info(models.Model):
        """
        Data for respage02.
        """
        time = models.CharField(max_length=100)
        day = models.CharField(max_length=100)
        bikeid = models.CharField(max_length=200)
        lat = models.FloatField()
        lng = models.FloatField()
        type = models.CharField(max_length=100)

        class Meta:
            db_table = "respage02"
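Add the serializer
    from rest_framework import serializers

    from .models import Respage02Info  # assumed module layout


    class Respage02Serializer(serializers.HyperlinkedModelSerializer):
        """
        Serializes Respage02 data.
        """
        class Meta:
            model = Respage02Info
            fields = ('time', 'lat', 'lng', 'bikeid', 'type', 'day')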
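Add the business logic (view)
    from rest_framework import status
    from rest_framework.response import Response
    from rest_framework.views import APIView

    # Assumed module layout; adjust to where the model and serializer live.
    from .models import Respage02Info
    from .serializers import Respage02Serializer


    class Respage02(APIView):
        """
        Return data for respage02.
        """
        authentication_classes = []
        permission_classes = []

        def get(self, request, format=None):
            req = request.query_params
            if 'type' not in req:
                return Response({}, status=status.HTTP_400_BAD_REQUEST)

            # Latest data: all rows at the most recent time point of the given day
            if req['type'] == 'now':
                if 'day' not in req:
                    return Response({}, status=status.HTTP_400_BAD_REQUEST)
                timelist = Respage02Info.objects.distinct().values("time").filter(day=req['day']).order_by('-time')
                latest = timelist.first()
                if latest is None:
                    # No rows were collected for this day
                    return Response([], status=status.HTTP_200_OK)
                queryset = Respage02Info.objects.filter(day=req['day']).filter(time=latest['time'])
                serializer = Respage02Serializer(queryset, many=True)
                return Response(serializer.data, status=status.HTTP_200_OK)

            # All time points recorded on the given day
            if req['type'] == 'timelist':
                if 'day' not in req:
                    return Response({}, status=status.HTTP_400_BAD_REQUEST)
                timelist = Respage02Info.objects.distinct().values("time").filter(day=req['day']).order_by('time')
                return Response(timelist, status=status.HTTP_200_OK)

            # Bike locations at one time point of the given day
            if req['type'] == 'location':
                if 'day' not in req or 'time' not in req:
                    return Response({}, status=status.HTTP_400_BAD_REQUEST)
                queryset = Respage02Info.objects.filter(day=req['day']).filter(time=req['time'])
                serializer = Respage02Serializer(queryset, many=True)
                return Response(serializer.data, status=status.HTTP_200_OK)

            # Unknown type value
            return Response({}, status=status.HTTP_400_BAD_REQUEST)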
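The view distinguishes three kinds of request by the type query parameter: now, timelist, and location. As a small client-side illustration, the sketch below builds the corresponding request URLs. The host and port in BASE_URL are hypothetical, the helper name build_query is made up for the example, and it assumes the URL patterns are mounted at the site root.

```python
from urllib.parse import urlencode

# Hypothetical host and port; only the path comes from the URL patterns.
BASE_URL = "http://127.0.0.1:8000/v1/respage02/"


def build_query(query_type, day=None, time=None):
    """Build a request URL for the respage02 endpoint."""
    params = {"type": query_type}
    if day is not None:
        params["day"] = day
    if time is not None:
        params["time"] = time
    return BASE_URL + "?" + urlencode(params)


# Latest snapshot of a day, list of time points, and one specific snapshot
print(build_query("now", day="2018-11-07"))
print(build_query("timelist", day="2018-11-07"))
print(build_query("location", day="2018-11-07", time="14:56"))
```

Note that urlencode percent-encodes the colon in the time value (14:56 becomes 14%3A56); Django decodes it again before the view reads request.query_params.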
Testing
Deployment complete
API code:
Collector code: