Rewrite to use v4 CloudFlare API and request batching.

Quantum 2017-08-28 19:28:26 -04:00
parent 168b199db7
commit 581658b99a
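
In short: instead of issuing one legacy zone_file_purge request per file against www.cloudflare.com/api_json.html, the zone ID is now looked up once through GET /client/v4/zones?name=... and queued URLs are purged in batches of 30 via DELETE /client/v4/zones/<zone_id>/purge_cache. A minimal sketch of that batched purge call on its own, outside the watcher machinery, assuming requests is installed and using obviously placeholder values for EMAIL, API_KEY and ZONE_ID:

    import requests

    # Placeholder credentials; cfwatch takes the real values from its command line.
    EMAIL = 'user@example.com'
    API_KEY = '0123456789abcdef'
    ZONE_ID = '023e105f4ecef8ad9ca31a8372d0c353'

    def purge_batched(urls, chunk_size=30):
        # The v4 purge_cache endpoint accepts a list of files per call;
        # the diff below sends them in chunks of 30 URLs at a time.
        urls = list(urls)
        for i in range(0, len(urls), chunk_size):
            chunk = urls[i:i + chunk_size]
            resp = requests.delete(
                'https://api.cloudflare.com/client/v4/zones/%s/purge_cache' % (ZONE_ID,),
                headers={'X-Auth-Email': EMAIL, 'X-Auth-Key': API_KEY},
                json={'files': chunk},
            )
            result = resp.json()
            if not result.get('success'):
                raise RuntimeError('Purge failed: %r' % (result.get('errors'),))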

cfwatch.py Normal file → Executable file

@@ -1,48 +1,140 @@
-import os
-import json
 import logging
-import traceback
-from urlparse import urljoin
-from urllib import urlencode
-from urllib2 import urlopen, URLError
-from contextlib import closing
+import os
+from threading import Lock, Event, Thread
+
+try:
+    from itertools import zip_longest
+except ImportError:
+    from itertools import izip_longest as zip_longest
+
+try:
+    from urllib.parse import urljoin
+except ImportError:
+    from urlparse import urljoin
+
+import requests
+from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent
 from watchdog.observers import Observer
-from watchdog.events import FileSystemEventHandler
 
-logger = logging.getLogger('cfwatch')
+log = logging.getLogger('cfwatch')
 
 
 class CloudFlareMonitorHandler(FileSystemEventHandler):
     def __init__(self, email, token, zone, prefix, dir='.'):
+        self.session = requests.Session()
+        self.to_purge = set()
+        self.queue_lock = Lock()
+        self._trigger = Event()
+        self._stop = False
         self.email = email
         self.token = token
-        self.zone = zone
+        self.zone = self._get_zone(zone)
         self.prefix = prefix
         self.base = dir
 
-    def purge_event(self, event):
-        if not event.is_directory:
-            path = os.path.relpath(event.src_path, self.base)
-            if os.sep == '\\':
-                path = path.replace('\\', '/')
-            url = urljoin(self.prefix, path)
-            logger.debug('Going to purge: %s', url)
-            try:
-                with closing(urlopen('https://www.cloudflare.com/api_json.html', urlencode({
-                    'a': 'zone_file_purge', 'tkn': self.token,
-                    'email': self.email, 'z': self.zone, 'url': url
-                }))) as f:
-                    result = json.load(f)
-                logger.info('Successfully purged: %s' if result.get('result') == 'success'
-                            else 'Failed to purge: %s', url)
-            except (ValueError, URLError) as e:
-                logger.exception('Failed to purge: %s', url)
-
-    on_modified = purge_event
-    on_moved = purge_event
+    def on_any_event(self, event):
+        if event.is_directory:
+            return
+
+        self.queue_purge(event.src_path)
+        if isinstance(event, FileSystemMovedEvent):
+            self.queue_purge(event.dest_path)
+
+    def queue_purge(self, path):
+        path = os.path.relpath(path, self.base)
+        if os.sep == '\\':
+            path = path.replace('\\', '/')
+        url = urljoin(self.prefix, path)
+        log.debug('Going to purge: %s', url)
+        with self.queue_lock:
+            self.to_purge.add(url)
+        self._trigger.set()
+
+    def _get_zone(self, name):
+        resp = self.cf_request('GET', 'https://api.cloudflare.com/client/v4/zones?name=%s' % (name,)).json()
+        try:
+            if not resp['success']:
+                raise ValueError('Failed to obtain zone info: %r' % (resp['errors'],))
+            if not resp['result']:
+                raise ValueError('No such zone: %s' % (name,))
+            return resp['result'][0]['id']
+        except KeyError:
+            raise ValueError('Malformed response: %r' % (resp,))
+
+    def cf_request(self, *args, **kwargs):
+        headers = {
+            'X-Auth-Email': self.email,
+            'X-Auth-Key': self.token,
+        }
+        headers.update(kwargs.pop('headers', {}))
+        kwargs['headers'] = headers
+        return self.session.request(*args, **kwargs)
+
+    def purge(self, urls):
+        for chunk in zip_longest(*[iter(urls)] * 30):
+            chunk = [url for url in chunk if url]
+            try:
+                resp = self.cf_request(
+                    'DELETE',
+                    'https://api.cloudflare.com/client/v4/zones/%s/purge_cache' % (self.zone,),
+                    json={'files': chunk},
+                ).json()
+                if resp['success']:
+                    for url in chunk:
+                        log.info('Successfully purged: %s', url)
+                else:
+                    for url in chunk:
+                        log.info('Failed to purge: %s', url)
+                    log.error('Cloudflare responded with error: %r', resp['errors'])
+            except (IOError, KeyError):
+                for url in chunk:
+                    log.exception('Failed to purge: %s', url)
+
+    def run(self):
+        observer = Observer()
+        observer.schedule(self, self.base, recursive=True)
+        observer.start()
+
+        log.info('Started watching.')
+        log.info('Local prefix: %s', self.base)
+        log.info('Remote prefix: %s', self.prefix)
+
+        try:
+            while True:
+                self._trigger.wait()
+                self._trigger.clear()
+
+                if self._stop:
+                    break
+
+                self._trigger.wait(1)
+                if self._trigger.is_set():
+                    continue
+
+                with self.queue_lock:
+                    current, self.to_purge = self.to_purge, set()
+
+                if current:
+                    self.purge(current)
+        except KeyboardInterrupt:
+            pass
+        finally:
+            observer.stop()
+            observer.join()
+
+    def start(self):
+        Thread(target=self.run).start()
+
+    def stop(self):
+        self._stop = True
+        self._trigger.set()
 
 
 def main():
     import argparse
-    import time
 
     parser = argparse.ArgumentParser(description='Purges CloudFlare on file change.')
     parser.add_argument('email', help='CloudFlare login email')
@@ -53,18 +145,11 @@ def main():
     parser.add_argument('-l', '--log', help='log file')
     args = parser.parse_args()
 
-    logging.basicConfig(filename=args.log, format='%(asctime)-15s %(message)s',
-                        level=logging.INFO)
+    logging.basicConfig(filename=args.log, format='%(asctime)-15s %(message)s', level=logging.INFO)
 
-    observer = Observer()
-    observer.schedule(CloudFlareMonitorHandler(args.email, args.token, args.zone, args.prefix, args.dir), args.dir, recursive=True)
-    observer.start()
-
-    try:
-        while True:
-            time.sleep(86400)
-    except KeyboardInterrupt:
-        observer.stop()
-    observer.join()
+    monitor = CloudFlareMonitorHandler(args.email, args.token, args.zone, args.prefix, args.dir)
+    monitor.run()
 
 
 if __name__ == '__main__':
     main()
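
Usage note: main() now just builds the handler and calls its blocking run() loop, so the same thing can be done programmatically. A sketch with placeholder values, keyword names taken from the constructor signature in the diff above:

    from cfwatch import CloudFlareMonitorHandler

    # Placeholder values; normally these come from the command line via argparse.
    monitor = CloudFlareMonitorHandler(
        email='user@example.com',
        token='0123456789abcdef',
        zone='example.com',                   # zone name, resolved to a zone ID by _get_zone()
        prefix='https://example.com/media/',  # URL prefix corresponding to the watched directory
        dir='/srv/media',                     # local directory to watch
    )
    monitor.run()  # blocks until interrupted; start()/stop() run the same loop on a background thread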