Removes client and associated functional test code

All the functionality removed has already made it into the client repo

Change-Id: Icc4748100d06d0bac08619d7fe17c924b81a106f
This commit is contained in:
rajarammallya 2012-01-23 17:36:07 +05:30
parent 94469fc379
commit ce0ba2e45a
12 changed files with 1 additions and 1549 deletions

View File

@ -1,234 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""CLI interface for common Melange client opertaions.
Simple cli for creating ip blocks, adding policies and rules for ip address
allocations from these blocks.
"""
import optparse
import os
from os import environ as env
import sys
# If ../melange/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'melange', '__init__.py')):
sys.path.insert(0, possible_topdir)
from melange import version
from melange.common import auth
from melange.common import client as base_client
from melange.common import exception
from melange.common import utils
from melange.ipam import client
def create_options(parser):
"""Sets up the CLI and config-file options.
:param parser: The option parser
:returns: None
"""
parser.add_option('-v', '--verbose', default=False, action="store_true",
help="Print more verbose output")
parser.add_option('-H', '--host', metavar="ADDRESS", default="0.0.0.0",
help="Address of Melange API host. "
"Default: %default")
parser.add_option('-p', '--port', dest="port", metavar="PORT",
type=int, default=9898,
help="Port the Melange API host listens on. "
"Default: %default")
parser.add_option('-t', '--tenant', dest="tenant", metavar="TENANT",
type=str, default=env.get('MELANGE_TENANT', None),
help="tenant id in case of tenant resources")
parser.add_option('--auth-token', dest="auth_token",
metavar="MELANGE_AUTH_TOKEN",
default=env.get('MELANGE_AUTH_TOKEN', None),
type=str, help="Auth token received from keystone")
parser.add_option('-u', '--username', dest="username",
metavar="MELANGE_USERNAME",
default=env.get('MELANGE_USERNAME', None),
type=str, help="Melange user name")
parser.add_option('-k', '--api-key', dest="api_key",
metavar="MELANGE_API_KEY",
default=env.get('MELANGE_API_KEY', None),
type=str, help="Melange access key")
parser.add_option('-a', '--auth-url', dest="auth_url",
metavar="MELANGE_AUTH_URL", type=str,
default=env.get('MELANGE_AUTH_URL', None),
help="Url of keystone service")
parser.add_option('--timeout', dest="timeout",
metavar="MELANGE_TIME_OUT", type=int,
default=env.get('MELANGE_TIME_OUT', None),
help="timeout for melange client operations")
def parse_options(parser, cli_args):
"""Parses CLI options.
Returns parsed CLI options, command to run and its arguments, merged
with any same-named options found in a configuration file
:param parser: The option parser
:returns: (options, args)
"""
(options, args) = parser.parse_args(cli_args)
if not args:
parser.print_usage()
sys.exit(2)
return (options, args)
def usage():
usage = """
%prog category action [args] [options]
Available categories:
"""
for k, _v in categories.iteritems():
usage = usage + ("\t%s\n" % k)
return usage.strip()
categories = {
'ip_block': client.IpBlockClient,
'subnet': client.SubnetClient,
'policy': client.PolicyClient,
'unusable_ip_range': client.UnusableIpRangesClient,
'unusable_ip_octet': client.UnusableIpOctetsClient,
'allocated_ips': client.AllocatedIpAddressesClient,
'ip_address': client.IpAddressesClient,
'ip_route': client.IpRouteClient,
'interface': client.InterfaceClient,
'mac_address_range': client.MacAddressRangeClient,
'allowed_ip': client.AllowedIpClient,
}
def lookup(name, hash):
result = hash.get(name, None)
if not result:
print "%s does not match any options:" % name
print_keys(hash)
sys.exit(2)
return result
def print_keys(hash):
for k, _v in hash.iteritems():
print "\t%s" % k
def methods_of(obj):
"""Gets callable public methods.
Get all callable methods of an object that don't start with underscore
returns a dictionary of the form dict(method_name, method)
"""
def is_public_method(attr):
return callable(getattr(obj, attr)) and not attr.startswith('_')
return dict((attr, getattr(obj, attr)) for attr in dir(obj)
if is_public_method(attr))
def auth_client_factory(options):
if options.auth_url or options.auth_token:
return auth.KeystoneClient(options.auth_url,
options.username,
options.api_key,
options.auth_token)
def args_to_dict(args):
try:
return dict(arg.split("=") for arg in args)
except ValueError:
raise exception.MelangeError("Action arguments "
"should be in the form of field=value")
def main():
oparser = optparse.OptionParser(version='%%prog %s'
% version.version_string(),
usage=usage())
create_options(oparser)
(options, args) = parse_options(oparser, sys.argv[1:])
script_name = os.path.basename(sys.argv[0])
category = args.pop(0)
http_client = base_client.HTTPClient(options.host,
options.port,
options.timeout)
category_client_class = lookup(category, categories)
client = category_client_class(http_client,
auth_client_factory(options),
options.tenant)
client_actions = methods_of(client)
if len(args) < 1:
print "Usage: " + script_name + " category action [<args>]"
print _("Available actions for %s category:") % category
print_keys(client_actions)
sys.exit(2)
if category_client_class.TENANT_ID_REQUIRED and not options.tenant:
print _("Please provide a tenant id for this action."
"You can use option '-t' to provide the tenant id.")
sys.exit(2)
action = args.pop(0)
fn = lookup(action, client_actions)
# call the action with the remaining arguments
try:
print fn(**args_to_dict(args))
sys.exit(0)
except TypeError:
print _("Possible wrong number of arguments supplied")
print "Usage: %s %s %s" % (script_name, category,
utils.MethodInspector(fn))
if options.verbose:
raise
sys.exit(2)
except exception.MelangeError as error:
print error
sys.exit(2)
except Exception:
print _("Command failed, please check log for more info")
if options.verbose:
raise
sys.exit(2)
if __name__ == '__main__':
main()

View File

@ -127,8 +127,6 @@ pygments_style = 'sphinx'
man_pages = [
('man/melange', 'melange', u'melange API Server',
[u'OpenStack'], 1),
('man/melangeclient', 'melange-client', u'melange CLI',
[u'OpenStack'], 1),
('man/melangemanage', 'melange-manage', u'melange Management Utility',
[u'OpenStack'], 1)
]
@ -152,7 +150,7 @@ html_theme = '_theme'
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None

View File

@ -57,27 +57,3 @@ class TenantBasedAuth(object):
return True
raise exc.HTTPForbidden(_("User with tenant id %s cannot access "
"this resource") % tenant_id)
class KeystoneClient(httplib2.Http):
def __init__(self, url, username, access_key, auth_token=None):
super(KeystoneClient, self).__init__()
self.url = urlparse.urljoin(url, "/v2.0/tokens")
self.username = username
self.access_key = access_key
self.auth_token = auth_token
def get_token(self):
if self.auth_token:
return self.auth_token
headers = {'content-type': 'application/json'}
request_body = json.dumps({"passwordCredentials":
{"username": self.username,
'password': self.access_key}})
res, body = self.request(self.url, "POST", headers=headers,
body=request_body)
if int(res.status) >= 400:
raise Exception(_("Error occured while retrieving token : %s")
% body)
return json.loads(body)['auth']['token']['id']

View File

@ -1,57 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib
import socket
import urllib
from melange.common import exception
class HTTPClient(object):
def __init__(self, host='localhost', port=8080, use_ssl=False, timeout=60):
self.host = host
self.port = port
self.use_ssl = use_ssl
self.timeout = timeout
def _get_connection(self):
if self.use_ssl:
return httplib.HTTPSConnection(self.host, self.port,
timeout=self.timeout)
else:
return httplib.HTTPConnection(self.host, self.port,
timeout=self.timeout)
def do_request(self, method, path, body=None, headers=None, params=None):
params = params or {}
headers = headers or {}
url = path + '?' + urllib.urlencode(params)
try:
connection = self._get_connection()
connection.request(method, url, body, headers)
response = connection.getresponse()
if response.status >= 400:
raise exception.MelangeServiceResponseError(response.read())
return response
except (socket.error, IOError) as error:
raise exception.ClientConnectionError(
_("Error while communicating with server. "
"Got error: %s") % error)

View File

@ -19,10 +19,7 @@
import datetime
import inspect
import logging
import os
import re
import subprocess
import uuid
from openstack.common import utils as openstack_utils
@ -42,29 +39,6 @@ def parse_int(subject):
return None
def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
logging.debug("Running cmd: %s", cmd)
env = os.environ.copy()
if addl_env:
env.update(addl_env)
obj = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
if process_input != None:
result = obj.communicate(process_input)
else:
result = obj.communicate()
obj.stdin.close()
if obj.returncode:
logging.debug("Result was %s" % (obj.returncode))
if check_exit_code and obj.returncode != 0:
(stdout, stderr) = result
raise exception.ProcessExecutionError(exit_code=obj.returncode,
stdout=stdout,
stderr=stderr,
cmd=cmd)
return result
def utcnow():
return datetime.datetime.utcnow()

View File

@ -1,342 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import urlparse
from melange.common import utils
class Resource(object):
def __init__(self, path, name, client, auth_client, tenant_id=None):
if tenant_id:
path = "tenants/{0}/{1}".format(tenant_id, path)
self.path = urlparse.urljoin("/v0.1/ipam/", path)
self.name = name
self.client = client
self.auth_client = auth_client
def create(self, **kwargs):
return self.request("POST",
self.path,
body=json.dumps({self.name: kwargs}))
def update(self, id, **kwargs):
return self.request("PUT",
self._member_path(id),
body=json.dumps(
{self.name: utils.remove_nones(kwargs)}))
def all(self, **params):
return self.request("GET",
self.path,
params=utils.remove_nones(params))
def find(self, id):
return self.request("GET", self._member_path(id))
def delete(self, id):
return self.request("DELETE", self._member_path(id))
def _member_path(self, id):
return "{0}/{1}".format(self.path, id)
def request(self, method, path, **kwargs):
kwargs['headers'] = {'Content-Type': "application/json"}
if self.auth_client:
kwargs['headers']['X-AUTH-TOKEN'] = self.auth_client.get_token()
response = self.client.do_request(method, path, **kwargs)
return response.read()
class BaseClient(object):
TENANT_ID_REQUIRED = True
def __init__(self, client, auth_client, tenant_id):
self.client = client
self.auth_client = auth_client
self.tenant_id = tenant_id
class IpBlockClient(BaseClient):
def __init__(self, client, auth_client, tenant_id):
self.resource = Resource("ip_blocks", "ip_block", client, auth_client,
tenant_id)
def create(self, type, cidr, network_id=None, policy_id=None):
return self.resource.create(type=type,
cidr=cidr,
network_id=network_id,
policy_id=policy_id)
def list(self):
return self.resource.all()
def show(self, id):
return self.resource.find(id)
def update(self, id, network_id=None, policy_id=None):
return self.resource.update(id,
network_id=network_id,
policy_id=policy_id)
def delete(self, id):
return self.resource.delete(id)
class SubnetClient(BaseClient):
def _resource(self, parent_id):
return Resource("ip_blocks/{0}/subnets".format(parent_id),
"subnet",
self.client,
self.auth_client,
self.tenant_id)
def create(self, parent_id, cidr, network_id=None):
return self._resource(parent_id).create(cidr=cidr,
network_id=network_id)
def list(self, parent_id):
return self._resource(parent_id).all()
class PolicyClient(BaseClient):
def __init__(self, client, auth_client, tenant_id):
self.resource = Resource("policies",
"policy",
client,
auth_client,
tenant_id)
def create(self, name, desc=None):
return self.resource.create(name=name, description=desc)
def update(self, id, name, desc=None):
return self.resource.update(id, name=name, description=desc)
def list(self):
return self.resource.all()
def show(self, id):
return self.resource.find(id)
def delete(self, id):
return self.resource.delete(id)
class UnusableIpRangesClient(BaseClient):
def _resource(self, policy_id):
return Resource("policies/{0}/unusable_ip_ranges".format(policy_id),
"ip_range",
self.client,
self.auth_client,
self.tenant_id)
def create(self, policy_id, offset, length):
return self._resource(policy_id).create(offset=offset, length=length)
def update(self, policy_id, id, offset=None, length=None):
return self._resource(policy_id).update(id,
offset=offset,
length=length)
def list(self, policy_id):
return self._resource(policy_id).all()
def show(self, policy_id, id):
return self. _resource(policy_id).find(id)
def delete(self, policy_id, id):
return self._resource(policy_id).delete(id)
class UnusableIpOctetsClient(BaseClient):
def _resource(self, policy_id):
return Resource("policies/{0}/unusable_ip_octets".format(policy_id),
"ip_octet",
self.client,
self.auth_client,
self.tenant_id)
def create(self, policy_id, octet):
return self._resource(policy_id).create(octet=octet)
def update(self, policy_id, id, octet=None):
return self._resource(policy_id).update(id, octet=octet)
def list(self, policy_id):
return self._resource(policy_id).all()
def show(self, policy_id, id):
return self._resource(policy_id).find(id)
def delete(self, policy_id, id):
return self._resource(policy_id).delete(id)
class AllocatedIpAddressesClient(BaseClient):
TENANT_ID_REQUIRED = False
def __init__(self, client, auth_client, tenant_id=None):
self._resource = Resource("allocated_ip_addresses",
"allocated_ip_addresses",
client,
auth_client,
tenant_id)
def list(self, used_by_device=None):
return self._resource.all(used_by_device=used_by_device)
class IpAddressesClient(BaseClient):
def _resource(self, ip_block_id):
path = "ip_blocks/{0}/ip_addresses".format(ip_block_id)
return Resource(path,
"ip_address",
self.client,
self.auth_client,
self.tenant_id)
def create(self, ip_block_id, address=None, interface_id=None,
used_by_tenant=None, used_by_device=None):
resource = self._resource(ip_block_id)
return resource.create(address=address,
interface_id=interface_id,
used_by_device=used_by_device,
tenant_id=used_by_tenant)
def list(self, ip_block_id):
return self._resource(ip_block_id).all()
def show(self, ip_block_id, address):
return self._resource(ip_block_id).find(address)
def delete(self, ip_block_id, address):
return self._resource(ip_block_id).delete(address)
class IpRouteClient(BaseClient):
def _resource(self, ip_block_id):
path = "ip_blocks/{0}/ip_routes".format(ip_block_id)
return Resource(path,
"ip_route",
self.client,
self.auth_client,
self.tenant_id)
def create(self, ip_block_id, destination, gateway, netmask=None):
resource = self._resource(ip_block_id)
return resource.create(destination=destination,
gateway=gateway,
netmask=netmask)
def list(self, ip_block_id):
return self._resource(ip_block_id).all()
def show(self, ip_block_id, route_id):
return self._resource(ip_block_id).find(route_id)
def delete(self, ip_block_id, route_id):
return self._resource(ip_block_id).delete(route_id)
class InterfaceClient(BaseClient):
TENANT_ID_REQUIRED = False
def __init__(self, client, auth_client, tenant_id=None):
self._resource = Resource("interfaces",
"interface",
client,
auth_client,
tenant_id)
def create(self, vif_id, tenant_id, device_id=None, network_id=None):
request_params = dict(id=vif_id, tenant_id=tenant_id,
device_id=device_id)
if network_id:
request_params['network'] = dict(id=network_id)
return self._resource.create(**request_params)
def show(self, vif_id):
return self._resource.find(vif_id)
def delete(self, vif_id):
return self._resource.delete(vif_id)
class MacAddressRangeClient(BaseClient):
TENANT_ID_REQUIRED = False
def __init__(self, client, auth_client, tenant_id=None):
self._resource = Resource("mac_address_ranges",
"mac_address_range",
client,
auth_client,
tenant_id)
def create(self, cidr):
return self._resource.create(cidr=cidr)
def show(self, id):
return self._resource.find(id)
def list(self):
return self._resource.all()
def delete(self, id):
return self._resource.delete(id)
class AllowedIpClient(BaseClient):
def __init__(self, client, auth_client, tenant_id=None):
self.client = client
self.auth_client = auth_client
self.tenant_id = tenant_id
def _resource(self, interface_id):
return Resource("interfaces/{0}/allowed_ips".format(interface_id),
"allowed_ip",
self.client,
self.auth_client,
self.tenant_id)
def create(self, interface_id, network_id, ip_address):
return self._resource(interface_id).create(network_id=network_id,
ip_address=ip_address)
def show(self, interface_id, ip_address):
return self._resource(interface_id).find(ip_address)
def list(self, interface_id):
return self._resource(interface_id).all()
def delete(self, interface_id, ip_address):
return self._resource(interface_id).delete(ip_address)

View File

@ -16,16 +16,11 @@
# under the License.
import os
import socket
import subprocess
import melange
from melange.common import config
from melange.db import db_api
from melange.tests.functional import server
_PORT = None
def test_config_file():
@ -33,12 +28,8 @@ def test_config_file():
def setup():
print "Restarting melange server..."
srv = server.Server("melange",
melange.melange_bin_path('melange'), )
options = dict(config_file=test_config_file())
_db_sync(options)
srv.restart(port=setup_unused_port(), **options)
_configure_db(options)
@ -53,11 +44,6 @@ def _db_sync(options):
db_api.db_sync(conf)
def teardown():
print "Stopping melange server..."
server.Server("melange", melange.melange_bin_path('melange')).stop()
def execute(cmd, raise_error=True):
"""Executes a command in a subprocess.
Returns a tuple of (exitcode, out, err), where out is the string output
@ -91,22 +77,3 @@ def execute(cmd, raise_error=True):
"\n\nSTDERR: %(err)s" % locals()
raise RuntimeError(msg)
return exitcode, out, err
def get_unused_port():
"""Returns an unused port on localhost."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', 0))
addr, port = s.getsockname()
s.close()
return port
def setup_unused_port():
global _PORT
_PORT = get_unused_port()
return _PORT
def get_api_port():
return _PORT

View File

@ -1,78 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import os
import time
import urllib2
class Server(object):
def __init__(self, name, path):
self.name = name
self.path = path
def restart(self, port, config_file):
self.stop()
self.start(port, config_file)
def start(self, port, config_file):
pid = os.fork()
if pid == 0:
os.setsid()
self._close_stdio()
try:
os.system("(%s -p %s --config-file=%s >func_test.log)" %
(self.path, port, config_file))
except OSError:
os._exit(1)
os._exit(0)
else:
self._wait_till_running(port)
def stop(self):
os.system("ps x -o pid,command | grep -v 'grep'|"
"grep 'bin/melange' | awk '{print $1}'| xargs kill -9")
def _close_stdio(self):
with open(os.devnull, 'r+b') as nullfile:
for desc in (0, 1, 2): # close stdio
try:
os.dup2(nullfile.fileno(), desc)
except OSError:
pass
def _pid_file_path(self):
return os.path.join('/', 'tmp', self.name + ".pid")
def _wait_till_running(self, port, timeout=10):
now = datetime.datetime.now()
timeout_time = now + datetime.timedelta(seconds=timeout)
while (timeout_time > now):
if self._running(port):
return
now = datetime.datetime.now()
time.sleep(0.05)
print("Failed to start servers.")
def _running(self, port):
try:
urllib2.urlopen("http://localhost:{0}".format(port))
return True
except urllib2.URLError:
return False

View File

@ -25,14 +25,6 @@ from melange.tests.factories import models as factory_models
from melange.tests import functional
def run(command, **kwargs):
full_command = "{0} --port={1} {2} -v ""--auth-token=test".format(
melange.melange_bin_path('melange-client'),
functional.get_api_port(),
command)
return functional.execute(full_command, **kwargs)
def run_melange_manage(command):
melange_manage = melange.melange_bin_path('melange-manage')
config_file = functional.test_config_file()
@ -40,611 +32,6 @@ def run_melange_manage(command):
"--config-file=%(config_file)s" % locals())
class TestIpBlockCLI(tests.BaseTest):
def test_create(self):
policy = factory_models.PolicyFactory(tenant_id=123)
exitcode, out, err = run("ip_block create type=private "
"cidr=10.1.1.0/29 network_id=net1 "
"policy_id=%s -t 123"
% policy.id)
self.assertEqual(exitcode, 0)
ip_block = models.IpBlock.get_by(cidr="10.1.1.0/29",
type="private",
tenant_id="123")
self.assertTrue(ip_block is not None)
self.assertEqual(ip_block.network_id, "net1")
self.assertEqual(ip_block.policy_id, policy.id)
def test_list(self):
exitcode, out, err = run("ip_block list -t 123")
self.assertEqual(exitcode, 0)
self.assertIn("ip_blocks", out)
def test_list_without_tenant_id_should_error_out(self):
expected_error_msg = "Please provide a tenant id for this action"
self.assertRaisesExcMessage(RuntimeError,
expected_error_msg,
run,
"ip_block list")
def test_show(self):
ip_block = factory_models.PrivateIpBlockFactory(tenant_id=123)
exitcode, out, err = run("ip_block show id=%s -t 123" % ip_block.id)
self.assertEqual(exitcode, 0)
self.assertIn(ip_block.cidr, out)
def test_update(self):
ip_block = factory_models.PrivateIpBlockFactory(tenant_id="123")
policy = factory_models.PolicyFactory()
exitcode, out, err = run("ip_block update id=%s network_id=new_net "
"policy_id=%s -t 123"
% (ip_block.id, policy.id))
self.assertEqual(exitcode, 0)
updated_block = models.IpBlock.find_by(id=ip_block.id, tenant_id="123")
self.assertEqual(updated_block.network_id, "new_net")
self.assertEqual(updated_block.policy_id, policy.id)
def test_delete(self):
ip_block = factory_models.PrivateIpBlockFactory(tenant_id=123)
exitcode, out, err = run("ip_block delete "
"id=%s -t 123" % ip_block.id)
self.assertEqual(exitcode, 0)
self.assertTrue(models.IpBlock.get(ip_block.id) is None)
class TestSubnetCLI(tests.BaseTest):
def test_create(self):
block = factory_models.IpBlockFactory(cidr="10.0.0.0/28",
tenant_id="123")
exitcode, out, err = run("subnet create parent_id={0} "
"cidr=10.0.0.0/29 -t 123".format(block.id))
self.assertEqual(exitcode, 0)
subnet = models.IpBlock.get_by(parent_id=block.id)
self.assertTrue(subnet is not None)
self.assertEqual(subnet.tenant_id, "123")
def test_index(self):
block = factory_models.IpBlockFactory(cidr="10.0.0.0/28",
tenant_id="123")
block.subnet("10.0.0.0/30")
block.subnet("10.0.0.4/30")
block.subnet("10.0.0.8/30")
exitcode, out, err = run(
"subnet list parent_id={0} -t 123".format(block.id))
self.assertEqual(exitcode, 0)
self.assertIn("subnets", out)
self.assertIn("10.0.0.0/30", out)
self.assertIn("10.0.0.4/30", out)
self.assertIn("10.0.0.8/30", out)
class TestPolicyCLI(tests.BaseTest):
def test_update(self):
policy = factory_models.PolicyFactory(tenant_id="1234",
name='name',
description='desc')
exitcode, out, err = run("policy update -t 1234"
" id={0} name=new_name".format(policy.id))
self.assertEqual(exitcode, 0)
updated_policy = models.Policy.get(policy.id)
self.assertEqual(updated_policy.name, "new_name")
self.assertEqual(updated_policy.description, "desc")
def test_list(self):
exitcode, out, err = run("policy list -t 1234")
self.assertEqual(exitcode, 0)
self.assertIn("policies", out)
def test_show(self):
policy = factory_models.PolicyFactory(tenant_id="1234", name="blah")
exitcode, out, err = run("policy show id=%s -t 1234" % policy.id)
self.assertEqual(exitcode, 0)
self.assertIn(policy.name, out)
def test_delete(self):
policy = factory_models.PolicyFactory(tenant_id="1234", name="blah")
exitcode, out, err = run("policy delete id=%s -t 1234" % policy.id)
self.assertEqual(exitcode, 0)
self.assertTrue(models.Policy.get(policy.id) is None)
def test_create(self):
command = "policy create name=policy_name desc=policy_desc -t 1234"
exitcode, out, err = run(command)
self.assertEqual(exitcode, 0)
policy = models.Policy.get_by(name="policy_name",
description="policy_desc")
self.assertTrue(policy is not None)
self.assertEqual(policy.tenant_id, "1234")
class TestUnusableIpRangesCLI(tests.BaseTest):
def test_create(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
exitcode, out, err = run("unusable_ip_range create "
"policy_id={0} offset=1 length=2 "
"-t 1234".format(policy.id))
self.assertEqual(exitcode, 0)
ip_range = models.IpRange.get_by(policy_id=policy.id,
offset=1,
length=2)
self.assertTrue(ip_range is not None)
def test_update(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
ip_range = factory_models.IpRangeFactory(policy_id=policy.id,
offset=0,
length=1)
exitcode, out, err = run("unusable_ip_range update "
"policy_id={0} id={1} offset=10 length=122 "
"-t 1234".format(policy.id, ip_range.id))
updated_ip_range = models.IpRange.find(ip_range.id)
self.assertEqual(exitcode, 0)
self.assertEqual(updated_ip_range.offset, 10)
self.assertEqual(updated_ip_range.length, 122)
def test_update_with_optional_params(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
ip_range = factory_models.IpRangeFactory(policy_id=policy.id,
offset=0,
length=1)
exitcode, out, err = run("unusable_ip_range update"
" policy_id={0} id={1} offset=10"
" -t 1234".format(policy.id, ip_range.id))
updated_ip_range = models.IpRange.find(ip_range.id)
self.assertEqual(exitcode, 0)
self.assertEqual(updated_ip_range.offset, 10)
self.assertEqual(updated_ip_range.length, 1)
def test_list(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
exitcode, out, err = run("unusable_ip_range list"
" policy_id={0} -t 1234".format(policy.id))
self.assertEqual(exitcode, 0)
self.assertIn("ip_ranges", out)
def test_show(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
ip_range = factory_models.IpRangeFactory(policy_id=policy.id)
exitcode, out, err = run("unusable_ip_range show"
" policy_id={0} id={1} "
"-t 1234".format(policy.id, ip_range.id))
self.assertEqual(exitcode, 0)
self.assertIn(ip_range.policy_id, out)
def test_delete(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
ip_range = factory_models.IpRangeFactory(policy_id=policy.id)
exitcode, out, err = run("unusable_ip_range delete "
"policy_id={0} id={1} "
" -t 1234".format(policy.id, ip_range.id))
self.assertEqual(exitcode, 0)
self.assertTrue(models.IpRange.get(ip_range.id) is None)
class TestUnusableIpOctetsCLI(tests.BaseTest):
def test_create(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
exitcode, out, err = run("unusable_ip_octet create "
" policy_id={0} octet=255 "
" -t 1234".format(policy.id))
self.assertEqual(exitcode, 0)
ip_octet = models.IpOctet.get_by(policy_id=policy.id, octet=255)
self.assertTrue(ip_octet is not None)
def test_update(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
ip_octet = factory_models.IpOctetFactory(policy_id=policy.id,
octet=222)
exitcode, out, err = run("unusable_ip_octet update policy_id={0} "
"id={1} octet=255"
" -t 1234".format(policy.id, ip_octet.id))
updated_ip_octet = models.IpOctet.find(ip_octet.id)
self.assertEqual(exitcode, 0)
self.assertEqual(updated_ip_octet.octet, 255)
def test_update_with_optional_params(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
ip_octet = factory_models.IpOctetFactory(policy_id=policy.id,
octet=222)
exitcode, out, err = run("unusable_ip_octet update "
"policy_id={0} id={1} "
"-t 1234".format(policy.id, ip_octet.id))
updated_ip_octet = models.IpOctet.find(ip_octet.id)
self.assertEqual(exitcode, 0)
self.assertEqual(updated_ip_octet.octet, 222)
def test_list(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
exitcode, out, err = run("unusable_ip_octet "
"list policy_id={0} "
"-t 1234".format(policy.id))
self.assertEqual(exitcode, 0)
self.assertIn("ip_octets", out)
def test_show(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
ip_octet = factory_models.IpOctetFactory(policy_id=policy.id)
exitcode, out, err = run("unusable_ip_octet show "
"policy_id={0} id={1} "
"-t 1234".format(policy.id, ip_octet.id))
self.assertEqual(exitcode, 0)
self.assertIn(ip_octet.policy_id, out)
def test_delete(self):
policy = factory_models.PolicyFactory(tenant_id="1234")
ip_octet = factory_models.IpOctetFactory(policy_id=policy.id)
exitcode, out, err = run("unusable_ip_octet delete "
"policy_id={0} id={1} "
"-t 1234".format(policy.id,
ip_octet.id))
self.assertEqual(exitcode, 0)
self.assertTrue(models.IpOctet.get(ip_octet.id) is None)
class TestAllocatedIpAddressCLI(tests.BaseTest):
def test_list(self):
interface1 = factory_models.InterfaceFactory(device_id="device1")
interface2 = factory_models.InterfaceFactory(device_id="device2")
factory_models.IpAddressFactory(address="10.1.1.1",
interface_id=interface1.id)
factory_models.IpAddressFactory(address="20.1.1.1",
interface_id=interface2.id)
exitcode, out, err = run("allocated_ips list used_by_device=device1")
self.assertEqual(exitcode, 0)
self.assertIn("ip_addresses", out)
self.assertIn('"address": "10.1.1.1"', out)
self.assertNotIn('"address": "20.1.1.1"', out)
def test_list_with_tenant(self):
interface1 = factory_models.InterfaceFactory(tenant_id="tenant1")
interface2 = factory_models.InterfaceFactory(tenant_id="tenant2")
factory_models.IpAddressFactory(address="10.1.1.1",
interface_id=interface1.id)
factory_models.IpAddressFactory(address="20.1.1.1",
interface_id=interface2.id)
exitcode, out, err = run("allocated_ips list -t tenant1")
self.assertEqual(exitcode, 0)
self.assertIn("ip_addresses", out)
self.assertIn('"address": "10.1.1.1"', out)
self.assertNotIn('"address": "20.1.1.1"', out)
class TestIpAddressCLI(tests.BaseTest):
def test_create(self):
block = factory_models.PrivateIpBlockFactory(cidr="10.1.1.0/24",
tenant_id="123")
exitcode, out, err = run("ip_address create ip_block_id={0} "
"address=10.1.1.2 interface_id=interface_id "
"used_by_tenant=used_by_tenant_id "
"used_by_device=used_by_device_id "
"-t 123 -v".format(block.id))
self.assertEqual(exitcode, 0)
ip = models.IpAddress.get_by(ip_block_id=block.id)
interface = models.Interface.find(ip.interface_id)
self.assertTrue(ip is not None)
self.assertEqual(ip.address, "10.1.1.2")
self.assertEqual(interface.tenant_id, "used_by_tenant_id")
self.assertEqual(interface.device_id, "used_by_device_id")
def test_list(self):
block = factory_models.PrivateIpBlockFactory(cidr="10.1.1.0/24",
tenant_id="123")
ip1 = factory_models.IpAddressFactory(ip_block_id=block.id,
address="10.1.1.2")
ip2 = factory_models.IpAddressFactory(ip_block_id=block.id,
address="10.1.1.3")
exitcode, out, err = run("ip_address list ip_block_id={0} "
"-t 123".format(block.id))
self.assertEqual(exitcode, 0)
self.assertIn("ip_addresses", out)
self.assertIn('"address": "10.1.1.2"', out)
self.assertIn('"address": "10.1.1.3"', out)
def test_show(self):
block = factory_models.PrivateIpBlockFactory(tenant_id="123")
ip = factory_models.IpAddressFactory(ip_block_id=block.id)
exitcode, out, err = run("ip_address show ip_block_id={0} address={1} "
"-t 123".format(block.id, ip.address))
self.assertEqual(exitcode, 0)
self.assertIn(ip.address, out)
def test_delete(self):
block = factory_models.PrivateIpBlockFactory(tenant_id="123")
ip = factory_models.IpAddressFactory(ip_block_id=block.id)
exitcode, out, err = run("ip_address delete "
"ip_block_id={0} address={1} "
"-t 123".format(block.id, ip.address))
self.assertEqual(exitcode, 0)
self.assertTrue(models.IpAddress.get(ip.id).marked_for_deallocation)
class TestIpRoutesCLI(tests.BaseTest):
def test_create(self):
block = factory_models.IpBlockFactory(cidr="77.1.1.0/24",
tenant_id="123")
exitcode, out, err = run("ip_route create ip_block_id={0} "
"destination=10.1.1.2 gateway=10.1.1.1 "
"netmask=255.255.255.0 "
"-t 123".format(block.id))
self.assertEqual(exitcode, 0)
ip_route = models.IpRoute.get_by(source_block_id=block.id)
self.assertTrue(ip_route is not None)
self.assertEqual(ip_route.destination, "10.1.1.2")
self.assertEqual(ip_route.gateway, "10.1.1.1")
self.assertEqual(ip_route.netmask, "255.255.255.0")
def test_list(self):
block = factory_models.PrivateIpBlockFactory(cidr="10.1.1.0/24",
tenant_id="123")
ip_route1 = factory_models.IpRouteFactory(source_block_id=block.id)
ip_route2 = factory_models.IpRouteFactory(source_block_id=block.id)
exitcode, out, err = run("ip_route list ip_block_id={0} "
"-t 123".format(block.id))
self.assertEqual(exitcode, 0)
self.assertIn("ip_routes", out)
self.assertIn('"destination": "%s"' % ip_route1.destination, out)
self.assertIn('"destination": "%s"' % ip_route2.destination, out)
def test_show(self):
block = factory_models.PrivateIpBlockFactory(tenant_id="123")
ip_route = factory_models.IpRouteFactory(source_block_id=block.id)
exitcode, out, err = run("ip_route show ip_block_id={0} route_id={1} "
"-t 123".format(block.id, ip_route.id))
self.assertEqual(exitcode, 0)
self.assertIn(ip_route.destination, out)
def test_delete(self):
block = factory_models.PrivateIpBlockFactory(tenant_id="123")
ip_route = factory_models.IpRouteFactory(source_block_id=block.id)
exitcode, out, err = run("ip_route delete ip_block_id={0} "
"route_id={1} "
"-t 123".format(block.id, ip_route.id))
self.assertEqual(exitcode, 0)
self.assertIsNone(models.IpRoute.get(ip_route.id))
class TestInterfaceCLI(tests.BaseTest):
def test_create(self):
factory_models.MacAddressRangeFactory()
factory_models.IpBlockFactory(network_id="network_id",
tenant_id="tenant_id")
exitcode, out, err = run("interface create vif_id=vif_id "
"tenant_id=tenant_id "
"device_id=device_id "
"network_id=network_id")
self.assertEqual(exitcode, 0)
created_interface = models.Interface.find_by(
virtual_interface_id="vif_id")
self.assertEqual(created_interface.tenant_id, "tenant_id")
self.assertEqual(created_interface.device_id, "device_id")
self.assertIsNotNone(created_interface.mac_address_eui_format)
self.assertIsNotNone(created_interface.ip_addresses)
def test_show(self):
interface = factory_models.InterfaceFactory(
virtual_interface_id="vif_id", tenant_id="tenant_id")
mac = models.MacAddress.create(address="ab-bc-cd-12-23-34",
interface_id=interface.id)
ip1 = factory_models.IpAddressFactory(interface_id=interface.id)
ip2 = factory_models.IpAddressFactory(interface_id=interface.id)
noise_ip = factory_models.IpAddressFactory()
exitcode, out, err = run("interface show vif_id=vif_id -t tenant_id")
self.assertEqual(exitcode, 0)
self.assertIn("vif_id", out)
self.assertIn(mac.eui_format, out)
self.assertIn(ip1.address, out)
self.assertIn(ip2.address, out)
self.assertNotIn(noise_ip.address, out)
def test_delete(self):
interface = factory_models.InterfaceFactory(
virtual_interface_id="vif_id")
mac = models.MacAddress.create(address="ab-bc-cd-12-23-34",
interface_id=interface.id)
ip1 = factory_models.IpAddressFactory(interface_id=interface.id)
ip2 = factory_models.IpAddressFactory(interface_id=interface.id)
noise_ip = factory_models.IpAddressFactory()
exitcode, out, err = run("interface delete vif_id=vif_id")
self.assertEqual(exitcode, 0)
self.assertIsNone(models.Interface.get(interface.id))
self.assertIsNone(models.MacAddress.get(mac.id))
self.assertTrue(models.IpAddress.get(ip1.id).marked_for_deallocation)
self.assertTrue(models.IpAddress.get(ip2.id).marked_for_deallocation)
class TestMacAddressRangeCLI(tests.BaseTest):
def test_create(self):
exitcode, out, err = run("mac_address_range create "
"cidr=ab-bc-cd-12-23-34/24")
self.assertEqual(exitcode, 0)
self.assertIsNotNone(models.MacAddressRange.get_by(
cidr="ab-bc-cd-12-23-34/24"))
def test_show(self):
rng = factory_models.MacAddressRangeFactory()
exitcode, out, err = run("mac_address_range show "
"id=%s" % rng.id)
self.assertEqual(exitcode, 0)
self.assertIn('"id": "%s"' % rng.id, out)
self.assertIn('"cidr": "%s"' % rng.cidr, out)
def test_index(self):
rng1 = factory_models.MacAddressRangeFactory()
rng2 = factory_models.MacAddressRangeFactory()
exitcode, out, err = run("mac_address_range list")
self.assertEqual(exitcode, 0)
self.assertIn('"id": "%s"' % rng1.id, out)
self.assertIn('"id": "%s"' % rng2.id, out)
def test_delete(self):
rng = factory_models.MacAddressRangeFactory()
exitcode, out, err = run("mac_address_range delete id=%s" % rng.id)
self.assertEquals(exitcode, 0)
self.assertIsNone(models.MacAddressRange.get(rng.id))
class TestAllowedIpCLI(tests.BaseTest):
def test_create(self):
interface = factory_models.InterfaceFactory(network_id="123",
virtual_interface_id="x",
tenant_id="RAX")
block = factory_models.IpBlockFactory(network_id="123",
tenant_id="RAX")
ip_plugged_into_interface = block.allocate_ip(interface)
ip_to_allow = block.allocate_ip(
factory_models.InterfaceFactory(network_id="123"))
exitcode, out, err = run("allowed_ip create interface_id=%s "
"network_id=123 ip_address=%s "
"-t RAX" % (interface.virtual_interface_id,
ip_to_allow.address))
self.assertEqual(exitcode, 0)
self.assertModelsEqual(interface.ips_allowed(),
[ip_plugged_into_interface, ip_to_allow])
def test_index(self):
interface = factory_models.InterfaceFactory(
tenant_id="RAX", virtual_interface_id="vif_id")
ip_factory = factory_models.IpAddressFactory
block_factory = factory_models.IpBlockFactory
ip_on_interface = block_factory(network_id="1",
tenant_id="RAX").allocate_ip(interface)
allowed_ip = ip_factory(ip_block_id=block_factory(network_id="1").id)
interface.allow_ip(allowed_ip)
exitcode, out, err = run("allowed_ip list interface_id=%s -t RAX"
% interface.virtual_interface_id)
self.assertEqual(exitcode, 0)
self.assertIn("ip_addresses", out)
self.assertIn('"address": "%s"' % ip_on_interface.address, out)
self.assertIn('"address": "%s"' % allowed_ip.address, out)
def test_show(self):
interface = factory_models.InterfaceFactory(
tenant_id="RAX", virtual_interface_id="vif_id")
block = factory_models.IpBlockFactory(network_id="net123",
tenant_id="RAX")
ip_on_interface = block.allocate_ip(interface)
exitcode, out, err = run("allowed_ip show "
"interface_id=%s ip_address=%s -t RAX"
% (interface.virtual_interface_id,
ip_on_interface.address))
self.assertEqual(exitcode, 0)
self.assertIn("ip_address", out)
self.assertIn('"address": "%s"' % ip_on_interface.address, out)
def test_delete(self):
interface = factory_models.InterfaceFactory(
tenant_id="RAX", virtual_interface_id="vif_id")
block = factory_models.IpBlockFactory(network_id="net123",
tenant_id="RAX")
ip_on_interface = block.allocate_ip(interface)
allowed_ip = block.allocate_ip(factory_models.InterfaceFactory())
interface.allow_ip(allowed_ip)
exitcode, out, err = run("allowed_ip delete "
"interface_id=%s ip_address=%s -t RAX"
% (interface.virtual_interface_id,
allowed_ip.address))
self.assertEqual(exitcode, 0)
self.assertEqual(interface.ips_allowed(), [ip_on_interface])
class TestMelangeCLI(tests.BaseTest):
def test_raises_error_for_non_keyword_arguments(self):
exitcode, out, err = run("allowed_ip delete interface_id123 -t RAX",
raise_error=False)
self.assertEqual(exitcode, 2)
self.assertIn("Action arguments should be in the form of field=value",
out)
class TestDBSyncCLI(tests.BaseTest):
def test_db_sync_executes(self):

View File

@ -1,83 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from melange import tests
from melange.common import client
from melange.common import exception
from melange.tests import functional
class FunctionalTest(tests.BaseTest):
def setUp(self):
super(FunctionalTest, self).setUp()
self.client = client.HTTPClient(port=functional.get_api_port())
def client_get(self, path, params=None, headers=None):
params = params or {}
headers = headers or {}
return self.client.do_request("GET", path, params=params,
headers=headers)
class TestServiceConf(FunctionalTest):
def test_root_url_returns_versions(self):
response = self.client_get("/")
self.assertEqual(response.status, 200)
self.assertTrue("versions" in response.read())
def test_extensions_are_loaded(self):
response = self.client_get("/v0.1/extensions")
self.assertEqual(response.status, 200)
self.assertTrue("extensions" in response.read())
def test_ipam_service_can_be_accessed(self):
response = self.client_get("/v0.1/ipam/tenants/123/ip_blocks")
self.assertEqual(response.status, 200)
self.assertTrue("ip_blocks" in response.read())
class TestMimeTypeVersioning(FunctionalTest):
def test_ipam_service_can_be_accessed_with_mime_type_versioning(self):
headers = {
'Accept': "application/vnd.openstack.melange+xml;"
"version=0.1",
}
response = self.client_get("/ipam/tenants/123/ip_blocks",
headers=headers)
self.assertEqual(response.status, 200)
self.assertIn("application/xml", response.getheader('content-type'))
self.assertTrue("ip_blocks" in response.read())
def test_requesting_nonexistent_version_via_mime_type_versioning(self):
headers = {
'X_ROLE': 'Admin',
'Accept': "application/vnd.openstack.melange+xml;"
"version=99.1",
}
self.assertRaisesExcMessage(exception.MelangeServiceResponseError,
"version not supported",
self.client_get,
"/ipam/tenants/123/ip_blocks",
headers=headers)

View File

@ -15,11 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import httplib2
import json
import mox
import routes
import urlparse
import webob
import webob.exc
@ -157,54 +153,3 @@ class TestTenantBasedAuth(tests.BaseTest):
self.assertTrue(self.auth_provider.authorize(request,
tenant_id="1",
roles=["Admin"]))
class TestKeyStoneClient(tests.BaseTest):
def test_get_token_doesnot_call_auth_service_when_token_is_given(self):
url = "http://localhost:5001"
client = auth.KeystoneClient(url, "username", "access_key",
"auth_token")
self.mock.StubOutWithMock(client, "request")
self.assertEqual(client.get_token(), "auth_token")
def test_get_token_calls_auth_service_when_token_is_not_given(self):
url = "http://localhost:5001"
client = auth.KeystoneClient(url, "username", "access_key",
auth_token=None)
self.mock.StubOutWithMock(client, "request")
request_body = json.dumps({
"passwordCredentials": {
"username": "username",
'password': "access_key"},
})
response_body = json.dumps({'auth': {'token': {'id': "auth_token"}}})
res = httplib2.Response(dict(status='200'))
client.request(urlparse.urljoin(url, "/v2.0/tokens"),
"POST",
headers=mox.IgnoreArg(),
body=request_body).AndReturn((res, response_body))
self.mock.ReplayAll()
self.assertEqual(client.get_token(), "auth_token")
def test_raises_error_when_retreiveing_token_fails(self):
url = "http://localhost:5001"
client = auth.KeystoneClient(url, None, "access_key", auth_token=None)
self.mock.StubOutWithMock(client, "request")
res = httplib2.Response(dict(status='401'))
response_body = "Failed to get token"
client.request(urlparse.urljoin(url, "/v2.0/tokens"),
"POST",
headers=mox.IgnoreArg(),
body=mox.IgnoreArg()).AndReturn((res, response_body))
self.mock.ReplayAll()
expected_error_msg = ("Error occured while retrieving token :"
" Failed to get token")
self.assertRaisesExcMessage(Exception,
expected_error_msg,
client.get_token)

View File

@ -124,7 +124,6 @@ setup(name='melange',
'Environment :: No Input/Output (Daemon)',
],
scripts=['bin/melange',
'bin/melange-client',
'bin/melange-manage',
'bin/melange-delete-deallocated-ips',
],