Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

from __future__ import absolute_import, division, print_function 

import hashlib 

import json 

import logging 

import subprocess 

import tempfile 

import time 

 

import requests 

from requests.utils import urlparse 

 

__all__ = ['Kubernetes', "get_endpoint"] 

 

logger = logging.getLogger(__name__) 

 

resource_endpoints = { 

"daemonsets": 

"apis/extensions/v1beta1/namespaces/{namespace}/daemonsets", 

"deployments": 

"apis/extensions/v1beta1/namespaces/{namespace}/deployments", 

"horizontalpodautoscalers": 

"apis/extensions/v1beta1/namespaces/{namespace}/horizontalpodautoscalers", 

"ingresses": 

"apis/extensions/v1beta1/namespaces/{namespace}/ingresses", 

"jobs": 

"apis/extensions/v1beta1/namespaces/{namespace}/jobs", 

"namespaces": 

"api/v1/namespaces", 

"replicasets": 

"apis/extensions/v1beta1/namespaces/{namespace}/replicasets", 

"persistentvolumes": 

"api/v1/namespaces/{namespace}/persistentvolumes", 

"persistentvolumeclaims": 

"api/v1/namespaces/{namespace}/persistentvolumeclaims", 

"services": 

"api/v1/namespaces/{namespace}/services", 

"serviceaccounts": 

"api/v1/namespaces/{namespace}/serviceaccounts", 

"secrets": 

"api/v1/namespaces/{namespace}/secrets", 

"configmaps": 

"api/v1/namespaces/{namespace}/configmaps", 

"replicationcontrollers": 

"api/v1/namespaces/{namespace}/replicationcontrollers", 

"pods": 

"api/v1/namespaces/{namespace}/pods", 

"statefulset": 

"apis/apps/v1beta1/namespaces/{namespace}/statefulsets", 

"storageclass": 

"apis/storage.k8s.io/v1beta1/statefulsets", } 

 

resources_alias = { 

"ds": "daemonsets", 

"hpa": "horizontalpodautoscalers", 

"ing": "ingresses", 

"ingress": "ingresses", 

"ns": "namespaces", 

"sc": "storageclasses", 

"sfs": "statefulsets", 

"po": "pods", 

"pv": "persistentvolumes", 

"pvc": "persistentvolumeclaims", 

"rc": "replicationcontrollers", 

"svc": "services"} 

 

ANNOTATIONS = { 

'protected': 'resource.appr/protected', 

'hash': 'resource.appr/hash', 

'version': 'package.appr/version', 

'parent': 'package.appr/parent', 

'rand': 'resource.appr/rand', 

'update-mode': 'resource.appr/update-mode', 

'package': 'package.appr/package'} 

 

 

def get_endpoint(kind): 

name = None 

if kind in resource_endpoints: 

name = kind 

elif kind in resources_alias: 

name = resources_alias[kind] 

elif kind + "s" in resource_endpoints: 

name = kind + "s" 

else: 

return 'unknown' 

return resource_endpoints[name] 

 

 

class Kubernetes(object): 

def __init__(self, namespace=None, endpoint=None, body=None, proxy=None): 

 

self.proxy = None 

if endpoint is not None and endpoint[0] == "/": 

endpoint = endpoint[1:-1] 

self.endpoint = endpoint 

self.body = body 

self.obj = None 

self.protected = False 

self._resource_load() 

self.kind = self.obj['kind'].lower() 

self.name = self.obj['metadata']['name'] 

self.force_rotate = ANNOTATIONS['rand'] in self.obj['metadata'].get('annotations', {}) 

self.namespace = self._namespace(namespace) 

self.result = None 

if proxy: 

self.proxy = urlparse(proxy) 

 

def _resource_load(self): 

self.obj = json.loads(self.body) 

if 'annotations' in self.obj['metadata']: 

if (ANNOTATIONS['protected'] in self.obj['metadata']['annotations'] and 

self.obj['metadata']['annotations'][ANNOTATIONS['protected']] == 'true'): 

self.protected = True 

 

def _gethash(self, src): 

# Copy rand value 

if (src is not None and ANNOTATIONS['rand'] in src['metadata'].get('annotations', {}) and 

ANNOTATIONS['rand'] not in self.obj['metadata']['annotations']): 

self.obj['metadata']['annotations'][ANNOTATIONS['rand']] = src['metadata'][ 

'annotations'][ANNOTATIONS['rand']] 

 

# TODO(ant31) it should hash before the custom annotations 

if ANNOTATIONS['hash'] in self.obj['metadata'].get('annotations', {}): 

if self.obj['metadata']['annotations'][ANNOTATIONS['hash']] is None: 

sha = hashlib.sha256(json.dumps(self.obj, sort_keys=True)).hexdigest() 

self.obj['metadata']['annotations'][ANNOTATIONS['hash']] = sha 

return self.obj['metadata']['annotations'][ANNOTATIONS['hash']] 

else: 

return None 

 

def _namespace(self, namespace=None): 

if namespace: 

return namespace 

elif 'namespace' in self.obj['metadata']: 

return self.obj['metadata']['namespace'] 

else: 

return 'default' 

 

def create(self, force=False, dry=False, strategy='update'): 

""" 

- Check if resource name exists 

- if it exists check if the apprhash is the same 

- if not the same delete the resource and recreate it 

- if force == true, delete the resource and recreate it 

- if doesnt exists create it 

""" 

force = force or self.force_rotate 

r = self.get() 

if r is not None: 

rhash = r['metadata'].get('annotations', {}).get(ANNOTATIONS['hash'], None) 

objhash = self._gethash(r) 

f = tempfile.NamedTemporaryFile() 

method = "apply" 

if self.proxy: 

method = "create" 

strategy = "replace" 

 

cmd = [method, '-f', f.name] 

 

f.write(json.dumps(self.obj)) 

f.flush() 

if r is None: 

self._call(cmd, dry=dry) 

return 'created' 

elif (objhash is None or rhash == objhash) and force is False: 

return 'ok' 

elif rhash != objhash or force is True: 

if self.protected: 

return 'protected' 

if strategy == 'replace': 

self.delete(dry=dry) 

action = "replaced" 

elif strategy == "update": 

action = "updated" 

else: 

raise ValueError("Unknown action %s" % action) 

self._call(cmd, dry=dry) 

return action 

 

def get(self): 

cmd = ['get', self.kind, self.name, '-o', 'json'] 

try: 

self.result = json.loads(self._call(cmd)) 

return self.result 

except RuntimeError: 

return None 

except (requests.exceptions.HTTPError) as e: 

if e.response.status_code == 404: 

return None 

else: 

raise e 

 

def delete(self, dry=False, **kwargs): 

cmd = ['delete', self.kind, self.name] 

if self.protected: 

return 'protected' 

r = self.get() 

if r is not None: 

self._call(cmd, dry=dry) 

return 'deleted' 

else: 

return 'absent' 

 

def wait(self, retries=3, seconds=1): 

r = 1 

time.sleep(seconds) 

obj = self.get() 

while (r < retries and obj is None): 

r += 1 

time.sleep(seconds) 

obj = self.get() 

return obj 

 

def exists(self): 

r = self.get() 

if r is None: 

return False 

else: 

return True 

 

def _call(self, cmd, dry=False): 

command = ['kubectl'] + cmd + ["--namespace", self.namespace] 

if not dry: 

if self.proxy is not None: 

return self._request(cmd[0]) 

else: 

try: 

return subprocess.check_output(command, stderr=subprocess.STDOUT) 

except subprocess.CalledProcessError as e: 

raise RuntimeError("Kubernetes failed to create %s (%s): " 

"%s" % (self.name, self.kind, e.output)) 

else: 

return True 

 

def _request(self, method): 

if method == 'create': 

headers = {'Content-Type': 'application/json'} 

method = 'post' 

url = "%s/%s" % (self.proxy.geturl(), self.endpoint) 

return requests.post(url, data=self.body, headers=headers) 

else: 

url = "%s/%s/%s" % (self.proxy.geturl(), self.endpoint, self.name) 

query = getattr(requests, method) 

r = query(url) 

r.raise_for_status() 

return r.content