Add metastatic driver
This driver supplies "static" nodes that are actually backed by another
nodepool node.  The use case is to be able to request a single large node
(a "backing node") from a cloud provider, and then divide that node up into
smaller nodes that are actually used ("requested nodes").  A backing node
can support one or more requested nodes, and backing nodes should scale up
or down as necessary.

Change-Id: I29d78705a87a53ee07dce6022b81a1ce97c54f1d
This commit is contained in:
parent b5f576c436
commit 5862bef141
@@ -31,6 +31,7 @@ The following drivers are available.

   openshift-pods
   openstack
   static
   metastatic

The following sections are available.  All are required unless
otherwise indicated.
doc/source/metastatic.rst (new file, 129 lines)
@@ -0,0 +1,129 @@
.. _metastatic-driver:

.. default-domain:: zuul

Metastatic Driver
-----------------

This driver uses NodePool nodes (from any driver) as backing nodes to
further allocate "static-like" nodes for end use.

A typical use case is to be able to request a large node (a `backing
node`) from a cloud provider, and then divide that node up into
smaller nodes that are actually used (`requested nodes`).  A backing
node can support one or more requested nodes, and backing nodes are
scaled up or down as necessary based on the number of requested
nodes.

The name is derived from the nodes it provides (which are like
"static" nodes) and the fact that the backing nodes come from NodePool
itself, which is "meta".

.. attr-overview::
   :prefix: providers.[metastatic]
   :maxdepth: 3

.. attr:: providers.[metastatic]
   :type: list

   A metastatic provider's resources are partitioned into groups
   called `pools`, and within a pool, the node types which are to be
   made available are listed.

   .. note:: For documentation purposes the option names are prefixed
             ``providers.[metastatic]`` to disambiguate from other
             drivers, but ``[metastatic]`` is not required in the
             configuration (e.g. below
             ``providers.[metastatic].pools`` refers to the ``pools``
             key in the ``providers`` section when the ``metastatic``
             driver is selected).

   Example:

   .. code-block:: yaml

      providers:
        - name: meta-provider
          driver: metastatic
          pools:
            - name: main
              max-servers: 10
              labels:
                - name: small-node
                  backing-label: large-node
                  max-parallel-jobs: 2
                  grace-time: 600

   .. attr:: name
      :required:

      A unique name for this provider configuration.

   .. attr:: pools
      :type: list

      A pool defines a group of resources from the provider.  Each pool
      has a maximum number of nodes which can be launched from it, along
      with a number of attributes that characterize the use of the
      backing nodes.

      .. attr:: name
         :required:

         A unique name within the provider for this pool of resources.

      .. attr:: max-servers
         :type: int

         Maximum number of servers spawnable from this pool.  This can
         be used to limit the number of servers.  If not defined,
         nodepool can create as many servers as the backing node
         providers support.

      .. attr:: labels
         :type: list

         Each entry in a pool's `labels` section indicates that the
         corresponding label is available for use in this pool.

         .. code-block:: yaml

            labels:
              - name: small-node
                backing-label: large-node
                max-parallel-jobs: 2
                grace-time: 600

         Each entry is a dictionary with the following keys:

         .. attr:: name
            :type: str
            :required:

            Identifier for this label.

         .. attr:: backing-label
            :type: str
            :required:

            Refers to the name of a different label in Nodepool which
            will be used to supply the backing nodes for requests of
            this label.

         .. attr:: max-parallel-jobs
            :type: int
            :default: 1

            The number of jobs that can run in parallel on a single
            backing node.

         .. attr:: grace-time
            :type: int
            :default: 60

            When all requested nodes which were assigned to a backing
            node have been deleted, the backing node itself is
            eligible for deletion.  In order to reduce churn, NodePool
            will wait a certain amount of time after the last
            requested node is deleted to see if new requests arrive
            for this label before deleting the backing node.  Set this
            value to the amount of time in seconds to wait.
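
            For example, with the ``max-parallel-jobs: 2`` and
            ``grace-time: 600`` values shown in the example above,
            five concurrently requested ``small-node`` nodes would be
            packed onto three ``large-node`` backing nodes (two
            slots, two slots, and one slot), and an idle backing node
            would be held for ten minutes after its last requested
            node is deleted before being released.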
@@ -58,6 +58,7 @@ def generate_password():

class AzureInstance(statemachine.Instance):
    def __init__(self, vm, nic=None, public_ipv4=None,
                 public_ipv6=None, sku=None):
        super().__init__()
        self.external_id = vm['name']
        self.metadata = vm.get('tags', {})
        self.private_ipv4 = None
nodepool/driver/metastatic/__init__.py (new file, 36 lines)
@@ -0,0 +1,36 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from nodepool.driver.statemachine import StateMachineDriver
from nodepool.driver.statemachine import StateMachineProvider
from nodepool.driver.metastatic.config import MetastaticProviderConfig
from nodepool.driver.metastatic.adapter import MetastaticAdapter


class MetastaticDriver(StateMachineDriver):
    def getProvider(self, provider_config):
        # We usually don't override this method, but since our "cloud"
        # is actually Nodepool itself, we need to interact with
        # nodepool as a client, so we need a ZK connection.  We can
        # re-use the launcher's connection for this.
        adapter = self.getAdapter(provider_config)
        provider = StateMachineProvider(adapter, provider_config)
        adapter._setProvider(provider)
        return provider

    def getProviderConfig(self, provider):
        return MetastaticProviderConfig(self, provider)

    def getAdapter(self, provider_config):
        return MetastaticAdapter(provider_config)
nodepool/driver/metastatic/adapter.py (new file, 441 lines)
@@ -0,0 +1,441 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import math
import logging
import json
import time

from nodepool.driver.utils import QuotaInformation, RateLimiter
from nodepool.driver import statemachine
from nodepool import zk


""" This driver behaves like a static driver except that the backing
nodes come from other Nodepool drivers.

The intent is that users will request nodes from this driver, and if
any nodes already exist, the request will be satisfied with those
first.  If not, then this driver will request a new node from another
driver in Nodepool, and then add that to this driver's pool of
available nodes.  Each backing node may supply one or more nodes to
the end user.

For example, a user might request 3 nodes from this driver.  Having
none available, this driver would then request a single node from an
AWS driver.  Once that node is available, this driver might configure
8 terminal nodes all backed by the single AWS node, and fulfill the
request by allocating 3 of them.

If a further request arrived for 5 nodes, they would be fulfilled from
the remaining 5 slots.

Once all 8 nodes have been returned, this driver will release the
underlying AWS node and the AWS driver will reclaim it.

To accomplish this, the process is roughly:

* Upon request, if insufficient nodes available, request new backing
  node(s).  The requestor should be "NodePool:metastatic:{providername}".

* Lock backing nodes with a non-ephemeral lock (so that they persist
  even if this launcher is stopped) and use
  "NodePool:metastatic:{providername}" as identifier.

* Update the Node.user_data field in ZK to include
  "NodePool:metastatic:{providername}" along with label and slot count
  information for the backing node.

* Set the Node.driver_data field in ZK to include the backing node and
  slot information of which backing node the requested node is
  associated with.

* Periodically, delete unused backing nodes.

To identify our backing nodes:
  Our id is: "NodePool:metastatic:{providername}".
  For every node with our id in user_data:
    If node locked and first lock contender has our id as identifier:
      This is one of our nodes
The first check is for efficiency, the second is to avoid issues with
falsified user_data fields.

To cleanup unused backing nodes:
  For each of our end nodes in use: mark backing node in use.

  If a backing node hasn't been in use for {grace-time} seconds,
  set the node to used and remove the lock on the backing node.

To identify our end nodes:
  Check that the node provider matches us, and then consult the
  driver_data field to find which backing node it's assigned to.


This driver acts as both a provider and a user of Nodepool.  It
provides requested nodes and it uses backing nodes.

As a user, it stores node allocation data in the "user_data" field of
the backing node.  As a provider, it stores node allocation in the
"driver_data" field of the requested node.

In order to avoid extra write calls to ZK on every allocation (and to
avoid race conditions that could cause double accounting errors), the
only data we store on the backing node is: that we own it, its label,
and the number of slots it supports.  On the requested node, we store
the backing node id and which slot in the backing node this node
occupies.

We have an in-memory proxy (BackingNodeRecord) for the backing nodes
to keep track of node allocation.  When we start up, we initialize the
proxy based on the data in ZK.
"""


class MetastaticInstance(statemachine.Instance):
    def __init__(self, backing_node, slot, node, metadata=None):
        super().__init__()
        if metadata:
            self.metadata = metadata
        else:
            self.metadata = node.driver_data['metadata']
        if backing_node:
            self.interface_ip = backing_node.interface_ip
            self.public_ipv4 = backing_node.public_ipv4
            self.public_ipv6 = backing_node.public_ipv6
            self.private_ipv4 = backing_node.private_ipv4
            self.az = backing_node.az
            self.region = backing_node.region
            # Image overrides:
            self.username = backing_node.username
            self.python_path = backing_node.python_path
            self.shell_type = backing_node.shell_type
            self.connection_port = backing_node.connection_port
            self.connection_type = backing_node.connection_type
            backing_node_id = backing_node.id
        else:
            backing_node_id = None
        self.driver_data = {
            'metadata': self.metadata,
            'backing_node': backing_node_id,
            'slot': slot,
            'node': node.id,
        }
        self.external_id = node.id

    def getQuotaInformation(self):
        return QuotaInformation(instances=1)


class MetastaticResource(statemachine.Resource):
    def __init__(self, metadata, type, name):
        super().__init__(metadata)
        self.type = type
        self.name = name


class MetastaticDeleteStateMachine(statemachine.StateMachine):
    DEALLOCATING = 'deallocating node'
    COMPLETE = 'complete'

    def __init__(self, adapter, external_id):
        super().__init__()
        self.adapter = adapter
        self.node_id = external_id

    def advance(self):
        if self.state == self.START:
            self.adapter._deallocateBackingNode(self.node_id)
            self.state = self.COMPLETE

        if self.state == self.COMPLETE:
            self.complete = True


class MetastaticCreateStateMachine(statemachine.StateMachine):
    REQUESTING = 'requesting backing node'
    ALLOCATING = 'allocating node'
    COMPLETE = 'complete'

    def __init__(self, adapter, hostname, label, image_external_id,
                 metadata, retries):
        super().__init__()
        self.adapter = adapter
        self.retries = retries
        self.attempts = 0
        self.image_external_id = image_external_id
        self.metadata = metadata
        self.hostname = hostname
        self.label = label
        self.node_id = metadata['nodepool_node_id']

    def advance(self):
        if self.state == self.START:
            self.backing_node_record, self.slot = \
                self.adapter._allocateBackingNode(
                    self.label, self.node_id)
            if self.backing_node_record.node_id is None:
                # We need to make a new request
                self.state = self.REQUESTING
            else:
                # We have an existing node
                self.state = self.COMPLETE
            self.external_id = self.node_id

        if self.state == self.REQUESTING:
            self.adapter._checkBackingNodeRequests()
            if self.backing_node_record.failed:
                raise Exception("Backing node failed")
            if self.backing_node_record.node_id is None:
                return
            self.state = self.COMPLETE

        if self.state == self.COMPLETE:
            backing_node = self.adapter._getNode(
                self.backing_node_record.node_id)
            node = self.adapter._getNode(self.node_id)
            instance = MetastaticInstance(backing_node, self.slot,
                                          node, self.metadata)
            self.complete = True
            return instance


class BackingNodeRecord:
    """An in-memory record of backing nodes and what nodes are allocated
    to them.
    """
    def __init__(self, label_name, slot_count):
        self.label_name = label_name
        self.slot_count = slot_count
        self.node_id = None
        self.request_id = None
        self.allocated_nodes = [None for x in range(slot_count)]
        self.failed = False
        self.last_used = time.time()

    def hasAvailableSlot(self):
        return None in self.allocated_nodes

    def isEmpty(self):
        return not any(self.allocated_nodes)

    def allocateSlot(self, node_id, slot_id=None):
        if slot_id is None:
            idx = self.allocated_nodes.index(None)
        else:
            idx = slot_id
        if self.allocated_nodes[idx] is not None:
            raise Exception("Slot %s of %s is already allocated" %
                            (idx, self.node_id))
        self.allocated_nodes[idx] = node_id
        return idx

    def deallocateSlot(self, node_id):
        idx = self.allocated_nodes.index(node_id)
        self.allocated_nodes[idx] = None
        self.last_used = time.time()

    def backsNode(self, node_id):
        return node_id in self.allocated_nodes
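
    # A minimal sketch of how this record is used (the node ids are
    # illustrative, not taken from the change itself):
    #
    #   record = BackingNodeRecord('small-node', slot_count=2)
    #   record.allocateSlot('0000000124')    # -> slot 0
    #   record.allocateSlot('0000000125')    # -> slot 1
    #   record.hasAvailableSlot()            # -> False
    #   record.deallocateSlot('0000000124')  # frees slot 0, updates last_used
    #   record.isEmpty()                     # -> False; slot 1 is still in use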


class MetastaticAdapter(statemachine.Adapter):
    log = logging.getLogger("nodepool.driver.metastatic.MetastaticAdapter")

    def __init__(self, provider_config):
        self.provider = provider_config
        self.rate_limiter = RateLimiter(self.provider.name,
                                        self.provider.rate)
        self.backing_node_records = {}  # label -> [BackingNodeRecord]
        self.pending_requests = []
        # The requestor id
        self.my_id = f'NodePool:metastatic:{self.provider.name}'
        # On startup we need to recover our state from the ZK db, this
        # flag ensures we only do that once.
        self.performed_init = False

    @property
    def zk(self):
        return self._provider._zk

    def getCreateStateMachine(self, hostname, label,
                              image_external_id, metadata, retries):
        return MetastaticCreateStateMachine(self, hostname, label,
                                            image_external_id, metadata,
                                            retries)

    def getDeleteStateMachine(self, external_id):
        return MetastaticDeleteStateMachine(self, external_id)

    def listResources(self):
        self._init()
        # Since this is called periodically, this is a good place to
        # see about deleting unused backing nodes.
        now = time.time()
        for label_name, backing_node_records in \
                self.backing_node_records.items():
            for bnr in backing_node_records[:]:
                label_config = self.provider._getLabel(bnr.label_name)
                if (bnr.isEmpty() and
                        now - bnr.last_used > label_config.grace_time):
                    self.log.info("Backing node %s has been idle for "
                                  "%s seconds, releasing",
                                  bnr.node_id, now - bnr.last_used)
                    node = self._getNode(bnr.node_id)
                    node.state = zk.USED
                    self.zk.storeNode(node)
                    self.zk.forceUnlockNode(node)
                    backing_node_records.remove(bnr)
        return []

    def deleteResource(self, resource):
        self.log.warning("Unhandled request to delete leaked "
                         f"{resource.type}: {resource.name}")
        # Unused; change log message if we ever use this.

    def listInstances(self):
        # We don't need this unless we're managing quota
        self._init()
        return []

    def getQuotaLimits(self):
        return QuotaInformation(default=math.inf)

    def getQuotaForLabel(self, label):
        return QuotaInformation(instances=1)

    # Local implementation below

    def _init(self):
        if self.performed_init:
            return
        self.log.debug("Performing init")
        # Find backing nodes
        backing_node_map = {}
        for node in self.zk.nodeIterator():
            try:
                user_data = json.loads(node.user_data)
            except Exception:
                continue
            if 'owner' not in user_data:
                continue
            if user_data['owner'] == self.my_id:
                # This may be a backing node for us, but double check
                contenders = self.zk.getNodeLockContenders(node)
                if contenders and contenders[0] == self.my_id:
                    # We hold the lock on this node
                    backing_node_record = BackingNodeRecord(
                        user_data['label'], user_data['slots'])
                    backing_node_record.node_id = node.id
                    self.log.info("Found backing node %s for %s",
                                  node.id, user_data['label'])
                    self._addBackingNode(user_data['label'],
                                         backing_node_record)
                    backing_node_map[node.id] = backing_node_record
        # Assign nodes to backing nodes
        for node in self.zk.nodeIterator():
            if node.provider == self.provider.name:
                if not node.driver_data:
                    continue
                bn_id = node.driver_data.get('backing_node')
                bn_slot = node.driver_data.get('slot')
                if bn_id and bn_id in backing_node_map:
                    backing_node_record = backing_node_map[bn_id]
                    backing_node_record.allocateSlot(node.id, bn_slot)
                    self.log.info("Found node %s assigned to backing node %s "
                                  "slot %s",
                                  node.id, backing_node_record.node_id,
                                  bn_slot)
        self.performed_init = True

    def _setProvider(self, provider):
        self._provider = provider

    def _allocateBackingNode(self, label, node_id):
        self._init()
        # if we have room for the label, allocate and return existing slot
        # otherwise, make a new backing node
        backing_node_record = None
        for bnr in self.backing_node_records.get(label.name, []):
            if bnr.hasAvailableSlot():
                backing_node_record = bnr
                break
        if backing_node_record is None:
            req = zk.NodeRequest()
            req.node_types = [label.backing_label]
            req.state = zk.REQUESTED
            req.requestor = self.my_id
            self.zk.storeNodeRequest(req, priority='100')
            backing_node_record = BackingNodeRecord(
                label.name, label.max_parallel_jobs)
            backing_node_record.request_id = req.id
            self._addBackingNode(label.name, backing_node_record)
        slot = backing_node_record.allocateSlot(node_id)
        self.log.info("Assigned node %s to backing node %s slot %s",
                      node_id, backing_node_record.node_id, slot)
        return backing_node_record, slot

    def _addBackingNode(self, label_name, backing_node_record):
        nodelist = self.backing_node_records.setdefault(label_name, [])
        nodelist.append(backing_node_record)

    def _deallocateBackingNode(self, node_id):
        self._init()
        for label_name, backing_node_records in \
                self.backing_node_records.items():
            for bn in backing_node_records:
                if bn.backsNode(node_id):
                    bn.deallocateSlot(node_id)
                    return

    def _checkBackingNodeRequests(self):
        self._init()
        waiting_requests = {}
        for label_name, backing_node_records in \
                self.backing_node_records.items():
            for bnr in backing_node_records:
                if bnr.request_id:
                    waiting_requests[bnr.request_id] = bnr
        if not waiting_requests:
            return
        for request in self.zk.nodeRequestIterator():
            if request.id not in waiting_requests:
                continue
            if request.state == zk.FAILED:
                self.log.error("Backing request %s failed", request.id)
                for label_name, records in self.backing_node_records.items():
                    for bnr in records[:]:
                        if bnr.request_id == request.id:
                            bnr.failed = True
                            records.remove(bnr)
            if request.state == zk.FULFILLED:
                bnr = waiting_requests[request.id]
                node_id = request.nodes[0]
                self.log.info("Backing request %s fulfilled with node id %s",
                              request.id, node_id)
                node = self._getNode(node_id)
                self.zk.lockNode(node, blocking=True, timeout=30,
                                 ephemeral=False, identifier=self.my_id)
                node.user_data = json.dumps({
                    'owner': self.my_id,
                    'label': bnr.label_name,
                    'slots': bnr.slot_count,
                })
                node.state = zk.IN_USE
                self.zk.storeNode(node)
                self.zk.deleteNodeRequest(request)
                bnr.request_id = None
                bnr.node_id = node_id

    def _getNode(self, node_id):
        return self.zk.getNode(node_id)
nodepool/driver/metastatic/config.py (new file, 158 lines)
@@ -0,0 +1,158 @@
# Copyright 2018 Red Hat
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.

import math

import voluptuous as v

from nodepool.driver import ConfigPool
from nodepool.driver import ConfigValue
from nodepool.driver import ProviderConfig


class MetastaticCloudImage(ConfigValue):
    def __init__(self):
        self.name = 'unused'
        self.username = 'unknown'
        self.python_path = 'unknown'
        self.shell_type = 'unknown'
        self.connection_port = 'unknown'
        self.connection_type = 'unknown'


class MetastaticLabel(ConfigValue):
    ignore_equality = ['pool']

    def __init__(self, label, provider_config, provider_pool):
        self.pool = provider_pool
        self.name = label['name']
        self.backing_label = label['backing-label']
        self.diskimage = None
        self.cloud_image = MetastaticCloudImage()
        self.max_parallel_jobs = label.get('max-parallel-jobs', 1)
        self.grace_time = label.get('grace-time', 60)

    @staticmethod
    def getSchema():
        return {
            v.Required('name'): str,
            v.Required('backing-label'): str,
            'max-parallel-jobs': int,
            'grace-time': int,
        }

    def isBackingConfigEqual(self, other):
        # An equality check of the backing configuration
        return (
            self.backing_label == other.backing_label and
            self.max_parallel_jobs == other.max_parallel_jobs and
            self.grace_time == other.grace_time
        )


class MetastaticPool(ConfigPool):
    ignore_equality = ['provider']

    def __init__(self, provider_config, pool_config):
        super().__init__()
        self.provider = provider_config
        self.labels = {}
        # We will just use the interface_ip of the backing node
        self.use_internal_ip = False
        self.load(pool_config)

    def load(self, pool_config):
        self.name = pool_config['name']
        self.max_servers = pool_config.get('max-servers', math.inf)
        for label in pool_config.get('labels', []):
            b = MetastaticLabel(label, self.provider, self)
            self.labels[b.name] = b

    @staticmethod
    def getSchema():
        label = MetastaticLabel.getSchema()

        pool = ConfigPool.getCommonSchemaDict()
        pool.update({
            v.Required('name'): str,
            v.Required('labels'): [label],
            'max-servers': int,
        })
        return pool


class MetastaticProviderConfig(ProviderConfig):
    def __init__(self, driver, provider):
        super().__init__(provider)
        self._pools = {}
        self.rate = None
        self.launch_retries = None

    @property
    def pools(self):
        return self._pools

    @property
    def manage_images(self):
        return False

    @staticmethod
    def reset():
        pass

    def load(self, config):
        self.rate = self.provider.get('rate', 1)
        self.launch_retries = self.provider.get('launch-retries', 3)
        self.launch_timeout = self.provider.get('launch-timeout', 3600)
        self.boot_timeout = self.provider.get('boot-timeout', 120)
        label_defs = {}
        for pool in self.provider.get('pools', []):
            pp = MetastaticPool(self, pool)
            self._pools[pp.name] = pp

            for label in pool.get('labels', []):
                pl = MetastaticLabel(label, self, pp)

                if pl.backing_label in label_defs:
                    if not pl.isBackingConfigEqual(
                            label_defs[pl.backing_label]):
                        raise Exception(
                            "Multiple label definitions for the same "
                            "backing label must be identical")
                label_defs[pl.backing_label] = pl
                config.labels[pl.name].pools.append(pp)

    def getSchema(self):
        pool = MetastaticPool.getSchema()

        provider = ProviderConfig.getCommonSchemaDict()
        provider.update({
            v.Required('pools'): [pool],
        })
        return v.Schema(provider)

    def getSupportedLabels(self, pool_name=None):
        labels = set()
        for pool in self._pools.values():
            if not pool_name or (pool.name == pool_name):
                labels.update(pool.labels.keys())
        return labels

    def _getLabel(self, label):
        for pool in self._pools.values():
            if label in pool.labels:
                return pool.labels[label]
@@ -151,6 +151,15 @@ class StateMachineNodeLauncher(stats.StatsReporter):
        node.public_ipv6 = instance.public_ipv6
        node.region = instance.region
        node.az = instance.az
        node.driver_data = instance.driver_data

        # Optionally, if the node has updated values that we set from
        # the image attributes earlier, set those.
        for attr in ('username', 'python_path', 'shell_type',
                     'connection_port', 'connection_type'):
            if hasattr(instance, attr):
                setattr(node, attr, getattr(instance, attr))

        self.zk.storeNode(node)

    def runStateMachine(self):
@@ -698,6 +707,16 @@ class Instance:
    * private_ipv4: str
    * az: str
    * region: str
    * driver_data: any

    And the following are even more optional (as they are usually
    already set from the image configuration):

    * username: str
    * python_path: str
    * shell_type: str
    * connection_port: str
    * connection_type: str
    """
    def __init__(self):
        self.ready = False
@@ -710,6 +729,7 @@ class Instance:
        self.az = None
        self.region = None
        self.metadata = {}
        self.driver_data = None

    def __repr__(self):
        state = []
nodepool/tests/fixtures/config_validate/metastatic_error.yaml (new file, 40 lines, vendored)
@@ -0,0 +1,40 @@
labels:
  - name: backing-label
    min-ready: 0
  - name: user-label
    min-ready: 0
  - name: bad-label
    min-ready: 0

providers:
  # The backing node provider: a cloud
  - name: fake-provider
    cloud: fake
    driver: fake
    region-name: fake-region
    rate: 0.0001
    cloud-images:
      - name: fake-image
    pools:
      - name: main
        max-servers: 96
        labels:
          - name: backing-label
            cloud-image: fake-image
            min-ram: 8192
            flavor-name: 'Fake'

  - name: meta-provider
    driver: metastatic
    pools:
      - name: main
        max-servers: 10
        labels:
          - name: user-label
            backing-label: backing-label
            max-parallel-jobs: 2
            grace-time: 2
          - name: bad-label
            backing-label: backing-label
            max-parallel-jobs: 4  # We can't have different numbers of max-parallel-jobs
            grace-time: 2
nodepool/tests/fixtures/config_validate/metastatic_ok.yaml (new file, 40 lines, vendored)
@@ -0,0 +1,40 @@
labels:
  - name: backing-label
    min-ready: 0
  - name: user-label
    min-ready: 0
  - name: bad-label
    min-ready: 0

providers:
  # The backing node provider: a cloud
  - name: fake-provider
    cloud: fake
    driver: fake
    region-name: fake-region
    rate: 0.0001
    cloud-images:
      - name: fake-image
    pools:
      - name: main
        max-servers: 96
        labels:
          - name: backing-label
            cloud-image: fake-image
            min-ram: 8192
            flavor-name: 'Fake'

  - name: meta-provider
    driver: metastatic
    pools:
      - name: main
        max-servers: 10
        labels:
          - name: user-label
            backing-label: backing-label
            max-parallel-jobs: 2
            grace-time: 2
          - name: bad-label
            backing-label: backing-label
            max-parallel-jobs: 2  # These are identical, so it's okay
            grace-time: 2
nodepool/tests/fixtures/metastatic.yaml (new file, 48 lines, vendored)
@@ -0,0 +1,48 @@
webapp:
  port: 8005
  listen_address: '0.0.0.0'

zookeeper-servers:
  - host: {zookeeper_host}
    port: {zookeeper_port}
    chroot: {zookeeper_chroot}

zookeeper-tls:
  ca: {zookeeper_ca}
  cert: {zookeeper_cert}
  key: {zookeeper_key}

labels:
  - name: backing-label
    min-ready: 0
  - name: user-label
    min-ready: 0

providers:
  # The backing node provider: a cloud
  - name: fake-provider
    cloud: fake
    driver: fake
    region-name: fake-region
    rate: 0.0001
    cloud-images:
      - name: fake-image
    pools:
      - name: main
        max-servers: 96
        labels:
          - name: backing-label
            cloud-image: fake-image
            min-ram: 8192
            flavor-name: 'Fake'

  - name: meta-provider
    driver: metastatic
    pools:
      - name: main
        max-servers: 10
        labels:
          - name: user-label
            backing-label: backing-label
            max-parallel-jobs: 2
            grace-time: 2
nodepool/tests/unit/test_driver_metastatic.py (new file, 181 lines)
@@ -0,0 +1,181 @@
# Copyright (C) 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import logging

import testtools

from nodepool import tests
from nodepool import zk
from nodepool.driver.statemachine import StateMachineProvider
from nodepool.cmd.config_validator import ConfigValidator


class TestDriverMetastatic(tests.DBTestCase):
    log = logging.getLogger("nodepool.TestDriverMetastatic")

    def setUp(self):
        super().setUp()
        StateMachineProvider.MINIMUM_SLEEP = 0.1
        StateMachineProvider.MAXIMUM_SLEEP = 1

    def _requestNode(self):
        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('user-label')

        self.zk.storeNodeRequest(req)
        req = self.waitForNodeRequest(req)

        self.assertEqual(req.state, zk.FULFILLED)
        self.assertNotEqual(req.nodes, [])
        node = self.zk.getNode(req.nodes[0])
        self.assertEqual(node.allocated_to, req.id)
        self.assertEqual(node.state, zk.READY)
        self.assertIsNotNone(node.launcher)
        self.assertEqual(node.connection_type, 'ssh')
        self.assertEqual(node.provider, 'meta-provider')

        return node

    def _getNodes(self):
        nodes = [n for n in self.zk.nodeIterator()]
        nodes = sorted(nodes, key=lambda n: n.id)
        self.log.debug("Nodes:")
        for n in nodes:
            self.log.debug(' %s %s', n.id, n.provider)
        return nodes

    def test_metastatic_validator(self):
        # Test schema validation
        config = os.path.join(os.path.dirname(tests.__file__),
                              'fixtures', 'config_validate',
                              'metastatic_ok.yaml')
        validator = ConfigValidator(config)
        ret = validator.validate()
        self.assertEqual(ret, 0)

        # Test runtime value assertions
        configfile = self.setup_config('config_validate/metastatic_error.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        with testtools.ExpectedException(Exception, 'Multiple label def'):
            pool.loadConfig()

        configfile = self.setup_config('config_validate/metastatic_ok.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.loadConfig()

    def test_metastatic(self):
        configfile = self.setup_config('metastatic.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()
        self.wait_for_config(pool)
        manager = pool.getProviderManager('fake-provider')
        manager._client.create_image(name="fake-image")

        # Request a node, verify that there is a backing node, and it
        # has the same connection info
        node1 = self._requestNode()
        nodes = self._getNodes()
        self.assertEqual(len(nodes), 2)
        self.assertEqual(nodes[0], node1)
        self.assertNotEqual(nodes[1], node1)
        bn1 = nodes[1]
        self.assertEqual(bn1.provider, 'fake-provider')
        self.assertEqual(bn1.interface_ip, node1.interface_ip)
        self.assertEqual(bn1.python_path, node1.python_path)
        self.assertEqual('auto', node1.python_path)
        self.assertEqual(bn1.shell_type, node1.shell_type)
        self.assertEqual(None, node1.shell_type)
        self.assertEqual(bn1.host_keys, node1.host_keys)
        self.assertEqual(['ssh-rsa FAKEKEY'], node1.host_keys)
        self.assertEqual(bn1.id, node1.driver_data['backing_node'])

        # Allocate a second node, should have same backing node
        node2 = self._requestNode()
        nodes = self._getNodes()
        self.assertEqual(nodes, [node1, bn1, node2])
        self.assertEqual(bn1.id, node2.driver_data['backing_node'])

        # Allocate a third node, should have a second backing node
        node3 = self._requestNode()
        nodes = self._getNodes()
        self.assertNotEqual(nodes[4], node1)
        self.assertNotEqual(nodes[4], node2)
        self.assertNotEqual(nodes[4], node3)
        bn2 = nodes[4]
        self.assertEqual(nodes, [node1, bn1, node2, node3, bn2])
        self.assertEqual(bn2.id, node3.driver_data['backing_node'])
        self.assertNotEqual(bn1.id, bn2.id)

        # Delete node #2, verify that both backing nodes exist
        node2.state = zk.DELETING
        self.zk.storeNode(node2)
        self.waitForNodeDeletion(node2)

        # Allocate a replacement, verify it occupies slot 2
        node4 = self._requestNode()
        nodes = self._getNodes()
        self.assertEqual(nodes, [node1, bn1, node3, bn2, node4])
        self.assertEqual(bn1.id, node4.driver_data['backing_node'])

        # Delete #4 and #1, verify backing node 1 is removed
        node4.state = zk.DELETING
        self.zk.storeNode(node4)
        node1.state = zk.DELETING
        self.zk.storeNode(node1)
        self.waitForNodeDeletion(node4)
        self.waitForNodeDeletion(node1)
        self.waitForNodeDeletion(bn1)
        nodes = self._getNodes()
        self.assertEqual(nodes, [node3, bn2])

    def test_metastatic_startup(self):
        configfile = self.setup_config('metastatic.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()
        self.wait_for_config(pool)
        manager = pool.getProviderManager('fake-provider')
        manager._client.create_image(name="fake-image")

        # Request a node, verify that there is a backing node, and it
        # has the same connection info
        node1 = self._requestNode()
        nodes = self._getNodes()
        bn1 = nodes[1]
        self.assertEqual(nodes, [node1, bn1])

        # Restart the provider and make sure we load data correctly
        pool.stop()
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()
        self.wait_for_config(pool)
        manager = pool.getProviderManager('fake-provider')
        manager._client.create_image(name="fake-image")

        # Allocate a second node, should have same backing node
        node2 = self._requestNode()
        nodes = self._getNodes()
        self.assertEqual(nodes, [node1, bn1, node2])
        self.assertEqual(bn1.id, node2.driver_data['backing_node'])

        # Allocate a third node, should have a second backing node
        node3 = self._requestNode()
        nodes = self._getNodes()
        bn2 = nodes[4]
        self.assertEqual(nodes, [node1, bn1, node2, node3, bn2])
        self.assertEqual(bn2.id, node3.driver_data['backing_node'])
        self.assertNotEqual(bn1.id, bn2.id)
@@ -594,6 +594,7 @@ class Node(BaseModel):
        self.attributes = None
        self.python_path = None
        self.tenant_name = None
        self.driver_data = None

    def __repr__(self):
        d = self.toDict()
@@ -635,7 +636,8 @@ class Node(BaseModel):
                self.resources == other.resources and
                self.attributes == other.attributes and
                self.python_path == other.python_path and
                self.tenant_name == other.tenant_name)
                self.tenant_name == other.tenant_name and
                self.driver_data == other.driver_data)
        else:
            return False

@@ -688,6 +690,7 @@ class Node(BaseModel):
        d['attributes'] = self.attributes
        d['python_path'] = self.python_path
        d['tenant_name'] = self.tenant_name
        d['driver_data'] = self.driver_data
        return d

    @staticmethod
@@ -755,6 +758,7 @@ class Node(BaseModel):
        self.python_path = d.get('python_path')
        self.shell_type = d.get('shell_type')
        self.tenant_name = d.get('tenant_name')
        self.driver_data = d.get('driver_data')


class ZooKeeper(object):
@@ -1943,7 +1947,8 @@ class ZooKeeper(object):
        request.lock.release()
        request.lock = None

    def lockNode(self, node, blocking=True, timeout=None):
    def lockNode(self, node, blocking=True, timeout=None,
                 ephemeral=True, identifier=None):
        '''
        Lock a node.

@@ -1957,6 +1962,10 @@ class ZooKeeper(object):
            acquire the lock
        :param int timeout: When blocking, how long to wait for the lock
            to get acquired. None, the default, waits forever.
        :param bool ephemeral: Whether to use an ephemeral lock. Unless
            you have a really good reason, use the default of True.
        :param str identifier: Identifies the lock holder. The default
            of None is usually fine.

        :raises: TimeoutException if we failed to acquire the lock when
            blocking with a timeout. ZKLockException if we are not blocking
@@ -1964,8 +1973,8 @@ class ZooKeeper(object):
        '''
        path = self._nodeLockPath(node.id)
        try:
            lock = Lock(self.client, path)
            have_lock = lock.acquire(blocking, timeout)
            lock = Lock(self.client, path, identifier)
            have_lock = lock.acquire(blocking, timeout, ephemeral)
        except kze.LockTimeout:
            raise npe.TimeoutException(
                "Timeout trying to acquire lock %s" % path)
@@ -1998,6 +2007,26 @@ class ZooKeeper(object):
            node.lock.release()
        node.lock = None

    def forceUnlockNode(self, node):
        '''
        Forcibly unlock a node.

        :param Node node: The node to unlock.
        '''
        path = self._nodeLockPath(node.id)
        try:
            self.client.delete(path, recursive=True)
        except kze.NoNodeError:
            pass

    def getNodeLockContenders(self, node):
        '''
        Return the contenders for a node lock.
        '''
        path = self._nodeLockPath(node.id)
        lock = Lock(self.client, path)
        return lock.contenders()
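
    # A minimal usage sketch of the non-ephemeral lock support added
    # above, roughly as the metastatic driver uses it (the identifier
    # value is illustrative):
    #
    #   zk.lockNode(node, blocking=True, timeout=30,
    #               ephemeral=False,
    #               identifier='NodePool:metastatic:meta-provider')
    #   # Later, possibly after a launcher restart:
    #   contenders = zk.getNodeLockContenders(node)
    #   if contenders and contenders[0] == 'NodePool:metastatic:meta-provider':
    #       zk.forceUnlockNode(node)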

    def getNodes(self):
        '''
        Get the current list of all nodes.