Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

import os 

import time 

import shutil 

from appr.utils import Singleton, mkdir_p 

 

 

class FilesystemClient(object): 

__metaclass__ = Singleton 

 

def __init__(self, path): 

self.base_path = path 

 

def lockttl(self, key, ttl): 

if key[0] == "/": 

key = key[1:] 

path = os.path.join(self.base_path, key) 

expiration = 0.0 

if os.path.exists(path): 

with open(path, 'rb') as f: 

expiration = float(f.read()) 

if time.time() >= expiration: 

next_expiration = time.time() + ttl 

return self.set(key, str(next_expiration), None) 

return False 

 

def set(self, key, data, nx=None): 

if key[0] == "/": 

key = key[1:] 

path = os.path.join(self.base_path, key) 

if nx is False and os.path.exists(path): 

return None 

mkdir_p(os.path.dirname(path)) 

with open(path, 'wb') as f: 

f.write(data) 

return True 

 

def get(self, key): 

path = os.path.join(self.base_path, key) 

if not os.path.exists(path): 

return None 

with open(path, 'rb') as f: 

return f.read() 

 

def delete(self, key): 

path = os.path.join(self.base_path, key) 

if not os.path.exists(path): 

return None 

return os.remove(path) 

 

def flushall(self, root): 

if root[0] == "/": 

root = root[1:] 

path = os.path.join(self.base_path, root) 

if os.path.exists(path): 

shutil.rmtree(path) 

 

 

BASE_PATH = os.getenv('DATABASE_URL', "/tmp/appr") 

filesystem_client = FilesystemClient(BASE_PATH)