Thursday, March 31, 2016

Learning Python 12 - Summing Up

-- Implementing the Django interface

The setup

$ django-admin startproject pwdweb

$ tree -A pwdweb

The model layer

records/models.py

from cryptography.fernet import Fernet
from django.conf import settings
from django.db import models

class Record(models.Model):
    """A credential record with helpers to encrypt/decrypt its password.

    Encryption uses Fernet with the key read from
    ``settings.ENCRYPTION_KEY``.  Nothing is encrypted automatically:
    callers invoke ``encrypt_password`` / ``decrypt_password`` explicitly.
    """

    # Encoding used to convert between str and the bytes Fernet works on.
    DEFAULT_ENCODING = 'utf-8'

    title = models.CharField(max_length=64, unique=True)
    username = models.CharField(max_length=64)
    email = models.EmailField(null=True, blank=True)
    url = models.URLField(max_length=255, null=True, blank=True)
    password = models.CharField(max_length=2048)
    notes = models.TextField(null=True, blank=True)
    created = models.DateTimeField(auto_now_add=True)
    last_modified = models.DateTimeField(auto_now=True)

    def encrypt_password(self):
        """Replace ``self.password`` with its encrypted form (not saved)."""
        self.password = self.encrypt(self.password)

    def decrypt_password(self):
        """Replace ``self.password`` with its decrypted form (not saved)."""
        self.password = self.decrypt(self.password)

    def encrypt(self, plaintext):
        """Return ``plaintext`` encrypted, as a str."""
        return self.cypher('encrypt', plaintext)

    def decrypt(self, cyphertext):
        """Return ``cyphertext`` decrypted, as a str."""
        return self.cypher('decrypt', cyphertext)

    def cypher(self, cypher_func, text):
        """Run the Fernet method named ``cypher_func`` on ``text``.

        ``text`` is encoded to bytes before the call and the result is
        decoded back to str.
        """
        operation = getattr(Fernet(settings.ENCRYPTION_KEY), cypher_func)
        return self._to_str(operation(self._to_bytes(text)))

    def _to_str(self, bytes_str):
        """Decode bytes using ``DEFAULT_ENCODING``."""
        return bytes_str.decode(self.DEFAULT_ENCODING)

    def _to_bytes(self, s):
        """Encode a str using ``DEFAULT_ENCODING``."""
        return s.encode(self.DEFAULT_ENCODING)

def cypher_encrypt(self, text):
    """Encrypt ``text`` with Fernet and return the result as a str.

    Direct-call variant of the ``getattr``-based dispatch: the Fernet
    ``encrypt`` method is named explicitly.  The key comes from
    ``settings.ENCRYPTION_KEY``.
    """
    fernet = Fernet(settings.ENCRYPTION_KEY)
    result = fernet.encrypt(
        self._to_bytes(text))
    return self._to_str(result)

$ python manage.py makemigrations
$ python manage.py migrate

>>> from cryptography.fernet import Fernet
>>> Fernet.generate_key()

A simple form

records/forms.py

from django.forms import ModelForm, Textarea
from .models import Record

class RecordForm(ModelForm):
    """Model form exposing the user-editable fields of ``Record``."""

    class Meta:
        model = Record
        fields = ['title', 'username', 'email', 'url',
                  'password', 'notes']
        # Render the notes field as a 40x4 textarea.
        widgets = {'notes': Textarea(
            attrs={'cols': 40, 'rows': 4})}

The view layer

Imports and home view

from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin
from django.core.urlresolvers import reverse_lazy
from django.views.generic import TemplateView
from django.views.generic.edit import (  
CreateView, UpdateView, DeleteView)
from .forms import RecordForm
from .models import Record

class HomeView(TemplateView):
    """Render the static home page template."""

    template_name = 'records/home.html'

Listing all records

class RecordListView(TemplateView):
    """List all records ordered by title, each with its decrypted password."""

    template_name = 'records/list.html'

    def get(self, request, *args, **kwargs):
        """Add the records to the template context and render the page."""
        context = self.get_context_data(**kwargs)
        records = Record.objects.all().order_by('title')  #1
        for record in records:
            # Expose the decrypted password to the template as
            # ``record.plaintext``; the stored value is left untouched.
            record.plaintext = record.decrypt(record.password)  #2
        context['records'] = records
        return self.render_to_response(context)

Creating records

class EncryptionMixin:
    """View mixin that encrypts the record's password when a form is valid."""

    def form_valid(self, form):
        """Encrypt and save the instance, then continue normal handling."""
        self.encrypt_password(form)
        return super().form_valid(form)

    def encrypt_password(self, form):
        """Build the instance from ``form``, encrypt its password, save it."""
        # commit=False: get the instance without hitting the database so
        # the password can be encrypted before the first save.
        self.object = form.save(commit=False)
        self.object.encrypt_password()
        self.object.save()

class RecordCreateView(
        EncryptionMixin, SuccessMessageMixin, CreateView):
    """Create a record; on success, go back to the (empty) add page."""

    template_name = 'records/record_add_edit.html'
    form_class = RecordForm
    success_url = reverse_lazy('records:add')
    success_message = 'Record was created successfully'

self.object = form.save()

Updating records

class RecordUpdateView(
        EncryptionMixin, SuccessMessageMixin, UpdateView):
    """Edit an existing record, showing its password decrypted in the form."""

    template_name = 'records/record_add_edit.html'
    form_class = RecordForm
    model = Record
    success_message = 'Record was updated successfully'

    def get_context_data(self, **kwargs):
        """Flag the context with ``update=True`` for the shared template."""
        kwargs['update'] = True
        return super().get_context_data(**kwargs)

    def form_valid(self, form):
        """Redirect back to this record's edit page after saving."""
        # The URL depends on the record's pk, so it is set per request
        # rather than as a class attribute.
        self.success_url = reverse_lazy(
            'records:edit',
            kwargs={'pk': self.object.pk}
        )
        return super().form_valid(form)

    def get_form_kwargs(self):
        """Decrypt the instance's password before it is bound to the form."""
        kwargs = super().get_form_kwargs()
        kwargs['instance'].decrypt_password()
        return kwargs

Deleting records

class RecordDeleteView(SuccessMessageMixin, DeleteView):
    """Delete a record and return to the records list."""

    model = Record
    success_url = reverse_lazy('records:list')

    def delete(self, request, *args, **kwargs):
        """Queue a success message, then perform the standard deletion."""
        messages.success(
            request, 'Record was deleted successfully')
        return super().delete(request, *args, **kwargs)

Setting up the URLs

pwdweb/urls.py

from django.conf.urls import include, url
from django.contrib import admin
from records import urls as records_url
from records.views import HomeView

# Project-level routes: the admin, the records app (namespaced so its
# URLs reverse as 'records:<name>'), and the home page at the site root.
urlpatterns = [
    url(r'^admin/', include(admin.site.urls)),
    url(r'^records/', include(records_url, namespace='records')),
    url(r'^$', HomeView.as_view(), name='home'),
]

records/urls.py
from django.conf.urls import include, url
from django.contrib import admin
from .views import (RecordCreateView, RecordUpdateView,
                    RecordDeleteView, RecordListView)

# Routes of the records app; edit and delete capture the record's
# numeric primary key as the ``pk`` keyword argument.
urlpatterns = [
    url(r'^add/$',
        RecordCreateView.as_view(), name='add'),
    url(r'^edit/(?P<pk>[0-9]+)/$',
        RecordUpdateView.as_view(), name='edit'),
    url(r'^delete/(?P<pk>[0-9]+)/$',
        RecordDeleteView.as_view(), name='delete'),
    url(r'^$',
        RecordListView.as_view(), name='list'),
]

The template layer

records/templates/records/base.html

{% load static from staticfiles %}
<!DOCTYPE html>
<html lang="en">
<head>  
<meta charset="utf-8">  
<meta name="viewport"        
content="width=device-width, initial-scale=1.0">  
<link href="{% static "records/css/main.css" %}"        
rel="stylesheet">  
<title>{% block title %}Title{% endblock title %}</title>
</head>

<body>
<div id="page-content">    
{% block page-content %}{% endblock page-content %}  
</div>  
<div id="footer">{% block footer %}{% endblock footer %}</div>  
{% block scripts %}    
<script      
src="{% static "records/js/jquery-2.1.4.min.js" %}">    
</script>  
{% endblock scripts %}
</body></html>


Home and footer templates

records/templates/records/home.html

{% extends "records/base.html" %}
{% block title %}Welcome to the Records website.{% endblock %}

{% block page-content %}
<h1>Welcome {{ user.first_name }}!</h1>
<div class="home-option">To create a record click  
<a href="{% url "records:add" %}">here.</a>
</div>
<div class="home-option">To see all records click  
<a href="{% url "records:list" %}">here.</a>
</div>{% endblock page-content %}

records/templates/records/footer.html

<div class="footer">  Go back <a href="{% url "home" %}">home</a>.</div>

Listing all records

records/templates/records/list.html

{% extends "records/base.html" %}
{% load record_extras %}
{% block title %}Records{% endblock title %}
{% block page-content %}
<h1>Records</h1><span name="top"></span>

{% include "records/messages.html" %}
{% for record in records %}
<div class="record {% cycle 'row-light-blue' 'row-white' %}"    
id="record-{{ record.pk }}">  
<div class="record-left">    
<div class="record-list">      
<span class="record-span">Title</span>{{ record.title }}    
</div>    
<div class="record-list">      
<span class="record-span">Username</span>      
{{ record.username }}    
</div>    
<div class="record-list">      
<span class="record-span">Email</span>{{ record.email }}    
</div>    
<div class="record-list">      
<span class="record-span">URL</span>        
<a href="{{ record.url }}" target="_blank">          
{{ record.url }}</a>    
</div>    
<div class="record-list">      
<span class="record-span">Password</span>      
{% hide_password record.plaintext %}    
</div>  
</div>  
<div class="record-right">    
<div class="record-list">      
<span class="record-span">Notes</span>      
<textarea rows="3" cols="40" class="record-notes"                
readonly>{{ record.notes }}</textarea>    
</div>  
<div class="record-list">      
<span class="record-span">Last modified</span>      
{{ record.last_modified }}    
</div>    
<div class="record-list">      
<span class="record-span">Created</span>    
{{ record.created }}    
</div>  
</div>  
<div class="record-list-actions">    
<a href="{% url "records:edit" pk=record.pk %}">&raquo; edit</a>
<a href="{% url "records:delete" pk=record.pk %}">&raquo; delete
</a>  
</div>
</div>
{% endfor %}
{% endblock page-content %}

{% block footer %}
<p><a href="#top">Go back to top</a></p>
{% include "records/footer.html" %}
{% endblock footer %}

records/templatetags/record_extras.py

from django import template
from django.utils.html import escape

register = template.Library()

@register.simple_tag
def hide_password(password):
    """Render ``password`` as asterisks, with the real value as a tooltip.

    Returns a ``<span>`` whose text is one ``*`` per character and whose
    ``title`` attribute holds the HTML-escaped password.
    """
    # Local import: only ``escape`` is imported at file level.
    from django.utils.html import format_html

    # format_html escapes its arguments and marks the result as safe, so
    # the markup is emitted as HTML whether or not the template engine
    # auto-escapes simple_tag output.
    return format_html(
        '<span title="{0}">{1}</span>',
        password, '*' * len(password))

records/templates/records/messages.html

{% if messages %}
{% for message in messages %}  
<p class="{{ message.tags }}">{{ message }}</p>
{% endfor %}{% endif %}

records/static/records/css/main.css

html, body, * {  font-family: 'Trebuchet MS', Helvetica, sans-serif; }
a { color: #333; }
.record {  clear: both; padding: 1em; border-bottom: 1px solid #666;}
.record-left { float: left; width: 300px;}
.record-list { padding: 2px 0; }
.fieldWrapper { padding: 5px; }
.footer { margin-top: 1em; color: #333; }
.home-option { padding: .6em 0; }
.record-span { font-weight: bold; padding-right: 1em; }
.record-notes { vertical-align: top; }
.record-list-actions { padding: 4px 0; clear: both; }
.record-list-actions a { padding: 0 4px; }
#pwd-info { padding: 0 6px; font-size: 1.1em; font-weight: bold;}
#id_notes { vertical-align: top; }
/* Messages */
.success, .errorlist {font-size: 1.2em; font-weight: bold; }
.success {color: #25B725; }
.errorlist {color: #B12B2B; }
/* colors */
.row-light-blue { background-color: #E6F0FA; }
.row-white { background-color: #fff; }
.green { color: #060; }
.orange { color: #FF3300; }
.red { color: #900; }

Creating and editing records

records/templates/records/record_add_edit.html

{% extends "records/base.html" %}
{% load static from staticfiles %}
{% block title %}
{% if update %}Update{% else %}Create{% endif %} Record
{% endblock title %}

{% block page-content %}
<h1>{% if update %}Update a{% else %}Create a new{% endif %}  
Record
</h1>
{% include "records/messages.html" %}

<form action="." method="post">{% csrf_token %}  
{{ form.non_field_errors }}  

<div class="fieldWrapper">{{ form.title.errors }}    
{{ form.title.label_tag }} {{ form.title }}</div>  

<div class="fieldWrapper">{{ form.username.errors }}    
{{ form.username.label_tag }} {{ form.username }}</div>  

<div class="fieldWrapper">{{ form.email.errors }}    
{{ form.email.label_tag }} {{ form.email }}</div>  

<div class="fieldWrapper">{{ form.url.errors }}    
{{ form.url.label_tag }} {{ form.url }}</div>  

<div class="fieldWrapper">{{ form.password.errors }}    
{{ form.password.label_tag }} {{ form.password }}    

<span id="pwd-info"></span></div>  
<button type="button" id="validate-btn">    
Validate Password</button>  
<button type="button" id="generate-btn">    
Generate Password</button>  

<div class="fieldWrapper">{{ form.notes.errors }}    
{{ form.notes.label_tag }} {{ form.notes }}</div>  
<input type="submit"    
value="{% if update %}Update{% else %}Insert{% endif %}">
</form>{% endblock page-content %}{% block footer %}
<br>{% include "records/footer.html" %}<br>
Go to <a href="{% url "records:list" %}">the records list</a>.{% endblock footer %}{% block scripts %}

{{ block.super }}
<script src="{% static "records/js/api.js" %}"></script>{% endblock scripts %}


Talking to the API

records/static/records/js/api.js

// Root URL of the password API.
var baseURL = 'http://127.0.0.1:5555/password';

/**
 * Request a generated password from the API and write it into the
 * password form field. Shows an alert if the request fails.
 */
var getRandomPassword = function() {
  var apiURL = '{url}/generate'.replace('{url}', baseURL);

  // The password text is read from index 1 of the response.
  var fillPasswordField = function(data, status, request) {
    $('#id_password').val(data[1]);
  };

  var reportFailure = function() {
    alert('Unexpected error');
  };

  $.ajax({
    type: 'GET',
    url: apiURL,
    success: fillPasswordField,
    error: reportFailure
  });
}

// Once the DOM is ready, make the "Generate Password" button call the API.
$(document).ready(function() {
  $('#generate-btn').on('click', getRandomPassword);
});


/**
 * Send the current password field value to the API for validation and
 * show the verdict (and, when valid, the score and grade) in #pwd-info.
 * Shows an alert if the request fails.
 */
var validatePassword = function() {
  var apiURL = '{url}/validate'.replace('{url}', baseURL);
  var payload = JSON.stringify({'password': $('#id_password').val()});

  var showVerdict = function(data, status, request) {
    var valid = data['valid'], infoClass, grade;
    var msg = (valid ? 'Valid' : 'Invalid') + ' password.';
    if (valid) {
      var score = data['score']['total'];
      // Below 10 is poor, below 18 is medium, anything else is strong.
      if (score < 10) {
        grade = 'Poor';
        infoClass = 'red';
      } else if (score < 18) {
        grade = 'Medium';
        infoClass = 'orange';
      } else {
        grade = 'Strong';
        infoClass = 'green';
      }
      msg += ' (Score: {score}, {grade})'
        .replace('{score}', score).replace('{grade}', grade);
    }
    // Replace the message and reset the colour class (none when invalid).
    $('#pwd-info').html(msg).removeClass().addClass(infoClass);
  };

  var reportFailure = function(data) {
    alert('Unexpected error');
  };

  $.ajax({
    type: 'POST',
    url: apiURL,
    data: payload,
    contentType: "text/plain",  // Avoid CORS preflight
    success: showVerdict,
    error: reportFailure
  });
}
// Once the DOM is ready, make the "Validate Password" button call the API.
$(document).ready(function() {
  $('#validate-btn').on('click', validatePassword);
});


# Python
error = 'critical' if error_level > 50 else 'medium'
// JavaScript equivalent
error = (error_level > 50 ? 'critical' : 'medium');

Deleting records

records/templates/records/record_confirm_delete.html

{% extends "records/base.html" %}
{% block title %}Delete record{% endblock title %}

{% block page-content %}
<h1>Confirm Record Deletion</h1>
<form action="." method="post">{% csrf_token %}  
<p>Are you sure you want to delete "{{ object }}"?</p>  
<input type="submit" value="Confirm" />&nbsp;  
<a href="{% url "records:list" %}#record-{{ object.pk }}">    
&raquo; cancel</a>
</form>
{% endblock page-content %}

records/models.py

class Record(models.Model):
    ...

    def __str__(self):
        """Return the record's title as its display string."""
        return '{}'.format(self.title)

-- Implementing the Falcon API

$ tree -A pwdapi/

The main application

main.py

import falcon
from core.handlers import (  
PasswordValidatorHandler,  
PasswordGeneratorHandler,)

# The Falcon application object.
app = falcon.API()

# One handler instance per endpoint.
validation_handler = PasswordValidatorHandler()
generator_handler = PasswordGeneratorHandler()

# Map the two password endpoints to their handlers.
app.add_route('/password/validate/', validation_handler)
app.add_route('/password/generate/', generator_handler)


Writing the helpers

from math import ceil
from random import sample
from string import ascii_lowercase, ascii_uppercase, digits

# Special characters accepted in a password.
punctuation = '!#$%&()*+-?@_|'
# Full alphabet: lowercase, uppercase, digits, then the special characters.
allchars = ascii_lowercase + ascii_uppercase + digits + punctuation

Coding the password validator

class PasswordValidator:
    """Validate a password against the allowed alphabet and score it.

    The password is stripped of surrounding whitespace on construction.
    """

    def __init__(self, password):
        self.password = password.strip()

    def is_valid(self):
        """Return True if the password is non-empty and every character
        belongs to ``allchars``."""
        return (len(self.password) > 0 and
                all(char in allchars for char in self.password))

    def score(self):
        """Return a dict with each partial score plus their ``'total'``."""
        result = {
            'length': self._score_length(),
            'case': self._score_case(),
            'numbers': self._score_numbers(),
            'special': self._score_special(),
            'ratio': self._score_ratio(),
        }
        result['total'] = sum(result.values())
        return result

    def _score_length(self):
        """Score by length: 0-3 -> 0, 4-7 -> 1, 8-11 -> 3, 12-15 -> 5,
        16 or more -> 7."""
        scores_list = ([0]*4) + ([1]*4) + ([3]*4) + ([5]*4)
        length = len(self.password)
        if length < len(scores_list):
            return scores_list[length]
        return 7

    def _score_case(self):
        """Score letter case: 0 with no ASCII letters, 1 with a single
        case, 3 when lowercase and uppercase are both present."""
        chars = set(self.password)
        lower = bool(chars & set(ascii_lowercase))
        upper = bool(chars & set(ascii_uppercase))
        return int(lower or upper) + 2 * (lower and upper)

    def _score_numbers(self):
        """Return 2 if the password contains at least one digit, else 0."""
        return 2 if (set(self.password) & set(digits)) else 0

    def _score_special(self):
        """Return 4 if the password contains at least one character from
        ``punctuation``, else 0."""
        return 4 if (
            set(self.password) & set(punctuation)) else 0

    def _score_ratio(self):
        """Score the letters-to-digits ratio, rounded up and capped at 7.

        Returns 0 when the password has no digits.
        """
        alpha_count = sum(
            1 if c.lower() in ascii_lowercase else 0
            for c in self.password)
        digits_count = sum(
            1 if c in digits else 0 for c in self.password)
        if digits_count == 0:
            # No digits: avoid dividing by zero.
            return 0
        return min(ceil(alpha_count / digits_count), 7)

Coding the password generator

class PasswordGenerator:
    """Generate random passwords and keep the best-scoring candidate."""

    @classmethod
    def generate(cls, length, bestof=10):
        """Return the best ``(score, password)`` out of several candidates.

        ``bestof`` candidates of ``length`` characters are generated (at
        least one, whatever ``bestof`` is) and the greatest tuple wins.
        """
        return max(
            cls._generate_candidate(length)
            for _ in range(max(1, bestof))
        )

    @classmethod
    def _generate_candidate(cls, length):
        """Return ``(total score, password)`` for one random password."""
        password = cls._generate_password(length)
        score = PasswordValidator(password).score()
        return (score['total'], password)

    @classmethod
    def _generate_password(cls, length):
        """Return ``length`` characters sampled from ``allchars``."""
        # Local import: only ``sample`` is imported at file level.
        from random import SystemRandom

        # Repeat the alphabet so the pool is at least ``length`` long;
        # sampling without replacement needs that many items.
        chars = allchars * (ceil(length / len(allchars)))
        # Passwords are secrets: draw them from the OS entropy source
        # rather than the default pseudo-random generator.
        return ''.join(SystemRandom().sample(chars, length))

Writing the handlers

import json
import falcon
from .passwords import PasswordValidator, PasswordGenerator

class HeaderMixin:
    """Mixin providing the response header shared by the handlers."""

    def set_access_control_allow_origin(self, resp):
        """Set ``Access-Control-Allow-Origin: *`` on ``resp``."""
        resp.set_header('Access-Control-Allow-Origin', '*')

Coding the password validator handler

class PasswordValidatorHandler(HeaderMixin):
    """Handle POST requests that validate and score a password."""

    def on_post(self, req, resp):
        """Respond with the password's validity and score as JSON.

        Responds 400 when the JSON body is not an object or carries no
        string under the ``password`` key.
        """
        self.process_request(req, resp)
        body = req.context.get('_body', {})
        # Valid JSON is not necessarily an object, and the password need
        # not be a string; reject both instead of failing further down.
        password = body.get('password') if isinstance(body, dict) else None
        if not isinstance(password, str):
            resp.status = falcon.HTTP_BAD_REQUEST
            return None

        result = self.parse_password(password)
        resp.body = json.dumps(result)

    def parse_password(self, password):
        """Return the response payload: password, validity and score."""
        validator = PasswordValidator(password)
        return {
            'password': password,
            'valid': validator.is_valid(),
            'score': validator.score(),
        }

    def process_request(self, req, resp):
        """Set the CORS header and store the decoded JSON body.

        The parsed document is saved in ``req.context['_body']``.

        Raises:
            falcon.HTTPBadRequest: the request body is empty.
            falcon.HTTPError: the body is not UTF-8 encoded, valid JSON.
        """
        self.set_access_control_allow_origin(resp)

        body = req.stream.read()
        if not body:
            raise falcon.HTTPBadRequest('Empty request body',
                'A valid JSON document is required.')
        try:
            req.context['_body'] = json.loads(
                body.decode('utf-8'))
        except (ValueError, UnicodeDecodeError):
            raise falcon.HTTPError(
                falcon.HTTP_753, 'Malformed JSON',
                'JSON incorrect or not utf-8 encoded.')

Coding the password generator handler

class PasswordGeneratorHandler(HeaderMixin):
    """Handle GET requests that return a generated password."""

    def on_get(self, req, resp):
        """Respond with the generator's result as JSON.

        The length defaults to 16 when no ``length`` parameter is given.
        """
        self.process_request(req, resp)
        length = req.context.get('_length', 16)
        resp.body = json.dumps(
            PasswordGenerator.generate(length))

    def process_request(self, req, resp):
        """Set the CORS header and validate the ``length`` parameter.

        A valid length is stored in ``req.context['_length']``; nothing
        is stored when the parameter is absent.

        Raises:
            falcon.HTTPBadRequest: ``length`` is not a positive integer.
        """
        self.set_access_control_allow_origin(resp)
        length = req.get_param('length')
        if length is None:
            return
        try:
            length = int(length)
            # Explicit check rather than ``assert``: assertions are
            # removed when Python runs with -O.
            if length <= 0:
                raise ValueError('length must be positive')
        except (ValueError, TypeError):
            raise falcon.HTTPBadRequest('Wrong query parameter',
                '`length` must be a positive integer.')
        req.context['_length'] = length

Running the API

Testing the API

Testing the helpers

tests/test_core/test_passwords.py

class PasswordGeneratorTestCase(TestCase):
    """Tests for ``PasswordGenerator``."""

    def test__generate_password_length(self):
        # the generated password has exactly the requested length
        for length in range(300):
            assert_equal(
                length,
                len(PasswordGenerator._generate_password(length))
            )

    def test__generate_password_validity(self):
        # every generated password passes validation
        for length in range(1, 300):
            password = PasswordGenerator._generate_password(
                length)
            assert_true(PasswordValidator(password).is_valid())

    def test__generate_candidate(self):
        # the candidate's score is the validator's total for it
        score, password = (
            PasswordGenerator._generate_candidate(42))
        expected_score = PasswordValidator(password).score()
        assert_equal(expected_score['total'], score)

    @patch.object(PasswordGenerator, '_generate_candidate')
    def test__generate(self, _generate_candidate_mock):
        # checks `generate` returns the highest score candidate
        _generate_candidate_mock.side_effect = [
            (16, '&a69Ly+0H4jZ'),
            (17, 'UXaF4stRfdlh'),
            (21, 'aB4Ge_KdTgwR'),  # the winner
            (12, 'IRLT*XEfcglm'),
            (16, '$P92-WZ5+DnG'),
            (18, 'Xi#36jcKA_qQ'),
            (19, '?p9avQzRMIK0'),
            (17, '4@sY&bQ9*H!+'),
            (12, 'Cx-QAYXG_Ejq'),
            (18, 'C)RAV(HP7j9n'),
        ]
        assert_equal(
            (21, 'aB4Ge_KdTgwR'),
            PasswordGenerator.generate(12))

pwdapi/tests/test_core/test_passwords.py
from unittest import TestCase
from unittest.mock import patch
from nose_parameterized import parameterized, param
from nose.tools import (  
assert_equal, assert_dict_equal, assert_true)

from core.passwords import PasswordValidator, PasswordGenerator
class PasswordValidatorTestCase(TestCase):
    """Tests for ``PasswordValidator``."""

    @parameterized.expand([
        (False, ''),
        (False, '  '),
        (True, 'abcdefghijklmnopqrstuvwxyz'),
        (True, 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'),
        (True, '0123456789'),
        (True, '!#$%&()*+-?@_|'),
    ])
    def test_is_valid(self, valid, password):
        validator = PasswordValidator(password)
        assert_equal(valid, validator.is_valid())

    @parameterized.expand(
        param.explicit(char) for char in '>]{<`\\;,[^/"\'~:}=.'
    )
    def test_is_valid_invalid_chars(self, password):
        validator = PasswordValidator(password)
        assert_equal(False, validator.is_valid())

    @parameterized.expand([
        (0, ''),  # 0-3: score 0
        (0, 'a'),  # 0-3: score 0
        (0, 'aa'),  # 0-3: score 0
        (0, 'aaa'),  # 0-3: score 0
        (1, 'aaab'),  # 4-7: score 1
        (1, 'aaabb'),  # 4-7: score 1
        (1, 'aaabbb'),  # 4-7: score 1
        (1, 'aaabbbb'),  # 4-7: score 1
        (3, 'aaabbbbc'),  # 8-11: score 3
        (3, 'aaabbbbcc'),  # 8-11: score 3
        (3, 'aaabbbbccc'),  # 8-11: score 3
        (3, 'aaabbbbcccc'),  # 8-11: score 3
        (5, 'aaabbbbccccd'),  # 12-15: score 5
        (5, 'aaabbbbccccdd'),  # 12-15: score 5
        (5, 'aaabbbbccccddd'),  # 12-15: score 5
        (5, 'aaabbbbccccdddd'),  # 12-15: score 5
    ])
    def test__score_length(self, score, password):
        validator = PasswordValidator(password)
        assert_equal(score, validator._score_length())

    def test__score_length_sixteen_plus(self):
        # all password whose length is 16+ score 7 points
        password = 'x' * 255
        for length in range(16, len(password)):
            validator = PasswordValidator(password[:length])
            assert_equal(7, validator._score_length())

    # Stacked patches are applied bottom-up, so the mock arguments
    # arrive in the reverse order of the decorators.
    @patch.object(PasswordValidator, '_score_length')
    @patch.object(PasswordValidator, '_score_case')
    @patch.object(PasswordValidator, '_score_numbers')
    @patch.object(PasswordValidator, '_score_special')
    @patch.object(PasswordValidator, '_score_ratio')
    def test_score(
            self,
            _score_ratio_mock,
            _score_special_mock,
            _score_numbers_mock,
            _score_case_mock,
            _score_length_mock):
        _score_ratio_mock.return_value = 2
        _score_special_mock.return_value = 3
        _score_numbers_mock.return_value = 5
        _score_case_mock.return_value = 7
        _score_length_mock.return_value = 11

        expected_result = {
            'length': 11,
            'case': 7,
            'numbers': 5,
            'special': 3,
            'ratio': 2,
            'total': 28,
        }

        validator = PasswordValidator('')
        assert_dict_equal(expected_result, validator.score())

Testing the handlers

pwdapi/tests/test_core/test_handlers.py

import json
from unittest.mock import patch

from nose.tools import assert_dict_equal, assert_equal
import falcon
import falcon.testing as testing

from core.handlers import (
    PasswordValidatorHandler,
    PasswordGeneratorHandler)

class PGHTest(PasswordGeneratorHandler):
    """Generator handler that keeps the request/response for assertions."""

    def process_request(self, req, resp):
        # Stash both objects so the test can inspect them afterwards.
        self.req, self.resp = req, resp
        return super().process_request(req, resp)

class PVHTest(PasswordValidatorHandler):
    """Validator handler that keeps the request/response for assertions."""

    def process_request(self, req, resp):
        # Stash both objects so the test can inspect them afterwards.
        self.req, self.resp = req, resp
        return super().process_request(req, resp)

class TestPasswordValidatorHandler(testing.TestBase):
    """Request-level test for the password validation endpoint."""

    def before(self):
        self.resource = PVHTest()
        self.api.add_route('/password/validate/', self.resource)

    def test_post(self):
        self.simulate_request(
            '/password/validate/',
            body=json.dumps({'password': 'abcABC0123#&'}),
            method='POST')
        resp = self.resource.resp
        assert_equal('200 OK', resp.status)
        assert_dict_equal(
            {'password': 'abcABC0123#&',
             'score': {'case': 3, 'length': 5, 'numbers': 2,
                       'special': 4, 'ratio': 2, 'total': 16},
             'valid': True},
            json.loads(resp.body))


class TestPasswordGeneratorHandler(testing.TestBase):
    """Request-level test for the password generation endpoint."""

    def before(self):
        self.resource = PGHTest()
        self.api.add_route('/password/generate/', self.resource)

    @patch('core.handlers.PasswordGenerator')
    def test_get(self, PasswordGenerator):
        # The generator is mocked, so the response is deterministic.
        PasswordGenerator.generate.return_value = (7, 'abc123')
        self.simulate_request(
            '/password/generate/',
            query_string='length=7',
            method='GET')
        resp = self.resource.resp
        assert_equal('200 OK', resp.status)
        # The tuple comes back as a list after the JSON round trip.
        assert_equal([7, 'abc123'], json.loads(resp.body))

-- Summary

Wednesday, March 30, 2016

Learning Python 11 - Debugging and Troubleshooting

-- Debugging techniques

Debugging with print

Debugging with a custom function

custom.py

def debug(*msg, print_separator=True):
    """Print ``msg`` as ``print`` would, optionally followed by a rule.

    Args:
        *msg: values forwarded to ``print``.
        print_separator: when true, print a line of 40 dashes afterwards.
    """
    print(*msg)
    if print_separator:
        print('-' * 40)


debug('Data is ...')
debug('Different', 'Strings', 'Are not a problem')
debug('After while loop', print_separator=False)

$ python custom.py 

custom_timestamp.py

from time import sleep
def debug(*msg, timestamp=[None]):
    """Print ``msg`` and, from the second call on, the time since the
    previous call.

    Args:
        *msg: values forwarded to ``print``.
        timestamp: one-item list holding the time of the previous call.
            The mutable default is deliberate: it is created once and
            shared by all calls, so it carries state between them.
    """
    print(*msg)
    from time import time  # local import
    if timestamp[0] is None:
        timestamp[0] = time()  #1
    else:
        now = time()
        print(' Time elapsed: {:.3f}s'.format(
            now - timestamp[0]))
        timestamp[0] = now  #2

# The first call only records the time; each later call also prints the
# time elapsed since the previous call.
debug('Entering nasty piece of code...')
sleep(.3)  # pause so there is a measurable gap before the next call
debug('First step done.')
sleep(.5)  # second pause, timed by the last call
debug('Second step done.')

$ python custom_timestamp.py 

Inspecting the traceback

traceback_simple.py
d = {'some': 'key'}
key = 'some-other'
# 'some-other' is not a key of d, so this lookup raises KeyError.
print(d[key])

$ python traceback_simple.py 

traceback_validator.py

class ValidatorError(Exception):
    """Raised when accessing a dict results in KeyError. """

d = {'some': 'key'}
mandatory_key = 'some-other'
try:
    print(d[mandatory_key])
except KeyError:
    # Raised from inside the handler, so the traceback shows the
    # original KeyError followed by this ValidatorError.
    raise ValidatorError(
        '`{}` not found in d.'.format(mandatory_key))

$ python traceback_validator.py 

Using the Python debugger

ipdebugger.py

# d comes from a JSON payload we don't control
d = {'first': 'v1', 'second': 'v2', 'fourth': 'v4'}
# keys also comes from a JSON payload we don't control
keys = ('first', 'second', 'third', 'fourth')


def do_something_with_value(value):
    """Print ``value``."""
    print(value)


# 'third' is missing from d, so the loop raises KeyError on that key.
for key in keys:
    do_something_with_value(d[key])

print('Validation done.')

$ python ipdebugger.py 

ipdebugger_ipdb.py

# d comes from a JSON payload we don't control
d = {'first': 'v1', 'second': 'v2', 'fourth': 'v4'}
# keys also comes from a JSON payload we don't control
keys = ('first', 'second', 'third', 'fourth')


def do_something_with_value(value):
    """Print ``value``."""
    print(value)


import ipdb
ipdb.set_trace()  # we place a breakpoint here

# 'third' is missing from d, so the loop raises KeyError on that key.
for key in keys:
    do_something_with_value(d[key])

print('Validation done.')

$ python ipdebugger_ipdb.py

Inspecting log files

log.py

import logging

# Send every record from DEBUG upwards to ch11.log, prefixed with a
# timestamp and the level name.
logging.basicConfig(
    filename='ch11.log',
    level=logging.DEBUG,  # minimum level capture in the file
    format='[%(asctime)s] %(levelname)s:%(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p')

mylist = [1, 2, 3]
logging.info('Starting to process `mylist`...')

# range(4) goes one past the end of mylist, so the last position raises
# IndexError, which is logged together with its traceback.
for position in range(4):
    try:
        # Arguments are passed separately so logging does the formatting.
        logging.debug('Value at position %s is %s',
                      position, mylist[position])
    except IndexError:
        logging.exception('Faulty position: %s', position)

logging.info('Done parsing `mylist`.')

Other techniques

Profiling

Assertions

assertions.py

mylist = [1, 2, 3]  # this ideally comes from some place
# The assertion fails (the list has 3 items), stopping the script
# before the loop below can index past the end of the list.
assert 4 == len(mylist)  # this will break

for position in range(4):
    print(mylist[position])

$ python assertions.py 

Where to find information

-- Troubleshooting guidelines

Using console editors

Where to inspect

Using tests to debug

Monitoring


-- Summary

Learning Python 10 - Web Development Done Right

-- What is the Web?

-- How does the Web work?


On login, a token of user information is saved (most often on the client side, in special files called cookies)
so that each request the user makes carries the means for the server to recognize the user and provide a custom interface by showing the name,
keeping the basket populated, and so on.

-- The Django web framework

A web framework is a set of tools (libraries, functions, classes, and so on) that we can use to code a website.


Django design philosophy

DRY: Don't repeat yourself.
Loose coupling
Less code
Consistency

The Model Layer

A model is a class that represents a data structure.
This layer deals with defining the data structures you need to handle in your website and gives you the means to save them to and load them from the database by simply accessing the models, which are Python objects.

The View Layer

The view is the mechanism through which we can fulfill a request. Its result, the response object, can assume several different forms: a JSON payload, text, an HTML page, and so on. When you code a website, your responses usually consist of HTML or JSON.

The Template Layer

The layout of the page is defined by a template, which is written in a mixture of HTML and Django template language.

The Django URL dispatcher

REGULAR EXPRESSIONS

A regular expression is a sequence of characters that defines a search pattern with which we can carry out operations such as pattern and string matching, find/replace, and so on.

-- A regex website

CSS (Cascading Style Sheets) are files in which we specify how the various elements on an HTML page look. You can set all sorts of properties such as shape, size, color, margins, borders, fonts, and so on. 

Setting up Django

$ pip install django

import django
django.VERSION

Starting the project

$ django-admin startproject regex

$ tree -A regex  # from the ch10 folder

$ python manage.py startapp entries

INSTALLED_APPS = (    
... django apps ...    
'entries',
)

LANGUAGE_CODE = 'en-gb'
TIME_ZONE = 'Europe/London'

$ python manage.py migrate

Creating users

$ python manage.py createsuperuser

$ python manage.py runserver

Adding the Entry model

entries/models.py

from django.db import models
from django.contrib.auth.models import User
from django.utils import timezone

class Entry(models.Model):
    """A regular expression and the test string it is matched against."""

    # on_delete is spelled out: CASCADE is what Django applies when the
    # argument is omitted, and newer Django versions require it.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    pattern = models.CharField(max_length=255)
    test_string = models.CharField(max_length=255)
    date_added = models.DateTimeField(default=timezone.now)

    class Meta:
        verbose_name_plural = 'entries'

$ python manage.py makemigrations entries
$ python manage.py migrate

Customizing the admin panel

entries/admin.py

from django.contrib import admin
from .models import Entry

@admin.register(Entry)
class EntryAdmin(admin.ModelAdmin):
    """Admin configuration for ``Entry``."""

    # Edit page: group the fields into two titled sections.
    fieldsets = [
        ('Regular Expression',
         {'fields': ['pattern', 'test_string']}),
        ('Other Information',
         {'fields': ['user', 'date_added']}),
    ]

    # Change list: columns, sidebar filter and search box.
    list_display = ('pattern', 'test_string', 'user')
    list_filter = ['user']
    search_fields = ['test_string']


Creating the form

entries/forms.py

from django.forms import ModelForm
from .models import Entry

class EntryForm(ModelForm):
    """Model form exposing only the pattern and test string of ``Entry``."""

    class Meta:
        model = Entry
        fields = ['pattern', 'test_string']

Writing the views

entries/views.py

import re
from django.contrib.auth.decorators import login_required
from django.contrib.messages.views import SuccessMessageMixin
from django.core.urlresolvers import reverse_lazy
from django.utils.decorators import method_decorator
from django.views.generic import FormView, TemplateView
from .forms import EntryForm
from .models import Entry

class HomeView(TemplateView):
    """Render the home page for logged-in users."""

    template_name = 'entries/home.html'

    @method_decorator(
        login_required(login_url=reverse_lazy('login')))
    def get(self, request, *args, **kwargs):
        """Render the template; anonymous users are sent to the login URL."""
        context = self.get_context_data(**kwargs)
        return self.render_to_response(context)

class EntryListView(TemplateView):
    """List the current user's entries with the result of each match."""

    template_name = 'entries/list.html'

    @method_decorator(
        login_required(login_url=reverse_lazy('login')))
    def get(self, request, *args, **kwargs):
        """Render the user's entries, newest first, paired with matches.

        ``context['entries']`` is a list of ``(entry, match)`` tuples,
        where ``match`` is the value returned by ``_parse_entry``.
        """
        context = self.get_context_data(**kwargs)
        entries = Entry.objects.filter(
            user=request.user).order_by('-date_added')
        matches = (self._parse_entry(entry) for entry in entries)
        context['entries'] = list(zip(entries, matches))
        return self.render_to_response(context)

    def _parse_entry(self, entry):
        """Search ``entry.test_string`` for ``entry.pattern``.

        Returns:
            ``(matched text, groups or None, named groups or None)``,
            or ``None`` when there is no match or the pattern is not a
            valid regular expression.
        """
        try:
            match = re.search(entry.pattern, entry.test_string)
        except re.error:
            # Patterns are user-supplied; one that does not compile is
            # shown as "no match" instead of breaking the whole page.
            return None
        if match is None:
            return None
        return (
            match.group(),
            match.groups() or None,
            match.groupdict() or None
        )

class EntryFormView(SuccessMessageMixin, FormView):
    """Display and process the form used to insert a new ``Entry``."""

    template_name = 'entries/insert.html'
    form_class = EntryForm
    success_url = reverse_lazy('insert')
    success_message = "Entry was created successfully"

    @method_decorator(
        login_required(login_url=reverse_lazy('login')))
    def get(self, request, *args, **kwargs):
        """Delegate to ``FormView.get`` once the login check has passed."""
        return super(EntryFormView, self).get(
            request, *args, **kwargs)

    @method_decorator(
        login_required(login_url=reverse_lazy('login')))
    def post(self, request, *args, **kwargs):
        """Delegate to ``FormView.post`` once the login check has passed."""
        return super(EntryFormView, self).post(
            request, *args, **kwargs)

    def form_valid(self, form):
        """Save the entry for the current user, then continue normal handling."""
        self._save_with_user(form)
        return super(EntryFormView, self).form_valid(form)

    def _save_with_user(self, form):
        """Persist the form's instance with ``user`` set to the requesting user."""
        # commit=False yields an unsaved instance so the user can be attached
        # before the row is written.
        self.object = form.save(commit=False)
        self.object.user = self.request.user
        self.object.save()


The home view
The entry list view
The form view

Tying up URLs and views

regex/urls.py

from django.conf.urls import include, url
from django.contrib import admin
from django.contrib.auth import views as auth_views
from django.core.urlresolvers import reverse_lazy
from entries.views import HomeView, EntryListView, EntryFormView

# URL routes of the project; every application route is named so templates
# and views can reverse it.
urlpatterns = [
    url(r'^admin/', include(admin.site.urls)),
    url(r'^entries/$', EntryListView.as_view(), name='entries'),
    url(
        r'^entries/insert$',
        EntryFormView.as_view(),
        name='insert',
    ),
    # Login and logout use Django's auth views; login renders the admin template.
    url(
        r'^login/$',
        auth_views.login,
        kwargs={'template_name': 'admin/login.html'},
        name='login',
    ),
    url(
        r'^logout/$',
        auth_views.logout,
        kwargs={'next_page': reverse_lazy('home')},
        name='logout',
    ),
    url(r'^$', HomeView.as_view(), name='home'),
]

Writing the templates

entries/templates/entries/base.html

{% load static from staticfiles %}
<!DOCTYPE html>
<html lang="en">  
<head>    
{% block meta %}      
<meta charset="utf-8">      
<meta name="viewport"       
content="width=device-width, initial-scale=1.0">    
{% endblock meta %}    

{% block styles %}      
<link href="{% static "entries/css/main.css" %}"       
rel="stylesheet">    
{% endblock styles %}    

<title> {% block title %}Title{% endblock title %} </title>  
</head>  

<body>    
<div id="page-content">      
{% block page-content %}      
{% endblock page-content %}    
</div>    
<div id="footer">      
{% block footer %}      
{% endblock footer %}    
</div>  
</body>
</html>

entries/templates/entries/footer.html

<div class="footer">  
Go back <a href="{% url "home" %}">home</a>.
</div>

entries/templates/entries/home.html
{% extends "entries/base.html" %}
{% block title%}Welcome to the Entry website.{% endblock title %}

{% block page-content %}  
<h1>Welcome {{ user.first_name }}!</h1>  

  <div class="home-option">To see the list of your entries
    please click <a href="{% url "entries" %}">here.</a>
  </div>
  <div class="home-option">To insert a new entry please click
    <a href="{% url "insert" %}">here.</a>
  </div>
  <div class="home-option">To login as another user please click
    <a href="{% url "logout" %}">here.</a>
  </div>
  <div class="home-option">To go to the admin panel
    please click <a href="{% url "admin:index" %}">here.</a>
  </div>
{% endblock page-content %}

entries/templates/entries/insert.html

{% extends "entries/base.html" %}
{% block title%}Insert a new Entry{% endblock title %}

{% block page-content %}  
{% if messages %}    
{% for message in messages %}      
<p class="{{ message.tags }}">{{ message }}</p>    
{% endfor %}  
{% endif %}  

<h1>Insert a new Entry</h1>  
<form action="{% url "insert" %}" method="post">    
{% csrf_token %}{{ form.as_p }}    
<input type="submit" value="Insert">  
</form><br>
{% endblock page-content %}

{% block footer %}  
<div><a href="{% url "entries" %}">See your entries.</a></div>  
{% include "entries/footer.html" %}
{% endblock footer %}

entries/templates/entries/list.html

{% extends "entries/base.html" %}
{% block title%} Entries list {% endblock title %}

{% block page-content %} 
{% if entries %}  
<h1>Your entries ({{ entries|length }} found)</h1>  
<div><a href="{% url "insert" %}">Insert new entry.</a></div>  

<table class="entries-table">   
<thead>     
<tr><th>Entry</th><th>Matches</th></tr>   
</thead>   
<tbody>    
{% for entry, match in entries %}     
<tr class="entries-list {% cycle 'light-gray' 'white' %}">      
<td>        
Pattern: <code class="code">         
"{{ entry.pattern }}"</code><br>        
Test String: <code class="code">         
"{{ entry.test_string }}"</code><br>        
Added: {{ entry.date_added }}      
</td>      
<td>        
{% if match %}         
Group: {{ match.0 }}<br>         
Subgroups:          
{{ match.1|default_if_none:"none" }}<br>         
Group Dict: {{ match.2|default_if_none:"none" }}        
{% else %}         
No matches found.        
{% endif %}      
</td>     
</tr>    
{% endfor %}   
   </tbody>
  </table>
 {% else %}
  <h1>You have no entries</h1>
  <div><a href="{% url "insert" %}">Insert new entry.</a></div>
 {% endif %}
{% endblock page-content %}

{% block footer %} 
{% include "entries/footer.html" %}
{% endblock footer %}

-- The future of web development

Writing a Flask view

$ tree -A flask  # from the ch10 folder

flask/templates/main.html

<!doctype html>
<title>Hello from Flask</title>
<h1>  
{% if name %}    
Hello {{ name }}!  
{% else %}    
Hello shy person!  
{% endif %}
</h1>

flask/main.py

from flask import Flask, render_template

app = Flask(__name__)


@app.route('/')
@app.route('/<name>')
def hello(name=None):
    """Render ``main.html``; ``name`` is the optional path segment of the URL."""
    return render_template('main.html', name=name)


if __name__ == '__main__':
    app.run()

$ python main.py

Building a JSON quote server in Falcon

falcon/quotes.py

# Quotes served by the API. Adjacent string literals are concatenated, so the
# first three lines below form a single quote.
quotes = [
    "Thousands of candles can be lighted from a single candle, "
    "and the life of the candle will not be shortened. "
    "Happiness never decreases by being shared.",
    # Placeholder for the quotes omitted from this listing; as written it is
    # the Ellipsis object, not a string.
    ...,
    "Peace comes from within. Do not seek it without.",
]

falcon/main.py

import json
import random
import falcon
from quotes import quotes

class QuoteResource:
    """Falcon resource that answers GET requests with a randomly chosen quote."""

    def on_get(self, req, resp):
        """Set the response body to a JSON object with ``quote`` and ``author`` keys."""
        quote = {
            'quote': random.choice(quotes),
            'author': 'The Buddha'
        }
        resp.body = json.dumps(quote)


api = falcon.API()
api.add_route('/quote', QuoteResource())


$ gunicorn main:api

Learning Python 9 - Data Science

-- IPython and Jupyter notebook

Every cell has an In [] label. If there's nothing between the brackets, it means that the cell has never been executed. If there is a number, it means that the cell has been executed, and the number represents the order in which the cell was executed. Finally, a * means that the cell is currently being executed.

$ pip install jupyter pandas matplotlib fake-factory delorean xlwt

$ jupyter notebook

-- Dealing with data

Setting up the notebook

#1

import json
import calendar
import random
from datetime import date, timedelta

import faker
import numpy as np
from pandas import DataFrame
from delorean import parse
import pandas as pd
# make the graphs nicer
pd.set_option('display.mpl_style', 'default')


Preparing the data

#2

fake = faker.Faker()

#3

usernames = set()
usernames_no = 1000
# populate the set with 1000 unique usernames; duplicates produced by the
# generator are absorbed by the set, so loop until the target size is reached
while len(usernames) < usernames_no:
    usernames.add(fake.user_name())

#4

def get_random_name_and_gender():
    """Return a fake ``(name, gender)`` pair, with gender ``'M'`` or ``'F'``."""
    skew = .6  # 60% of users will be female
    male = random.random() > skew
    if male:
        return fake.name_male(), 'M'
    return fake.name_female(), 'F'

def get_users(usernames):
    """Build one fake user per username and return them as JSON strings."""
    users = []
    for username in usernames:
        name, gender = get_random_name_and_gender()
        user = {
            'username': username,
            'name': name,
            'gender': gender,
            'email': fake.email(),
            'age': fake.random_int(min=18, max=90),
            'address': fake.address(),
        }
        # Each user is stored serialised, not as a dict.
        users.append(json.dumps(user))
    return users
users = get_users(usernames)
users[:3]

Out #4

['{"gender": "F", "age": 48, "email": "jovani.dickinson@gmail.com", "address": "2006 Sawayn Trail Apt. 207\\nHyattview, MO 27278", "username": "darcy00", "name": "Virgia Hilpert"}',
 '{"gender": "F", "age": 58, "email": "veum.javen@hotmail.com", "address": "5176 Andres Plains Apt. 040\\nLakinside, GA 92446", "username": "renner.virgie", "name": "Miss Clarabelle Kertzmann MD"}',
 '{"gender": "M", "age": 33, "email": "turner.felton@rippin.com", "address": "1218 Jacobson Fort\\nNorth Doctor, OK 04469", "username": "hettinger.alphonsus", "name": "Ludwig Prosacco"}']

#5 

# campaign name format:
# InternalType_StartDate_EndDate_TargetAge_TargetGender_Currency
def get_type():
    """Return one of the internal campaign type codes, chosen at random."""
    # just some gibberish internal codes
    types = ['AKX', 'BYU', 'GRZ', 'KTR']
    return random.choice(types)

def get_start_end_dates():
    """Return random ``(start, end)`` dates as ``YYYYMMDD`` strings.

    The start lies within 365 days before or after today and the end is
    1 to 730 days after the start.
    """
    duration = random.randint(1, 2 * 365)
    offset = random.randint(-365, 365)
    start = date.today() - timedelta(days=offset)
    end = start + timedelta(days=duration)

    def _format_date(date_):
        return date_.strftime("%Y%m%d")

    return _format_date(start), _format_date(end)

def get_age():
    """Return a random age range formatted as ``'<low>-<high>'``.

    ``low`` is a multiple of 5 between 20 and 45; the width of the range is
    a multiple of 5 between 5 and 25.
    """
    age = random.randint(20, 45)
    age -= age % 5  # round down to a multiple of 5
    diff = random.randint(5, 25)
    diff -= diff % 5
    return '{}-{}'.format(age, age + diff)

def get_gender():
    """Return a random target-gender code: ``'M'``, ``'F'`` or ``'B'``."""
    return random.choice(('M', 'F', 'B'))

def get_currency():
    """Return a random currency code: ``'GBP'``, ``'EUR'`` or ``'USD'``."""
    return random.choice(('GBP', 'EUR', 'USD'))

def get_campaign_name():
    """Return a random campaign name.

    The parts are joined with underscores in the order
    type, start date, end date, target age, target gender, currency.
    """
    separator = '_'
    type_ = get_type()
    start_end = separator.join(get_start_end_dates())
    age = get_age()
    gender = get_gender()
    currency = get_currency()
    return separator.join(
        (type_, start_end, age, gender, currency))

#6

def get_campaign_data():
    """Return a dict of random figures for one campaign.

    Keys: ``cmp_name``, ``cmp_bgt``, ``cmp_spent``, ``cmp_clicks`` and
    ``cmp_impr``. The amount spent never exceeds the budget.
    """
    campaign = {'cmp_name': get_campaign_name()}
    campaign['cmp_bgt'] = random.randint(10**3, 10**6)
    # Upper bound is the budget just drawn.
    campaign['cmp_spent'] = random.randint(10**2, campaign['cmp_bgt'])
    campaign['cmp_clicks'] = int(
        random.triangular(10**2, 10**5, 0.2 * 10**5))
    campaign['cmp_impr'] = int(random.gauss(0.5 * 10**6, 2))
    return campaign

#7

def get_data(users):
    """Pair every user with a list of 2 to 8 randomly generated campaigns.

    Returns a list of ``{'user': ..., 'campaigns': [...]}`` dicts, one per
    user, in the order the users were given.
    """
    data = []
    for user in users:
        campaigns = [get_campaign_data()
                     for _ in range(random.randint(2, 8))]
        data.append({'user': user, 'campaigns': campaigns})
    return data

Cleaning the data

#8

rough_data = get_data(users)
rough_data[:2]  # let's take a peek

[{'campaigns': [{'cmp_bgt': 130532,    
'cmp_clicks': 25576,    
'cmp_impr': 500001,    
'cmp_name': 'AKX_20150826_20170305_35-50_B_EUR',    
'cmp_spent': 57574},   
... omit ...   
{'cmp_bgt': 884396,    
'cmp_clicks': 10955,    
'cmp_impr': 499999,    
'cmp_name': 'KTR_20151227_20151231_45-55_B_GBP',    
'cmp_spent': 318887}],  
'user': '{"age": 44, "username": "jacob43",            
"name": "Holland Strosin",            
"email": "humberto.leuschke@brakus.com",            
"address": "1038 Runolfsdottir Parks\\nElmapo...",            
"gender": "M"}'}]

#9

# Flatten the structure: one record per campaign, each carrying its user.
data = []
for datum in rough_data:
    for campaign in datum['campaigns']:
        # update() modifies the campaign dict in place, so the dicts inside
        # rough_data gain the 'user' key as well.
        campaign.update({'user': datum['user']})
        data.append(campaign)
data[:2]  # let's take another peek

[{'cmp_bgt': 130532,
  'cmp_clicks': 25576,
  'cmp_impr': 500001,
  'cmp_name': 'AKX_20150826_20170305_35-50_B_EUR',
  'cmp_spent': 57574,
  'user': '{"age": 44, "username": "jacob43",
            "name": "Holland Strosin",
            "email": "humberto.leuschke@brakus.com",
            "address": "1038 Runolfsdottir Parks\\nElmaport...",
            "gender": "M"}'}]

Creating the DataFrame

#10
df = DataFrame(data)
df.head()

#11
df.count()

#12
df.describe()

#13
# sort_values is the method for ordering rows by column values;
# sort_index(by=...) is deprecated.
df.sort_values(by=['cmp_bgt'], ascending=False).head(3)

#14 
# sort_values is the method for ordering rows by column values;
# sort_index(by=...) is deprecated.
df.sort_values(by=['cmp_bgt'], ascending=False).tail(3)

Unpacking the campaign name

#15
def unpack_campaign_name(name):
    """Split a campaign name into its six underscore-separated parts.

    Returns ``(type_, start, end, age, gender, currency)`` where ``start``
    and ``end`` are the ``.date`` values of the parsed date strings.
    Raises ``ValueError`` if the name does not have exactly six parts.
    """
    # very optimistic method, assumes data in campaign name
    # is always in good state
    type_, start, end, age, gender, currency = name.split('_')
    start = parse(start).date
    end = parse(end).date
    return type_, start, end, age, gender, currency

campaign_data = df['cmp_name'].apply(unpack_campaign_name)
campaign_cols = [    'Type', 'Start', 'End', 'Age', 'Gender', 'Currency']
campaign_df = DataFrame(    campaign_data.tolist(), columns=campaign_cols, index=df.index)
campaign_df.head(3)

#16

df = df.join(campaign_df)

#17

df[['cmp_name'] + campaign_cols].head(3)

Unpacking the user data

#18

def unpack_user_json(user):
    """Decode a user JSON string into a list of its attribute values.

    The values are returned in the order
    username, email, name, gender, age, address.
    Raises ``KeyError`` if any of those attributes is missing.
    """
    # very optimistic as well, expects user objects
    # to have all attributes
    user = json.loads(user.strip())
    fields = ('username', 'email', 'name', 'gender', 'age', 'address')
    return [user[field] for field in fields]

user_data = df['user'].apply(unpack_user_json)
user_cols = [    'username', 'email', 'name', 'gender', 'age', 'address']
user_df = DataFrame(    
user_data.tolist(), 
columns=user_cols, 
index=df.index)

df = df.join(user_df)
df[['user'] + user_cols].head(2)

#21

better_columns = [    
'Budget', 'Clicks', 'Impressions',    
'cmp_name', 'Spent', 'user',    
'Type', 'Start', 'End',    'Target Age', 
'Target Gender', 'Currency',    
'Username', 'Email', 'Name',    
'Gender', 'Age', 'Address',
]

df.columns = better_columns

#22

def calculate_extra_columns(df):
    """Add the ``CTR``, ``CPC`` and ``CPI`` columns to ``df`` in place.

    Reads the ``Clicks``, ``Impressions`` and ``Spent`` columns; returns
    ``None``.
    """
    # Click Through Rate
    df['CTR'] = df['Clicks'] / df['Impressions']
    # Cost Per Click
    df['CPC'] = df['Spent'] / df['Clicks']
    # Cost Per Impression
    df['CPI'] = df['Spent'] / df['Impressions']
calculate_extra_columns(df)

#23

df[['Spent', 'Clicks', 'Impressions',    'CTR', 'CPC', 'CPI']].head(3)

#24

clicks = df['Clicks'][0]
impressions = df['Impressions'][0]
spent = df['Spent'][0]
CTR = df['CTR'][0]
CPC = df['CPC'][0]
CPI = df['CPI'][0]
print('CTR:', CTR, clicks / impressions)
print('CPC:', CPC, spent / clicks)
print('CPI:', CPI, spent / impressions)

#25

def get_day_of_the_week(day):
    """Return the weekday name (e.g. ``'Monday'``) of the date ``day``."""
    # calendar.day_name is indexed from Monday == 0, the same numbering
    # weekday() uses, so no lookup table has to be built on every call.
    return calendar.day_name[day.weekday()]

def get_duration(row):
    """Return the number of days between ``row['Start']`` and ``row['End']``."""
    return (row['End'] - row['Start']).days

df['Day of Week'] = df['Start'].apply(get_day_of_the_week)
df['Duration'] = df.apply(get_duration, axis=1)

#26

df[['Start', 'End', 'Duration', 'Day of Week']].head(3)

Cleaning everything up

#27
final_columns = [    'Type', 'Start', 'End', 'Duration', 'Day of Week', 'Budget',    'Currency', 'Clicks', 'Impressions', 'Spent', 'CTR', 'CPC',    'CPI', 'Target Age', 'Target Gender', 'Username', 'Email',    'Name', 'Gender', 'Age']

df = df[final_columns]

Saving the DataFrame to a file

#28

df.to_csv('df.csv')

#29

df.to_json('df.json')

#30

df.to_excel('df.xls')

Visualizing the results

# make the graphs nicer
pd.set_option('display.mpl_style', 'default')

#31

%matplotlib inline

#32

import pylab

pylab.rcParams.update({'font.family' : 'serif'})

#33

df.describe()

#34

df[['Budget', 'Spent', 'Clicks', 'Impressions']].hist(    bins=16, figsize=(16, 6));

#35

df[['CTR', 'CPC', 'CPI']].hist(    bins=20, figsize=(16, 6));

#36

mask = (df.Spent > 0.75 * df.Budget)
df[mask][['Budget', 'Spent', 'Clicks', 'Impressions']].hist(    bins=15, figsize=(16, 6), color='g');

#37

df_weekday = df.groupby(['Day of Week']).sum()
df_weekday[['Impressions', 'Spent', 'Clicks']].plot(    figsize=(16, 6), subplots=True);

#38

agg_config = {    
'Impressions': {        
'Mean Impr': 'mean',        
'Std Impr': 'std',    },    

'Spent': ['mean', 'std'],}
df.groupby(['Target Gender', 'Target Age']).agg(agg_config)

#39

pivot = df.pivot_table(    
values=['Impressions', 'Clicks', 'Spent'],    
index=['Target Age'],    
columns=['Target Gender'],    
aggfunc=np.sum)
pivot

-- Where do we go from here?

-- Summary


Sunday, March 27, 2016

Learning Python 8 - The GUIs and Scripts

chap 

tree -A

simple_server/index.html

<!DOCTYPE html><html lang="en">  <head><title>Cool Owls!</title></head>  <body>    <h1>Welcome to my owl gallery</h1>    <div>      <img src="img/owl-alcohol.png" height="128" />      <img src="img/owl-book.png" height="128" />      <img src="img/owl-books.png" height="128" />      <img src="img/owl-ebook.jpg" height="128" />      <img src="img/owl-rose.jpeg" height="128" />    </div>    <p>Do you like my owls?</p>  </body></html>

$ python -m http.server 8000

$ ./serve.sh

First approach – scripting

The imports

scrape.py (Imports)

import argparse
import base64
import json
import os
from bs4 import BeautifulSoup
import requests

$ pip freeze | egrep -i "soup|requests"

$ pip install beautifulsoup4 requests

Parsing arguments

scrape.py (Argument parsing and scraper triggering)

if __name__ == "__main__":
    # Command line: scrape.py [-t {all,png,jpg}] [-f {img,json}] url
    parser = argparse.ArgumentParser(
        description='Scrape a webpage.')
    parser.add_argument(
        '-t',
        '--type',
        choices=['all', 'png', 'jpg'],
        default='all',
        help='The image type we want to scrape.')
    parser.add_argument(
        '-f',
        '--format',
        choices=['img', 'json'],
        default='img',
        help='The format images are saved to.')
    parser.add_argument(
        'url',
        help='The URL we want to scrape for images.')
    args = parser.parse_args()
    scrape(args.url, args.format, args.type)

$ python scrape.py -h

$ python scrape.py http://localhost:8000
$ python scrape.py -t png http://localhost:8000
$ python scrape.py --type=jpg -f json http://localhost:8000

The business logic

scrape.py (Business logic)


def scrape(url, format_, type_):
    """Download the page at ``url`` and save the images it references.

    ``type_`` selects which images are kept and ``format_`` how they are
    saved. A failed request is reported by printing the error; nothing is
    raised to the caller in that case.
    """
    try:
        page = requests.get(url)
    except requests.RequestException as rex:
        print(str(rex))
    else:
        soup = BeautifulSoup(page.content, 'html.parser')
        images = _fetch_images(soup, url)
        images = _filter_images(images, type_)
        _save(images, format_)

def _fetch_images(soup, base_url):
    """Collect the images referenced by the ``<img>`` tags in ``soup``.

    Returns a list of ``{'name': ..., 'url': ...}`` dicts, where ``url`` is
    the ``src`` attribute resolved against ``base_url`` and ``name`` is the
    last path segment of that URL. Tags without a ``src`` are skipped.
    """
    # Imported here so the function carries its own dependency.
    from urllib.parse import urljoin

    images = []
    for img in soup.findAll('img'):
        src = img.get('src')
        if not src:
            # Nothing to download for an <img> that has no source.
            continue
        # urljoin copes with a trailing slash on base_url and with absolute
        # or root-relative src values, which plain string formatting does not.
        img_url = urljoin(base_url, src)
        name = img_url.split('/')[-1]
        images.append(dict(name=name, url=img_url))
    return images

def _filter_images(images, type_):
    """Return the images whose file extension belongs to ``type_``.

    ``'all'`` returns ``images`` unchanged; ``'png'`` and ``'jpg'`` keep only
    the matching extensions. Any other ``type_`` raises ``KeyError``.
    """
    if type_ == 'all':
        return images
    ext_map = {
        'png': ['.png'],
        'jpg': ['.jpg', '.jpeg'],
    }
    return [
        img for img in images
        if _matches_extension(img['name'], ext_map[type_])
    ]

def _matches_extension(filename, extension_list):
    """Return True if the lower-cased extension of ``filename`` is in ``extension_list``."""
    _, extension = os.path.splitext(filename.lower())
    return extension in extension_list

def _save(images, format_):
    """Save ``images`` as files when ``format_`` is ``'img'``, as JSON otherwise.

    Prints ``'Done'`` afterwards, or ``'No images to save.'`` when ``images``
    is empty.
    """
    if images:
        if format_ == 'img':
            _save_images(images)
        else:
            _save_json(images)
        print('Done')
    else:
        print('No images to save.')

def _save_images(images):
    """Download every image and write it to a file named ``img['name']``."""
    for img in images:
        img_data = requests.get(img['url']).content
        # Binary mode: the payload is raw image bytes.
        with open(img['name'], 'wb') as f:
            f.write(img_data)

def _save_json(images):
    """Download every image and store them all in ``images.json``.

    The file holds one JSON object that maps each image name to its
    base64-encoded content.
    """
    data = {}
    for img in images:
        img_data = requests.get(img['url']).content
        # JSON cannot hold raw bytes: base64-encode, then decode the ASCII
        # result into a str.
        b64_img_data = base64.b64encode(img_data)
        str_img_data = b64_img_data.decode('utf-8')
        data[img['name']] = str_img_data
    with open('images.json', 'w') as ijson:
        json.dump(data, ijson)


images.json (truncated)

{  "owl-ebook.jpg": "/9j/4AAQSkZJRgABAQEAMQAxAAD/2wBDAAEBAQ...  
"owl-book.png": "iVBORw0KGgoAAAANSUhEUgAAASwAAAEbCAYAAAB...  
"owl-books.png": "iVBORw0KGgoAAAANSUhEUgAAASwAAAElCAYAAA...  
"owl-alcohol.png": "iVBORw0KGgoAAAANSUhEUgAAASwAAAEICAYA...  
"owl-rose.jpeg": "/9j/4AAQSkZJRgABAQEANAA0AAD/2wBDAAEBAQ...
}


-- Second approach – a GUI application

$ python -m tkinter

The imports

from tkinter import *
from tkinter import ttk, filedialog, messagebox
import base64
import json
import os
from bs4 import BeautifulSoup
import requests

The layout logic

guiscrape.py

if __name__ == "__main__":
    # Root window.
    _root = Tk()
    _root.title('Scrape app')

    # Main frame, parent of the URL frame, the content frame and the
    # Scrape button.
    _mainframe = ttk.Frame(_root, padding='5 5 5 5')
    _mainframe.grid(row=0, column=0, sticky=(E, W, N, S))

    # URL frame: text entry plus the "Fetch info" button.
    _url_frame = ttk.LabelFrame(
        _mainframe, text='URL', padding='5 5 5 5')
    _url_frame.grid(row=0, column=0, sticky=(E, W))
    _url_frame.columnconfigure(0, weight=1)
    _url_frame.rowconfigure(0, weight=1)

    _url = StringVar()
    _url.set('http://localhost:8000')
    _url_entry = ttk.Entry(
        _url_frame, width=40, textvariable=_url)
    _url_entry.grid(row=0, column=0, sticky=(E, W, S, N), padx=5)
    _fetch_btn = ttk.Button(
        _url_frame, text='Fetch info', command=fetch_url)
    _fetch_btn.grid(row=0, column=1, sticky=W, padx=5)

    # Content frame: image list with scrollbar, and the save-method radios.
    _img_frame = ttk.LabelFrame(
        _mainframe, text='Content', padding='9 0 0 0')
    _img_frame.grid(row=1, column=0, sticky=(N, S, E, W))

    _images = StringVar()
    _img_listbox = Listbox(
        _img_frame, listvariable=_images, height=6, width=25)
    _img_listbox.grid(row=0, column=0, sticky=(E, W), pady=5)
    _scrollbar = ttk.Scrollbar(
        _img_frame, orient=VERTICAL, command=_img_listbox.yview)
    _scrollbar.grid(row=0, column=1, sticky=(S, N), pady=6)
    _img_listbox.configure(yscrollcommand=_scrollbar.set)

    _radio_frame = ttk.Frame(_img_frame)
    _radio_frame.grid(row=0, column=2, sticky=(N, S, W, E))

    _choice_lbl = ttk.Label(
        _radio_frame, text="Choose how to save images")
    _choice_lbl.grid(row=0, column=0, padx=5, pady=5)
    _save_method = StringVar()
    _save_method.set('img')
    _img_only_radio = ttk.Radiobutton(
        _radio_frame, text='As Images', variable=_save_method,
        value='img')
    _img_only_radio.grid(
        row=1, column=0, padx=5, pady=2, sticky=W)
    _img_only_radio.configure(state='normal')
    _json_radio = ttk.Radiobutton(
        _radio_frame, text='As JSON', variable=_save_method,
        value='json')
    _json_radio.grid(row=2, column=0, padx=5, pady=2, sticky=W)

    _scrape_btn = ttk.Button(
        _mainframe, text='Scrape!', command=save)
    _scrape_btn.grid(row=2, column=0, sticky=E, pady=5)

    # Status bar, placed on the root window below the main frame.
    _status_frame = ttk.Frame(
        _root, relief='sunken', padding='2 2 2 2')
    _status_frame.grid(row=1, column=0, sticky=(E, W, S))
    _status_msg = StringVar()
    _status_msg.set('Type a URL to start scraping...')
    _status = ttk.Label(
        _status_frame, textvariable=_status_msg, anchor=W)
    _status.grid(row=0, column=0, sticky=(E, W))

    _root.mainloop()

The business logic

Fetching the web page

# Shared state: config['images'] holds the images found by the last fetch.
config = {}


def fetch_url():
    """Fetch the page named in the URL entry and list the images it references.

    Clears the previous results first. A failed request is reported in the
    status bar; otherwise the listbox, the status bar and
    ``config['images']`` are updated with what was found.
    """
    url = _url.get()
    config['images'] = []
    _images.set(())   # initialized as an empty tuple
    try:
        page = requests.get(url)
    except requests.RequestException as rex:
        _sb(str(rex))
    else:
        soup = BeautifulSoup(page.content, 'html.parser')
        images = fetch_images(soup, url)
        if images:
            _images.set(tuple(img['name'] for img in images))
            _sb('Images found: {}'.format(len(images)))
        else:
            _sb('No images found')
        config['images'] = images


def fetch_images(soup, base_url):
    """Collect the images referenced by the ``<img>`` tags in ``soup``.

    Returns a list of ``{'name': ..., 'url': ...}`` dicts, where ``url`` is
    the ``src`` attribute resolved against ``base_url`` and ``name`` is the
    last path segment of that URL. Tags without a ``src`` are skipped.
    """
    # Imported here so the function carries its own dependency.
    from urllib.parse import urljoin

    images = []
    for img in soup.findAll('img'):
        src = img.get('src')
        if not src:
            # Nothing to download for an <img> that has no source.
            continue
        # urljoin copes with a trailing slash on base_url and with absolute
        # or root-relative src values, which plain string formatting does not.
        img_url = urljoin(base_url, src)
        name = img_url.split('/')[-1]
        images.append(dict(name=name, url=img_url))
    return images

Saving the images

def save():
    """Save the fetched images using the method selected in the radio buttons.

    Shows an alert and returns if no images have been fetched. Otherwise asks
    for a target directory (image files) or a target file name (JSON) and
    delegates to the matching helper.
    """
    if not config.get('images'):
        _alert('No images to save')
        return

    if _save_method.get() == 'img':
        dirname = filedialog.askdirectory(mustexist=True)
        _save_images(dirname)
    else:
        filename = filedialog.asksaveasfilename(
            initialfile='images.json',
            filetypes=[('JSON', '.json')])
        _save_json(filename)


def _save_images(dirname):
    """Download the fetched images into ``dirname``, then alert ``'Done'``.

    Does nothing when ``dirname`` is empty or no images are stored.
    """
    if dirname and config.get('images'):
        for img in config['images']:
            img_data = requests.get(img['url']).content
            filename = os.path.join(dirname, img['name'])
            with open(filename, 'wb') as f:
                f.write(img_data)
        _alert('Done')

def _save_json(filename):
    """Download the fetched images into one JSON file, then alert ``'Done'``.

    The file maps each image name to its base64-encoded content. Does nothing
    when ``filename`` is empty or no images are stored.
    """
    if filename and config.get('images'):
        data = {}
        for img in config['images']:
            img_data = requests.get(img['url']).content
            # JSON cannot hold raw bytes: base64-encode, then decode the
            # ASCII result into a str.
            b64_img_data = base64.b64encode(img_data)
            str_img_data = b64_img_data.decode('utf-8')
            data[img['name']] = str_img_data
        with open(filename, 'w') as ijson:
            json.dump(data, ijson)
        _alert('Done')

Alerting the user
def _sb(msg):
    """Show ``msg`` in the status bar."""
    _status_msg.set(msg)

def _alert(msg):
    """Show ``msg`` in an information message box."""
    messagebox.showinfo(message=msg)

How to improve the application?

# Rebuild the image files from images.json: every key is a file name and
# every value the base64-encoded content of that file.
with open('images.json', 'r') as f:
    data = json.load(f)

for (name, b64val) in data.items():
    with open(name, 'wb') as img_file:
        img_file.write(base64.b64decode(b64val))

-- Where do we go from here?

The tkinter.tix module

The turtle module

wxPython, PyQt, and PyGTK

The principle of least astonishment

Threading considerations


-- Summary

Blog Archive