//-< CLIENT.CXX >----------------------------------------------------*--------* // GOODS Version 1.0 (c) 1997 GARRET * ? * // (Generic Object Oriented Database System) * /\| * // * / \ * // Created: 7-Jan-97 K.A. Knizhnik * / [] \ * // Last update: 28-Oct-97 K.A. Knizhnik * GARRET * //-------------------------------------------------------------------*--------* // Client storage interface. This store is responsible for transferring // client requests to server and receiving server's replies. //-------------------------------------------------------------------*--------* #include "goods.h" #ifdef _WIN32 #pragma hdrstop #endif #include "client.h" BEGIN_GOODS_NAMESPACE boolean dbs_client_storage::handle_communication_error() { msg_buf buf; sock->get_error_text(buf, sizeof buf); console::error("Communication with server failed: %s\n", buf); return False; } void dbs_client_storage::read(void* buf, size_t size) { internal_assert(sock != NULL); while (!sock->read(buf, size)) { if (!handle_communication_error()) { console::error("Read operation failed"); } } } int send_request_code; void dbs_client_storage::write(void const* buf, size_t size) { internal_assert(sock != NULL); send_request_code = ((dbs_request*)buf)->cmd; ((dbs_request*)buf)->pack(); critical_section guard(snd_cs); if (opened) { while (!sock->write(buf, size)) { if (!handle_communication_error()) { console::error("Write operation failed"); } } } } void dbs_client_storage::send_notifications() { critical_section guard(notify_cs); int n = notify_buf.size(); if (n > 0) { dbs_request* req = ¬ify_buf; req->object.extra = n - 1; while (--n > 0) (++req)->pack(); write(¬ify_buf, notify_buf.size()*sizeof(dbs_request)); notify_buf.change_size(0); } } void dbs_client_storage::receive_header() { dnm_array buf; while (opened) { read(&reply, sizeof reply); reply.unpack(); switch (reply.cmd) { case dbs_request::cmd_invalidate: proceeding_invalidation = True; application->invalidate(id, reply.object.opid); if (reply.object.extra > 0) { int n = reply.object.extra; buf.change_size(n); dbs_request* rp = &buf; read(rp, n*sizeof(dbs_request)); while (--n >= 0) { rp->unpack(); internal_assert(rp->cmd == dbs_request::cmd_invalidate); application->invalidate(id, rp->object.opid); rp += 1; } } proceeding_invalidation = False; if (!waiting_reply) { send_notifications(); } break; case dbs_request::cmd_bye: opened = False; if (!closing) { application->disconnected(id); break; } // no break default: rep_sem.signal(); rcv_sem.wait(); } } term_event.signal(); } void dbs_client_storage::send_receive(void const* snd_buf, size_t snd_len, dbs_request& rcv_req, int exp_cmd) { if (snd_buf != NULL) { write(snd_buf, snd_len); } waiting_reply = True; rep_sem.wait(); rcv_req = reply; assert(rcv_req.cmd == exp_cmd); waiting_reply = False; send_notifications(); rcv_sem.signal(); } void dbs_client_storage::send_receive(void const* snd_buf, size_t snd_len, dbs_request& rcv_req, int exp_cmd, dnm_buffer& rcv_buf) { if (snd_buf != NULL) { write(snd_buf, snd_len); } waiting_reply = True; rep_sem.wait(); rcv_req = reply; assert(rcv_req.cmd == exp_cmd); rcv_buf.put(rcv_req.result.size); if (rcv_req.result.size != 0) { read(&rcv_buf, rcv_req.result.size); } waiting_reply = False; send_notifications(); rcv_sem.signal(); } void task_proc dbs_client_storage::receiver(void* arg) { ((dbs_client_storage*)arg)->receive_header(); } //--------------------------------------------------------- opid_t dbs_client_storage::allocate(cpid_t cpid, size_t size, boolean aligned) { dbs_request snd_req, rcv_req; snd_req.cmd = dbs_request::cmd_alloc; snd_req.alloc.cpid = cpid; snd_req.alloc.size = size; snd_req.alloc.align = aligned; send_receive(&snd_req, sizeof snd_req, rcv_req, dbs_request::cmd_location); return rcv_req.object.opid; } void dbs_client_storage::bulk_allocate(size_t sizeBuf[], cpid_t cpidBuf[], size_t nAllocObjects, opid_t opidBuf[], size_t nReservedOids) { size_t i; dbs_request* req = (dbs_request*)snd_buf.put(sizeof(dbs_request) + nAllocObjects*6); dbs_request reply; req->cmd = dbs_request::cmd_bulk_alloc; req->alloc.size = nAllocObjects; req->alloc.reserved = nReservedOids; char* dst = (char*)(req+1); for (i = 0; i < nAllocObjects; i++) { dst = pack4(dst, sizeBuf[i]); } for (i = 0; i < nAllocObjects; i++) { dst = pack2(dst, cpidBuf[i]); } send_receive(req, sizeof(dbs_request) + nAllocObjects*6, reply, dbs_request::cmd_opids, snd_buf); char* src = &snd_buf; for (i = 0; i < nReservedOids; i++) { opidBuf[i] = unpack4(src); src += 4; } } void dbs_client_storage::deallocate(opid_t opid) { dbs_request snd_req; snd_req.cmd = dbs_request::cmd_free; snd_req.object.opid = opid; write(&snd_req, sizeof snd_req); } boolean dbs_client_storage::lock(opid_t opid, lck_t lck, int attr) { dbs_request snd_req, rcv_req; snd_req.cmd = dbs_request::cmd_lock; snd_req.lock.type = lck; snd_req.lock.attr = attr; snd_req.lock.opid = opid; send_receive(&snd_req, sizeof snd_req, rcv_req, dbs_request::cmd_lockresult); return rcv_req.result.status; } void dbs_client_storage::unlock(opid_t opid, lck_t lck) { dbs_request snd_req; snd_req.cmd = dbs_request::cmd_unlock; snd_req.lock.type = lck; snd_req.lock.opid = opid; write(&snd_req, sizeof snd_req); } void dbs_client_storage::get_class(cpid_t cpid, dnm_buffer& buf) { dbs_request snd_req, rcv_req; snd_req.cmd = dbs_request::cmd_getclass; snd_req.clsdesc.cpid = cpid; send_receive(&snd_req, sizeof snd_req, rcv_req, dbs_request::cmd_classdesc, buf); } cpid_t dbs_client_storage::put_class(dbs_class_descriptor* dbs_desc) { dbs_request rcv_req; size_t dbs_desc_size = dbs_desc->get_size(); dbs_request* put_class_req = (dbs_request*)snd_buf.put(sizeof(dbs_request) + dbs_desc_size); put_class_req->cmd = dbs_request::cmd_putclass; put_class_req->clsdesc.size = dbs_desc_size; memcpy(put_class_req+1, dbs_desc, dbs_desc_size); ((dbs_class_descriptor*)(put_class_req+1))->pack(); send_receive(put_class_req, sizeof(dbs_request)+dbs_desc_size, rcv_req, dbs_request::cmd_classid); return rcv_req.clsdesc.cpid; } void dbs_client_storage::change_class(cpid_t cpid, dbs_class_descriptor* dbs_desc) { size_t dbs_desc_size = dbs_desc->get_size(); dbs_request* put_class_req = (dbs_request*)snd_buf.put(sizeof(dbs_request) + dbs_desc_size); put_class_req->cmd = dbs_request::cmd_modclass; put_class_req->clsdesc.cpid = cpid; put_class_req->clsdesc.size = dbs_desc_size; memcpy(put_class_req+1, dbs_desc, dbs_desc_size); ((dbs_class_descriptor*)(put_class_req+1))->pack(); write(put_class_req, sizeof(dbs_request) + dbs_desc_size); } void dbs_client_storage::load(opid_t opid, int flags, dnm_buffer& buf) { dbs_request snd_req, rcv_req; snd_req.cmd = dbs_request::cmd_load; snd_req.object.flags = flags; snd_req.object.opid = opid; snd_req.object.extra = 0; send_receive(&snd_req, sizeof snd_req, rcv_req, dbs_request::cmd_object, buf); } void dbs_client_storage::load(opid_t* opp, int n_objects, int flags, dnm_buffer& buf) { dbs_request rcv_req; dnm_buffer snd_buf; if (n_objects > 0) { snd_buf.put(sizeof(dbs_request)*n_objects); dbs_request* rp = (dbs_request*)&snd_buf; rp->cmd = dbs_request::cmd_load; rp->object.flags = flags; rp->object.opid = *opp++; rp->object.extra = n_objects - 1; while (--n_objects != 0) { rp += 1; rp->cmd = dbs_request::cmd_load; rp->object.flags = flags; rp->object.opid = *opp++; rp->pack(); } send_receive(&snd_buf, n_objects*sizeof(dbs_request), rcv_req, dbs_request::cmd_object, buf); } else { buf.put(0); } } void dbs_client_storage::forget_object(opid_t opid) { dbs_request snd_req; snd_req.cmd = dbs_request::cmd_forget; snd_req.object.opid = opid; snd_req.object.extra = 0; notify_cs.enter(); if (proceeding_invalidation || waiting_reply) { dbs_request* req = ¬ify_buf; int n = notify_buf.size(); while (--n >= 0) { if (req->object.opid == opid) { req->cmd = dbs_request::cmd_forget; notify_cs.leave(); return; } req += 1; } notify_buf.push(snd_req); notify_cs.leave(); } else { notify_cs.leave(); write(&snd_req, sizeof snd_req); } } void dbs_client_storage::throw_object(opid_t opid) { dbs_request snd_req; snd_req.cmd = dbs_request::cmd_throw; snd_req.object.opid = opid; snd_req.object.extra = 0; notify_cs.enter(); if (proceeding_invalidation || waiting_reply) { notify_buf.push(snd_req); notify_cs.leave(); } else { notify_cs.leave(); write(&snd_req, sizeof snd_req); } } void dbs_client_storage::begin_transaction(dnm_buffer& buf) { buf.put(sizeof(dbs_request)); } boolean dbs_client_storage::commit_coordinator_transaction(int n_trans_servers, stid_t*trans_servers, dnm_buffer& buf, trid_t& tid) { dbs_request rcv_req; if (n_trans_servers > 1) { char* p = buf.append(n_trans_servers*sizeof(stid_t)); for (int i = 0 ; i < n_trans_servers; i++) { p = pack2(p, trans_servers[i]); } } dbs_request* trans_req = (dbs_request*)&buf; trans_req->cmd = dbs_request::cmd_transaction; trans_req->trans.n_servers = n_trans_servers; trans_req->trans.size = buf.size() - sizeof(dbs_request); send_receive(trans_req, buf.size(), rcv_req, dbs_request::cmd_transresult); if (rcv_req.result.status) { // committed ? tid = rcv_req.result.tid; } return rcv_req.result.status; } void dbs_client_storage::commit_transaction(stid_t coordinator, int n_trans_servers, stid_t* trans_servers, dnm_buffer& buf, trid_t tid) { if (n_trans_servers > 1) { char* p = buf.append(n_trans_servers*sizeof(stid_t)); for (int i = 0 ; i < n_trans_servers; i++) { p = pack2(p, trans_servers[i]); } } dbs_request* trans_req = (dbs_request*)&buf; trans_req->cmd = dbs_request::cmd_subtransact; trans_req->trans.n_servers = n_trans_servers; trans_req->trans.coordinator = coordinator; trans_req->trans.size = buf.size() - sizeof(dbs_request); trans_req->trans.tid = tid; write(trans_req, buf.size()); } boolean dbs_client_storage::wait_global_transaction_completion() { dbs_request rcv_req; send_receive(NULL, 0, rcv_req, dbs_request::cmd_transresult); return rcv_req.result.status; } boolean dbs_client_storage::open(const char* server_connection_name) { sock = socket_t::connect(server_connection_name); if (!sock->is_ok()) { msg_buf buf; sock->get_error_text(buf, sizeof buf); console::output("Failed to connect server '%s': %s\n", server_connection_name, buf); delete sock; sock = NULL; return False; } opened = True; closing = False; waiting_reply = False; proceeding_invalidation = False; notify_buf.change_size(0); char const* my_name = get_process_name(); size_t len = strlen(my_name) + 1; snd_buf.put(sizeof(dbs_request) + len); dbs_request* login_req = (dbs_request*)&snd_buf; login_req->cmd = dbs_request::cmd_login; login_req->login.name_len = len; strcpy((char*)(login_req+1), my_name); write(login_req, sizeof(dbs_request) + len); dbs_request rcv_req; read(&rcv_req, sizeof rcv_req); rcv_req.unpack(); assert(rcv_req.cmd == dbs_request::cmd_ok || rcv_req.cmd == dbs_request::cmd_bye || rcv_req.cmd == dbs_request::cmd_refused); switch (rcv_req.cmd) { case dbs_request::cmd_bye: opened = False; application->disconnected(id); return False; case dbs_request::cmd_refused: opened = False; application->login_refused(id); return False; } task::create(receiver, this, task::pri_high); return True; } void dbs_client_storage::close() { dbs_request snd_req, rcv_req; snd_req.cmd = dbs_request::cmd_logout; closing = True; send_receive(&snd_req, sizeof snd_req, rcv_req, dbs_request::cmd_bye); sock->close(); term_event.wait(); delete sock; } nat8 dbs_client_storage::get_used_size() { dbs_request snd_req, rcv_req; snd_req.cmd = dbs_request::cmd_get_size; send_receive(&snd_req, sizeof snd_req, rcv_req, dbs_request::cmd_get_size); return cons_nat8(rcv_req.any.arg3, rcv_req.any.arg4); } dbs_client_storage::~dbs_client_storage() { } END_GOODS_NAMESPACE