diff --git a/oslo/messaging/rpc/client.py b/oslo/messaging/rpc/client.py
index e1189c96d..d78bb940b 100644
--- a/oslo/messaging/rpc/client.py
+++ b/oslo/messaging/rpc/client.py
@@ -58,6 +58,8 @@ class ClientSendError(exceptions.MessagingException):
 
 class _CallContext(object):
 
+    _marker = object()
+
     def __init__(self, transport, target, serializer,
                  timeout=None, check_for_lock=None, version_cap=None):
         self.conf = transport.conf
@@ -90,6 +92,13 @@ class _CallContext(object):
             raise RPCVersionCapError(version=version,
                                      version_cap=self.version_cap)
 
+    def can_send_version(self, version=_marker):
+        """Check to see if a version is compatible with the version cap."""
+        version = self.target.version if version is self._marker else version
+        return (not self.version_cap or
+                utils.version_is_compatible(self.version_cap,
+                                            self.target.version))
+
     def cast(self, ctxt, method, **kwargs):
         """Invoke a method and return immediately. See RPCClient.cast()."""
         msg = self._make_message(ctxt, method, kwargs)
@@ -129,8 +138,6 @@ class _CallContext(object):
             raise ClientSendError(self.target, ex)
         return self.serializer.deserialize_entity(ctxt, result)
 
-    _marker = object()
-
     @classmethod
     def _prepare(cls, base,
                  exchange=_marker, topic=_marker, namespace=_marker,
@@ -324,3 +331,7 @@ class RPCClient(object):
         :raises: MessagingTimeout
         """
         return self.prepare().call(ctxt, method, **kwargs)
+
+    def can_send_version(self, version=_marker):
+        """Check to see if a version is compatible with the version cap."""
+        return self.prepare(version=version).can_send_version()
diff --git a/tests/test_rpc_client.py b/tests/test_rpc_client.py
index 5a32cf9e2..3641763e2 100644
--- a/tests/test_rpc_client.py
+++ b/tests/test_rpc_client.py
@@ -409,6 +409,80 @@ class TestVersionCap(test_utils.BaseTestCase):
 TestVersionCap.generate_scenarios()
 
 
+class TestCanSendVersion(test_utils.BaseTestCase):
+
+    scenarios = [
+        ('all_none',
+         dict(cap=None, prepare_cap=_notset,
+              version=None, prepare_version=_notset,
+              can_send_version=_notset,
+              can_send=True)),
+        ('ctor_cap_ok',
+         dict(cap='1.1', prepare_cap=_notset,
+              version='1.0', prepare_version=_notset,
+              can_send_version=_notset,
+              can_send=True)),
+        ('ctor_cap_override_ok',
+         dict(cap='2.0', prepare_cap='1.1',
+              version='1.0', prepare_version='1.0',
+              can_send_version=_notset,
+              can_send=True)),
+        ('ctor_cap_override_none_ok',
+         dict(cap='1.1', prepare_cap=None,
+              version='1.0', prepare_version=_notset,
+              can_send_version=_notset,
+              can_send=True)),
+        ('ctor_cap_can_send_ok',
+         dict(cap='1.1', prepare_cap=None,
+              version='1.0', prepare_version=_notset,
+              can_send_version='1.1',
+              can_send=True)),
+        ('ctor_cap_can_send_none_ok',
+         dict(cap='1.1', prepare_cap=None,
+              version='1.0', prepare_version=_notset,
+              can_send_version=None,
+              can_send=True)),
+        ('ctor_cap_minor_fail',
+         dict(cap='1.0', prepare_cap=_notset,
+              version='1.1', prepare_version=_notset,
+              can_send_version=_notset,
+              can_send=False)),
+        ('ctor_cap_major_fail',
+         dict(cap='2.0', prepare_cap=_notset,
+              version=None, prepare_version='1.0',
+              can_send_version=_notset,
+              can_send=False)),
+    ]
+
+    def setUp(self):
+        super(TestCanSendVersion, self).setUp(conf=cfg.ConfigOpts())
+        self.conf.register_opts(rpc_client._client_opts)
+
+    def test_version_cap(self):
+        self.config(rpc_response_timeout=None)
+
+        transport = _FakeTransport(self.conf)
+
+        target = messaging.Target(version=self.version)
+        client = messaging.RPCClient(transport, target,
+                                     version_cap=self.cap)
+
+        prep_kwargs = {}
+        if self.prepare_cap is not _notset:
+            prep_kwargs['version_cap'] = self.prepare_cap
+        if self.prepare_version is not _notset:
+            prep_kwargs['version'] = self.prepare_version
+        if prep_kwargs:
+            client = client.prepare(**prep_kwargs)
+
+        if self.can_send_version is not _notset:
+            can_send = client.can_send_version(version=self.can_send_version)
+        else:
+            can_send = client.can_send_version()
+
+        self.assertEquals(can_send, self.can_send)
+
+
 class TestCheckForLock(test_utils.BaseTestCase):
 
     scenarios = [