Rajaram/Vinkesh | Added retries while allocating ips to fix concurrency problem

This commit is contained in:
vinkesh banka 2011-09-23 12:26:04 +05:30 committed by Rajaram Mallya
parent e8124bc569
commit 6fb912a4f6
9 changed files with 139 additions and 27 deletions

View File

@ -44,6 +44,9 @@ dns2 = "ns2.example.com"
#Number of days before deallocated IPs are deleted #Number of days before deallocated IPs are deleted
keep_deallocated_ips_for_days = 2 keep_deallocated_ips_for_days = 2
#Number of retries for allocating an IP
ip_allocation_retries = 5
[composite:melange] [composite:melange]
use = call:melange.common.wsgi:versioned_urlmap use = call:melange.common.wsgi:versioned_urlmap
/: versions /: versions

View File

@ -40,6 +40,9 @@ default_cidr = 10.0.0.0/24
#DNS info for a data_center #DNS info for a data_center
nameserver = "ns.example.com" nameserver = "ns.example.com"
#Number of retries for allocating an IP
ip_allocation_retries = 5
[pipeline:extensions_app_with_filter] [pipeline:extensions_app_with_filter]
pipeline = extensions extensions_test_app pipeline = extensions extensions_test_app

View File

@ -40,3 +40,8 @@ class ParamsMissingError(MelangeError):
class MelangeServiceResponseError(MelangeError): class MelangeServiceResponseError(MelangeError):
message = _("Error while responding to service call") message = _("Error while responding to service call")
class DBConstraintError(MelangeError):
message = _("Failed to save %(model_name)s because: %(error)s")

View File

@ -15,11 +15,13 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import sqlalchemy.exc
from sqlalchemy import and_ from sqlalchemy import and_
from sqlalchemy import or_ from sqlalchemy import or_
from sqlalchemy.orm import aliased from sqlalchemy.orm import aliased
from melange import ipam from melange import ipam
from melange.common import exception
from melange.common import utils from melange.common import utils
from melange.db.sqlalchemy import migration from melange.db.sqlalchemy import migration
from melange.db.sqlalchemy import mappers from melange.db.sqlalchemy import mappers
@ -40,10 +42,14 @@ def find_by(model, **kwargs):
def save(model): def save(model):
db_session = session.get_session() try:
model = db_session.merge(model) db_session = session.get_session()
db_session.flush() model = db_session.merge(model)
return model db_session.flush()
return model
except sqlalchemy.exc.IntegrityError as error:
raise exception.DBConstraintError(model_name=model.__class__.__name__,
error=str(error.orig))
def delete(model): def delete(model):

View File

@ -18,7 +18,9 @@
"""Model classes that form the core of ipam functionality.""" """Model classes that form the core of ipam functionality."""
import datetime import datetime
import logging
import netaddr import netaddr
import sys
from melange import ipv6 from melange import ipv6
from melange.common import config from melange.common import config
@ -26,6 +28,8 @@ from melange.common import exception
from melange.common import utils from melange.common import utils
from melange.db import db_api from melange.db import db_api
LOG = logging.getLogger('melange.ipam.models')
class Query(object): class Query(object):
"""Mimics sqlalchemy query object. """Mimics sqlalchemy query object.
@ -303,6 +307,7 @@ class IpBlock(ModelBase):
def allocate_ip(self, interface_id=None, address=None, def allocate_ip(self, interface_id=None, address=None,
used_by_tenant=None, used_by_device=None, **kwargs): used_by_tenant=None, used_by_device=None, **kwargs):
used_by_tenant = used_by_tenant or self.tenant_id used_by_tenant = used_by_tenant or self.tenant_id
if self.subnets(): if self.subnets():
@ -310,22 +315,37 @@ class IpBlock(ModelBase):
_("Non Leaf block cannot allocate IPAddress")) _("Non Leaf block cannot allocate IPAddress"))
if self.is_full: if self.is_full:
raise NoMoreAddressesError(_("IpBlock is full")) raise NoMoreAddressesError(_("IpBlock is full"))
if address:
return self._allocate_specific_ip(address,
interface_id=interface_id,
used_by_tenant=used_by_tenant,
used_by_device=used_by_device)
return self._allocate_available_ip(interface_id=interface_id,
used_by_tenant=used_by_tenant,
used_by_device=used_by_device,
**kwargs)
if address is None: def _allocate_available_ip(self, interface_id=None, address=None,
used_by_tenant=None, used_by_device=None,
**kwargs):
max_allowed_retry = int(config.Config.get("ip_allocation_retries", 10))
for retries in range(max_allowed_retry):
address = self._generate_ip_address(used_by_tenant=used_by_tenant, address = self._generate_ip_address(used_by_tenant=used_by_tenant,
**kwargs) **kwargs)
else: try:
self._validate_address(address) return IpAddress.create(address=address,
ip_block_id=self.id,
interface_id=interface_id,
used_by_tenant=used_by_tenant,
used_by_device=used_by_device)
if not address: except exception.DBConstraintError as error:
self.update(is_full=True) LOG.debug("IP allocation retry count :{0}".format(retries + 1))
raise NoMoreAddressesError(_("IpBlock is full")) LOG.exception(error)
return IpAddress.create(address=address, raise IpAddressConcurrentAllocationError(block_id=self.id)
interface_id=interface_id,
ip_block_id=self.id,
used_by_tenant=used_by_tenant,
used_by_device=used_by_device)
def _generate_ip_address(self, **kwargs): def _generate_ip_address(self, **kwargs):
if self.is_ipv6(): if self.is_ipv6():
@ -346,23 +366,31 @@ class IpBlock(ModelBase):
if (self._allowed_by_policy(policy, str(ip)) if (self._allowed_by_policy(policy, str(ip))
and (str(ip) not in unavailable_addresses)): and (str(ip) not in unavailable_addresses)):
return str(ip) return str(ip)
return None
def _validate_address(self, address): self.update(is_full=True)
raise NoMoreAddressesError(_("IpBlock is full"))
if (address in [self.broadcast, self.gateway] def _allocate_specific_ip(self, address, interface_id=None,
or (self.get_address(address) is not None)): used_by_tenant=None, used_by_device=None):
raise DuplicateAddressError()
if not self.contains(address): if not self.contains(address):
raise AddressDoesNotBelongError( raise AddressDoesNotBelongError(
_("Address does not belong to IpBlock")) _("Address does not belong to IpBlock"))
policy = self.policy() if (address in [self.broadcast, self.gateway]
if not self._allowed_by_policy(policy, address): or (self.get_address(address) is not None)):
raise DuplicateAddressError()
if not self._allowed_by_policy(self.policy(), address):
raise AddressDisallowedByPolicyError( raise AddressDisallowedByPolicyError(
_("Block policy does not allow this address")) _("Block policy does not allow this address"))
return IpAddress.create(address=address,
ip_block_id=self.id,
interface_id=interface_id,
used_by_tenant=used_by_tenant,
used_by_device=used_by_device)
def _allowed_by_policy(self, policy, address): def _allowed_by_policy(self, policy, address):
return policy is None or policy.allows(self.cidr, address) return policy is None or policy.allows(self.cidr, address)
@ -783,5 +811,10 @@ class InvalidModelError(exception.MelangeError):
super(InvalidModelError, self).__init__(message, errors=errors) super(InvalidModelError, self).__init__(message, errors=errors)
class IpAddressConcurrentAllocationError(exception.MelangeError):
message = _("Cannot allocate address for block %(block_id)s at this time")
def sort(iterable): def sort(iterable):
return sorted(iterable, key=lambda model: model.id) return sorted(iterable, key=lambda model: model.id)

View File

@ -44,6 +44,7 @@ class BaseController(wsgi.Controller):
], ],
webob.exc.HTTPConflict: [ webob.exc.HTTPConflict: [
models.DuplicateAddressError, models.DuplicateAddressError,
models.IpAddressConcurrentAllocationError,
], ],
} }

View File

@ -17,7 +17,7 @@
import httplib2 import httplib2
import json import json
from mox import IgnoreArg import mox
import routes import routes
import urlparse import urlparse
import webob import webob
@ -185,7 +185,7 @@ class TestKeyStoneClient(tests.BaseTest):
res = httplib2.Response(dict(status='200')) res = httplib2.Response(dict(status='200'))
client.request(urlparse.urljoin(url, "/v2.0/tokens"), client.request(urlparse.urljoin(url, "/v2.0/tokens"),
"POST", "POST",
headers=IgnoreArg(), headers=mox.IgnoreArg(),
body=request_body).AndReturn((res, response_body)) body=request_body).AndReturn((res, response_body))
self.mock.ReplayAll() self.mock.ReplayAll()
@ -199,8 +199,8 @@ class TestKeyStoneClient(tests.BaseTest):
response_body = "Failed to get token" response_body = "Failed to get token"
client.request(urlparse.urljoin(url, "/v2.0/tokens"), client.request(urlparse.urljoin(url, "/v2.0/tokens"),
"POST", "POST",
headers=IgnoreArg(), headers=mox.IgnoreArg(),
body=IgnoreArg()).AndReturn((res, response_body)) body=mox.IgnoreArg()).AndReturn((res, response_body))
self.mock.ReplayAll() self.mock.ReplayAll()
expected_error_msg = ("Error occured while retrieving token :" expected_error_msg = ("Error occured while retrieving token :"

View File

@ -16,8 +16,10 @@
# under the License. # under the License.
import datetime import datetime
import mox
from melange import tests from melange import tests
from melange.common import exception
from melange.common import utils from melange.common import utils
from melange.ipam import models from melange.ipam import models
from melange.tests import unit from melange.tests import unit
@ -579,6 +581,48 @@ class TestIpBlock(tests.BaseTest):
self.assertRaises(models.NoMoreAddressesError, ip_block.allocate_ip) self.assertRaises(models.NoMoreAddressesError, ip_block.allocate_ip)
def test_allocate_ip_retries_on_ip_creation_constraint_failure(self):
ip_block = factory_models.PrivateIpBlockFactory(cidr="10.0.0.0/24")
no_of_retries = 3
self.mock.StubOutWithMock(models.IpAddress, 'create')
for i in range(no_of_retries - 1):
self._mock_ip_creation().AndRaise(exception.DBConstraintError())
expected_ip = models.IpAddress(id=1, address="10.0.0.2")
self._mock_ip_creation().AndReturn(expected_ip)
self.mock.ReplayAll()
with unit.StubConfig(ip_allocation_retries=no_of_retries):
actual_ip = ip_block.allocate_ip()
self.assertEqual(actual_ip, expected_ip)
def test_allocate_ip_raises_error_after_max_retries(self):
ip_block = factory_models.PrivateIpBlockFactory(cidr="10.0.0.0/24")
no_of_retries = 3
self.mock.StubOutWithMock(models.IpAddress, 'create')
for i in range(no_of_retries):
self._mock_ip_creation().AndRaise(exception.DBConstraintError())
self.mock.ReplayAll()
expected_error_msg = ("Cannot allocate address for block {0} "
"at this time".format(ip_block.id))
expected_exception = models.IpAddressConcurrentAllocationError
with unit.StubConfig(ip_allocation_retries=no_of_retries):
self.assertRaisesExcMessage(expected_exception,
expected_error_msg,
ip_block.allocate_ip)
def _mock_ip_creation(self):
return models.IpAddress.create(address=mox.IgnoreArg(),
interface_id=mox.IgnoreArg(),
ip_block_id=mox.IgnoreArg(),
used_by_device=mox.IgnoreArg(),
used_by_tenant=mox.IgnoreArg())
def test_ip_block_is_not_full(self): def test_ip_block_is_not_full(self):
ip_block = factory_models.PrivateIpBlockFactory(cidr="10.0.0.0/28") ip_block = factory_models.PrivateIpBlockFactory(cidr="10.0.0.0/28")
self.assertFalse(ip_block.is_full) self.assertFalse(ip_block.is_full)
@ -854,9 +898,25 @@ class TestIpBlock(tests.BaseTest):
class TestIpAddress(tests.BaseTest): class TestIpAddress(tests.BaseTest):
def test_str(self): def test_str_returns_address(self):
self.assertEqual(str(models.IpAddress(address="10.0.1.1")), "10.0.1.1") self.assertEqual(str(models.IpAddress(address="10.0.1.1")), "10.0.1.1")
def test_address_for_a_ip_block_is_unique(self):
block1 = factory_models.PrivateIpBlockFactory(cidr="10.1.1.1/24")
block2 = factory_models.PrivateIpBlockFactory(cidr="10.1.1.1/24")
block1_ip = block1.allocate_ip("10.1.1.3")
expected_error = ("Failed to save IpAddress because: "
"columns address, ip_block_id are not unique")
self.assertRaisesExcMessage(exception.DBConstraintError,
expected_error,
models.IpAddress.create,
ip_block_id=block1.id,
address=block1_ip.address)
self.assertIsNotNone(models.IpAddress.create(ip_block_id=block2.id,
address=block1_ip.address))
def test_find_ip_address(self): def test_find_ip_address(self):
block = factory_models.PrivateIpBlockFactory(cidr="10.0.0.1/8") block = factory_models.PrivateIpBlockFactory(cidr="10.0.0.1/8")
ip_address = factory_models.IpAddressFactory(ip_block_id=block.id, ip_address = factory_models.IpAddressFactory(ip_block_id=block.id,

View File

@ -70,6 +70,7 @@ class TestBaseController(unittest.TestCase):
self._assert_mapping(models.AddressDoesNotBelongError, 422) self._assert_mapping(models.AddressDoesNotBelongError, 422)
self._assert_mapping(models.AddressLockedError, 422) self._assert_mapping(models.AddressLockedError, 422)
self._assert_mapping(models.DuplicateAddressError, 409) self._assert_mapping(models.DuplicateAddressError, 409)
self._assert_mapping(models.IpAddressConcurrentAllocationError, 409)
self._assert_mapping(exception.ParamsMissingError, 400) self._assert_mapping(exception.ParamsMissingError, 400)
def test_http_excpetions_are_bubbled_up(self): def test_http_excpetions_are_bubbled_up(self):